TL;DR
- Scenario: collect logs from multiple directories, tag each event's Header with its log type, and land the data in HDFS partitioned by event time
- Conclusion: a Taildir Source plus a custom Interceptor that extracts the JSON timestamp into the Header; the HDFS Sink routes and rolls files based on that Header
- Output: a reusable implementation template covering log collection → Header tagging → landing partitioned by day → production startup with nohup
Log Data Collection Summary
Summary first; the main content follows below.
In Apache Flume, the Interceptor is a key component of the data pipeline: it allows events to be modified or filtered before they enter the Channel. With custom interceptors you can implement business-specific logic such as data filtering, adding or modifying fields, and format conversion. A custom Interceptor is Java code written by the user to extend Flume beyond what the built-in interceptors provide.
- Use the Taildir Source to monitor multiple specified directories; logs from different directories can be given different Headers
- Within each directory, a regular expression can match multiple files
- Use a custom interceptor whose main job is to extract the timestamp from the JSON string and add it to the event's Header
- The HDFS Sink uses the information in the event Header to decide where to write the data (i.e. it controls the file location)
- HDFS file rolling strategies (by file size, by event count, by time)
- Adjust the Flume JVM memory allocation
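The bullets above can be tied together in one agent configuration. This is a sketch, not the actual flume-log2hdfs3.conf from this setup: the agent name, paths, filegroup names, and rolling thresholds are assumptions to adapt to your environment.

```properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Taildir source: one filegroup per directory; the regex matches multiple files
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
# Per-directory headers: mark the log type so the interceptor and sink can branch on it
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.headers.f2.logtype = event

# The custom interceptor writes the event-time date into the "logtime" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# HDFS sink: route by header values, partition by day
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.fileType = DataStream
# Rolling: by size (bytes), by event count, by time (seconds); 0 disables a criterion
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Because the path only uses `%{header}` escapes (no time escapes), the sink does not depend on a `timestamp` header or `useLocalTimeStamp`.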
Working Principle
- Event Generation (Source): Flume collects data from external systems through the Source component. Common Source types include Exec Source, Spooling Directory Source, NetCat Source, and Kafka Source. The Source wraps the collected raw data into a Flume Event object, which consists of a header map and a payload in byte-array form.
- Interceptor: before events enter the Channel, they can be preprocessed by a chain of interceptors: Timestamp Interceptor, Host Interceptor, Regex Filtering Interceptor, Search-and-Replace Interceptor, or a custom Interceptor. Interceptors can be used singly or chained in series; they execute in configuration order.
- Transmission (Channel): the Channel acts as an event buffer to ensure reliable transmission. Main types: Memory Channel, File Channel, JDBC Channel, Kafka Channel. The Channel provides transaction support, guaranteeing reliable event delivery between Source and Sink.
- Consumption (Sink): the Sink writes events to the target system. Common implementations: HDFS Sink, HBase Sink, Kafka Sink, File Roll Sink, Avro Sink. Sinks support batch writes and failure retry, and can be tuned with parameters that match the characteristics of the target system.
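The Channel choice is the main durability/throughput trade-off in the list above. A minimal illustration (channel name and paths are assumptions): a Memory Channel favors throughput, a File Channel favors durability.

```properties
# Memory Channel: fast, but buffered events are lost if the agent process dies
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# File Channel: events survive an agent restart, at the cost of disk I/O
# a1.channels.c1.type = file
# a1.channels.c1.checkpointDir = /opt/wzk/flume/checkpoint
# a1.channels.c1.dataDirs = /opt/wzk/flume/data
```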
Development and Deployment Notes
- Dependency management: a custom interceptor depends on Flume's core libraries, such as flume-ng-core and flume-ng-sdk.
- Testing: test the interceptor logic locally to make sure it is functionally correct and performs as expected.
- Deployment: upload the JAR file to the Flume Agent's lib directory and restart the Flume service.
- Performance monitoring: a custom interceptor can affect Flume throughput, especially when its logic is complex; monitor resource usage in production.
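The JVM memory adjustment mentioned earlier is done in Flume's environment script, which the flume-ng launcher sources. The heap sizes below are assumed starting points; size them to your Channel capacity and observed GC behavior.

```shell
# $FLUME_HOME/conf/flume-env.sh
# Equal -Xms/-Xmx avoids heap-resizing pauses; 1 GB is an assumption, not a recommendation
export JAVA_OPTS="-Xms1024m -Xmx1024m"
```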
Collect Startup Logs and Event Logs
Custom Interceptor
After coding and packaging, upload the JAR to the server and place it in $FLUME_HOME/lib.
Write Code
```java
package icu.wzk;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get the Event body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get the Event headers
        Map<String, String> headerMap = event.getHeaders();
        // Split the body to isolate the JSON payload
        // (index 6 is coupled to the log line layout: date, time, [thread], level, logger, "-", json)
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            String timestampStr = "";
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            if (headerMap.getOrDefault("logtype", "").equals("start")) {
                // Startup log: take the timestamp from app_active
                timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            } else if (headerMap.getOrDefault("logtype", "").equals("event")) {
                // Event log: take the timestamp of the first record
                JSONArray jsonArray = jsonObject.getJSONArray("wzk_event");
                if (jsonArray.size() > 0) {
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            }
            // Convert the epoch-millisecond timestamp to yyyy-MM-dd
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Put the formatted date into the header
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
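The heart of the interceptor is the epoch-milliseconds → yyyy-MM-dd conversion. As a standalone sanity check using only the JDK (class and method names here are just for illustration), the same logic with an explicit timezone so the result is deterministic:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class LogTimeCheck {
    // Same conversion as in the interceptor, but with an explicit zone;
    // the interceptor itself uses ZoneId.systemDefault()
    static String formatLogTime(String timestampStr, ZoneId zone) {
        long timestamp = Long.parseLong(timestampStr);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        LocalDateTime ldt = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), zone);
        return formatter.format(ldt);
    }

    public static void main(String[] args) {
        // Timestamp taken from the sample startup log used later in this post
        System.out.println(formatLogTime("1596342840284", ZoneId.of("UTC")));  // 2020-08-02
    }
}
```

Running this against the sample timestamps quickly reveals the two failure modes covered in the error table below: an empty `timestampStr` throws NumberFormatException, and a server timezone far from the business timezone can shift the date across a day boundary.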
Package Project
```shell
mvn clean package
```
Start Test
```shell
flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf --name a1 -Dflume.root.logger=INFO,console
```
Test Result
Write to the log file:

```shell
vim /opt/wzk/logs/start/test.log
```

The content written is as follows:

```
2020-08-02 18:19:32.959 [main] INFO icu.wzk.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1596342840284},"attr":{"area":"大庆","uid":"2F10092A2","app_v":"1.1.15","event_type":"common","device_id":"1FB872-9A1002","os_type":"2.8","channel":"TB","language":"chinese","brand":"iphone-8"}}
```
Write to the log file:

```shell
vim /opt/wzk/logs/event/test.log
```

The content written is as follows:

```
2020-08-02 18:20:11.877 [main] INFO icu.wzk.ecommerce.AppEvent - {"wzk_event":[{"name":"goods_detail_loading","json":{"entry":"1","goodsid":"0","loading_time":"93","action":"3","staytime":"56","showtype":"2"},"time":1596343881690},{"name":"loading","json":{"loading_time":"15","action":"3","loading_type":"3","type":"1"},"time":1596356988428},{"name":"notification","json":{"action":"1","type":"2"},"time":1596374167278},{"name":"favorites","json":{"course_id":1,"id":0,"userid":0},"time":1596350933962}],"attr":{"area":"长治","uid":"2F10092A4","app_v":"1.1.14","event_type":"common","device_id":"1FB872-9A1004","os_type":"0.5.0","channel":"QL","language":"chinese","brand":"xiaomi-0"}}
```
View Result
Results appear on the console, and the corresponding content is written to HDFS.
Production Environment
In a production environment, the recommended way to start the agent is:

```shell
nohup flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf --name a1 -Dflume.root.logger=INFO,console > /dev/null 2>&1 &
```

- nohup: lets the process keep running after the user logs out or closes the terminal
- /dev/null: the Linux null device; everything written to it is discarded, hence its nickname "the black hole"
- Standard input is fd 0, read from the keyboard (/proc/self/fd/0)
- Standard output is fd 1, written to the screen/console (/proc/self/fd/1)
- Standard error is fd 2, also written to the screen/console (/proc/self/fd/2)
- > /dev/null: redirects standard output (fd 1) to /dev/null, so regular output goes nowhere
- 2>&1: redirects standard error to wherever standard output currently points
- > /dev/null 2>&1: nothing is printed to the console and nothing is written to a file; all output is discarded
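The bullets above can be verified with a toy command (nothing Flume-specific):

```shell
# fd 1 goes to /dev/null first, then fd 2 is pointed at fd 1: both streams are discarded
sh -c 'echo to-stdout; echo to-stderr 1>&2' > /dev/null 2>&1

# Order matters: with 2>&1 first, stderr is duplicated onto the terminal *before*
# stdout is redirected, so "to-stderr" would still appear:
#   sh -c '...' 2>&1 > /dev/null
```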
Error Quick Reference
| Symptom | Likely Cause | Fix |
|---|---|---|
| HDFS path not partitioned by date; logtime empty or Unknown | the interceptor never set timestampStr correctly, or an exception was silently caught | check the Flume console, add logging in the interceptor, and verify the Header contains logtime; fix the timestampStr assignment; don't swallow key information in the catch block |
| Startup-log branch never yields a timestamp | the start branch calls getString but the result is never assigned to timestampStr | review the start branch of intercept and make sure the startup timestamp is assigned to timestampStr |
| ArrayIndexOutOfBounds / JSON parsing failure | after eventBody.split, index 6 is hard-coded, tightly coupling the code to the log format | grab one raw log body and print the split array's length and contents; use a more robust extraction method |
| NumberFormatException (timestamp parse failed) | timestampStr is empty, not purely numeric, or the JSON field path is wrong | print timestampStr and the JSON structure; confirm the type of the time field; add null checks and type tolerance |
| Date is off (wrong day / timezone shift) | ZoneId.systemDefault() depends on the server timezone | compare the server timezone with the business timezone; spot-check against the raw timestamps; pin the business timezone |
| Startup parameters have no effect / wrong log level | JVM property misspelled (e.g. -Dflume.roog.logger instead of -Dflume.root.logger) | compare against Flume's official property names; if output still uses the default level, correct the property key |
| Output still appears after nohup, or the command reports "file not found" | redirection path is wrong (dev/null missing the leading "/") | run the nohup command directly and observe the error; change it to > /dev/null 2>&1 |
| Too many small HDFS files / rolling too often | unreasonable rollSize/rollCount/rollInterval settings | count the files and average size in the HDFS directory; set rolling thresholds appropriate to the throughput |
| Memory spikes / throughput drops | Memory Channel too large, or interceptor parsing too expensive | watch the Flume JVM, GC, and Channel backlog; adjust the Channel type and capacity; optimize the JSON parsing |