TL;DR

  • Scenario: An offline data warehouse collects logs and writes them to HDFS; the Flume Agent must deliver throughput, stability, and accurate landing-partition times all at once
  • Conclusion: Raise throughput first with batch parameters together with Channel capacity/transactionCapacity, fix OOM with JVM heap parameters, and use an interceptor to write the business time into the event header so the partition path is correct
  • Output: A reusable Flume 1.9.0 tuning checklist + OOM quick-fix commands + a custom Interceptor (logtime header) implementation template

Flume Optimization Configuration

Flume is a distributed, reliable, and efficient system for collecting, aggregating, and transporting data, widely used across the big data ecosystem. Careful optimization of its configuration is crucial to improving Flume's performance and stability.

Batch Processing

  • Parameter: batchSize
  • Function: Controls the number of events transmitted per batch in Flume.
  • Configuration Suggestions:
    • Source to Channel: Adjust based on Source throughput and Channel throughput capability, recommended value 100-1000
    • Channel to Sink: Adjust based on Sink processing capability and target system write performance, recommended value 500-5000
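
As a reference point, these batch settings might look like the following in an agent definition (the agent/component names a1, r1, k1 and the concrete values are illustrative, not prescriptive):

```properties
# Source -> Channel: events read per batch (Taildir Source shown)
a1.sources.r1.batchSize = 500
# Channel -> Sink: events written per transaction (the HDFS Sink calls this hdfs.batchSize)
a1.sinks.k1.hdfs.batchSize = 1000
```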

Compressed Transmission

  • Parameters: compression-type (Avro Source/Sink) and hdfs.codeC (HDFS Sink)
  • Function: Compress events in transit or on write, reducing network bandwidth or storage consumption.
  • Supported Compression Types: Avro hops support deflate; the HDFS Sink supports codecs such as gzip, snappy, lzop, bzip2
  • Configuration Suggestions: Choose a codec that the target system can decompress
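
A sketch of both compression styles, assuming an Avro hop between two agents plus an HDFS Sink (agent names and the codec choice are placeholders):

```properties
# Avro hop between agents: compress the RPC stream (deflate is the only option)
a1.sinks.k1.type = avro
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-level = 6

# HDFS Sink: compress files on write; pick a codec the downstream readers can handle
a2.sinks.k1.hdfs.fileType = CompressedStream
a2.sinks.k1.hdfs.codeC = snappy
```

Note that the receiving Avro Source must also set compression-type = deflate, or deserialization on the far side fails.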

Source Optimization

Taildir Source

  • Parameters: batchSize and fileHeader
  • batchSize: Set number of events read from file at one time
  • fileHeader: Whether to add the absolute file path to the event header (under the key file by default), recommended to enable for downstream processing
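
A minimal Taildir Source sketch with these parameters (the positionFile location and the file pattern are assumptions for illustration):

```properties
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /var/logs/app/.*log
a1.sources.r1.batchSize = 500
# Adds the absolute file path to the header under the key "file"
a1.sources.r1.fileHeader = true
```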

Kafka Source

  • Parameters: kafka.consumer.timeout.ms and fetch.message.max.bytes
  • kafka.consumer.timeout.ms: Timeout for the Kafka consumer when polling for data, usually 100-500ms
  • fetch.message.max.bytes: Maximum message size per fetch, default usually 1MB, adjustable to the business scenario (with the new consumer API the equivalent property is max.partition.fetch.bytes)
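
A Kafka Source sketch for Flume 1.9.0 (bootstrap servers and topic name are placeholders; raw consumer properties pass through with the kafka.consumer. prefix):

```properties
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = kafka1:9092
a1.sources.r1.kafka.topics = app_log
a1.sources.r1.batchSize = 1000
# Per-partition fetch limit for the new consumer API (2 MB here)
a1.sources.r1.kafka.consumer.max.partition.fetch.bytes = 2097152
```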

Channel Optimization

Memory Channel

  • Parameters: capacity and transactionCapacity
  • capacity: Maximum number of events the Channel can hold
  • transactionCapacity: Maximum number of events per transaction; it must not exceed capacity and should be at least as large as the batchSize of the attached Source and Sink
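
A Memory Channel sketch respecting those constraints (values are starting points, not tuned results):

```properties
a1.channels.c1.type = memory
# capacity >= transactionCapacity >= the largest batchSize on either side
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
```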

File Channel

  • Parameters: checkpointDir and dataDirs
  • checkpointDir: Directory for storing Channel state
  • dataDirs: Directory for storing event data, suggest setting multiple disk paths to improve IO performance
  • Configuration Suggestions: Ensure disk IO performance is sufficient, avoid bottlenecks
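
A File Channel sketch with multiple data directories (the paths are assumptions; what matters is that they sit on separate physical disks):

```properties
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint
# Comma-separated list; spreading dataDirs over separate disks parallelizes the IO
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
```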

Flume Error Resolution

When storing log files to the logs directory, an OOM may occur because by default Flume's JVM maximum heap is only 20 MB (the -Xmx20m in the flume-env.sh template); this is far too small and needs to be raised.

Solution: Add the following to $FLUME_HOME/conf/flume-env.sh:

export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

Flume Memory Parameter Settings and Optimization:

  • Depending on log data volume, the JVM heap should generally be set to 4G or higher
  • -Xms and -Xmx are best set to the same value, to avoid the performance hit of heap resizing (memory jitter)

Custom Interceptor

The previous Flume Agent configuration used the local timestamp to build the storage path, which can place late-arriving data under the wrong date partition. To solve this problem, a custom interceptor is needed that derives the partition time from the event itself.

Custom Interceptor Principle

Custom interceptor principle:

  • A custom interceptor implements Flume’s Interceptor interface
  • An Event consists of a header and a body (the received payload, here a string)
  • Get the header and body
  • Extract the timestamp from the body and convert it to a yyyy-MM-dd string
  • Put the converted string into the header

Custom Interceptor Implementation

Custom interceptor implementation:

  1. Get event’s header
  2. Get event’s body
  3. Parse body to get JSON string
  4. Parse JSON string to get timestamp
  5. Convert timestamp to string yyyy-MM-dd
  6. Put converted string into header
  7. Return event

Import Dependencies

<dependencies>
  <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.1.23</version>
  </dependency>
</dependencies>

Write Code

package icu.wzk;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Process each record here
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get Event's Header
        Map<String, String> headerMap = event.getHeaders();
        // Parse Body to get JSON string
        // The log line is whitespace-separated; the 7th field is assumed to carry the JSON payload
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            // Parse JSON string to get timestamp
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
            // Convert timestamp to string yyyy-MM-dd
            // Convert string to Long
            long timestampLong = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestampLong);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Put converted string into header
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event : list) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}
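
To use the interceptor, package it, drop the jar into $FLUME_HOME/lib, and reference the Builder from the agent config. A sketch of the wiring (agent names, NameNode address, and path layout are placeholders; logtime is the header key set by the code above):

```properties
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.CustomerInterceptor$Builder
# Partition by the business time written into the header, not the local clock
a1.sinks.k1.hdfs.path = hdfs://namenode:9000/logs/dt=%{logtime}
a1.sinks.k1.hdfs.useLocalTimeStamp = false
```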