TL;DR
- Scenario: An offline data warehouse collects logs into HDFS; the Flume Agent must deliver throughput, stability, and accurate landing-partition times at the same time
- Conclusion: Raise throughput first with batch parameters plus Channel capacity/transactionCapacity; fix OOM with JVM heap parameters; use an interceptor to write the business timestamp into the event header so the partition path is correct
- Output: Reusable Flume 1.9.0 tuning checklist + OOM quick fix commands + custom Interceptor (logtime header) implementation template
Flume Optimization Configuration
Flume is a distributed, reliable, and efficient system for collecting, aggregating, and transporting data, widely used in the big data ecosystem. To get good performance and stability out of Flume, careful configuration tuning is essential.
Batch Processing
- Parameter: batchSize
- Function: Controls the number of events transmitted per batch in Flume.
- Configuration Suggestions:
- Source to Channel: Adjust based on Source throughput and Channel throughput capability, recommended value 100-1000
- Channel to Sink: Adjust based on Sink processing capability and target system write performance, recommended value 500-5000
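As a minimal sketch of the suggestions above, assuming a hypothetical agent `a1` with a Taildir Source `r1`, a channel `c1`, and an HDFS Sink `k1` (all names are placeholders):

```properties
# Source -> Channel: events read from the file per batch (Taildir Source)
a1.sources.r1.batchSize = 500

# Channel -> Sink: events taken from the channel per transaction (HDFS Sink)
a1.sinks.k1.hdfs.batchSize = 1000

# The channel's transactionCapacity must be >= the largest batchSize above
a1.channels.c1.transactionCapacity = 1000
```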
Compressed Transmission
- Parameters: compression-type (Avro Sink) or kafka.producer.compression.type (Kafka Sink)
- Function: Compresses events before transmission, reducing network bandwidth consumption.
- Supported Compression Types: the Avro Sink supports deflate; the Kafka Sink passes gzip, snappy, lz4, etc. through to the Kafka producer
- Configuration Suggestions: Make sure the receiving end (for example, the downstream Avro Source or Kafka consumer) supports decompressing the chosen type
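A hedged sketch of how compression is typically configured in Flume 1.9, assuming a hypothetical agent `a1` with two sinks `k1` (Avro) and `k2` (Kafka); the sink names are placeholders:

```properties
# Avro Sink: deflate compression, level 1-9 (higher = smaller but slower)
a1.sinks.k1.type = avro
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-level = 6

# Kafka Sink: compression is a pass-through producer property
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.producer.compression.type = snappy
```

For the Avro path, the receiving Avro Source must be configured with the same `compression-type`, or deserialization will fail.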
Source Optimization
Taildir Source
- Parameters: batchSize and fileHeader
- batchSize: Set number of events read from file at one time
- fileHeader: Whether to add file name to event header, recommended to enable for subsequent processing
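Putting the two Taildir parameters together, a minimal sketch for a hypothetical agent `a1` (the file paths and group name `f1` are placeholders):

```properties
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/log/app/.*\\.log
# position file records read offsets so tailing resumes after a restart
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
# events read from the file per batch
a1.sources.r1.batchSize = 500
# add the absolute file path to the event header (key "file" by default)
a1.sources.r1.fileHeader = true
```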
Kafka Source
- Parameters: kafka.consumer.timeout.ms and fetch.message.max.bytes
- kafka.consumer.timeout.ms: Consumer timeout when polling for data, usually 100-500 ms; in Flume 1.9, consumer properties are passed through with the kafka.consumer. prefix, and the batching window is also bounded by batchDurationMillis
- fetch.message.max.bytes: Maximum bytes fetched per request, usually 1 MB by default; raise it for large messages (on newer Kafka consumers the equivalent property is max.partition.fetch.bytes)
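As a sketch under Flume 1.9's Kafka Source, where consumer properties are passed through via the `kafka.consumer.` prefix; the broker address and topic name are assumptions:

```properties
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = broker1:9092
a1.sources.r1.kafka.topics = app_log
# events per batch and max wait before an incomplete batch is flushed
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 200
# pass-through consumer property: allow fetching larger messages (2 MB)
a1.sources.r1.kafka.consumer.max.partition.fetch.bytes = 2097152
```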
Channel Optimization
Memory Channel
- Parameters: capacity and transactionCapacity
- capacity: Maximum number of events the Channel can hold
- transactionCapacity: Maximum number of events in a single transaction; it must be <= capacity and >= the batchSize of the attached Source and Sink
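A minimal Memory Channel sketch for a hypothetical agent `a1` (the sizes are illustrative starting points, not tuned values):

```properties
a1.channels.c1.type = memory
# buffer up to 10,000 events in memory
a1.channels.c1.capacity = 10000
# up to 1,000 events per put/take transaction
a1.channels.c1.transactionCapacity = 1000
```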
File Channel
- Parameters: checkpointDir and dataDirs
- checkpointDir: Directory for storing Channel state
- dataDirs: Directory for storing event data, suggest setting multiple disk paths to improve IO performance
- Configuration Suggestions: Ensure disk IO performance is sufficient, avoid bottlenecks
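A File Channel sketch, assuming two separate physical disks mounted at the hypothetical paths `/data1` and `/data2`:

```properties
a1.channels.c1.type = file
# checkpoint directory holds the channel's state
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
# spread event data across multiple disks to parallelize IO
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
```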
Flume Error Resolution
When collecting log files, Flume may fail with an OutOfMemoryError: by default the flume-env.sh template caps the JVM heap at only 20 MB (-Xmx20m), which is far too small for real workloads and needs to be raised.
Solution: Add the following to $FLUME_HOME/conf/flume-env.sh:
export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
Flume Memory Parameter Settings and Optimization:
- Based on the log volume, the JVM heap should generally be set to 4 GB or more
- Set -Xms and -Xmx to the same value, so the JVM does not repeatedly resize the heap and trigger unnecessary GC pauses
Custom Interceptor
The earlier Flume Agent configuration used the local server time to build the landing path, so late-arriving events could be written to the wrong partition. A custom interceptor that extracts the business timestamp from the event body solves this.
Custom Interceptor Principle
Custom interceptor principle:
- Implement Flume's Interceptor interface (plus a nested Builder so the Agent configuration can instantiate it)
- An Event consists of a header map and a body (the received string as bytes)
- Read the header and body of each event
- Extract the timestamp from the body and convert it to a yyyy-MM-dd string
- Put the converted string into the header
Custom Interceptor Implementation
Custom interceptor implementation:
- Get event’s header
- Get event’s body
- Parse body to get JSON string
- Parse JSON string to get timestamp
- Convert timestamp to string yyyy-MM-dd
- Put converted string into header
- Return event
Import Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.1.23</version>
    </dependency>
</dependencies>
Write Code
package icu.wzk;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Decode the body and fetch this event's header map
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        Map<String, String> headerMap = event.getHeaders();
        // The JSON payload is the 7th whitespace-separated field of the log
        // line; this index depends on the concrete log format
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // Parse the JSON string and read the business timestamp
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // Convert the epoch-millisecond timestamp to a yyyy-MM-dd string
            long timestampLong = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestampLong);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Write the formatted date into the header for the sink to use
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            // Malformed records go to an "Unknown" partition instead of being dropped
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
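To wire the interceptor into an Agent, it is registered on the source via its Builder class, and the HDFS path then references the logtime header. A minimal sketch, assuming a hypothetical agent `a1` with source `r1` and HDFS sink `k1` (the HDFS path is a placeholder):

```properties
# register the interceptor via its nested Builder
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder

# partition the HDFS path by the logtime header written by the interceptor,
# instead of relying on the local server time
a1.sinks.k1.hdfs.path = /user/data/logs/dt=%{logtime}
a1.sinks.k1.hdfs.useLocalTimeStamp = false
```

Events whose body cannot be parsed land under `dt=Unknown`, which makes bad records easy to find and reprocess.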