TL;DR

  • Scenario: Multi-directory log collection, mark Header by log type, land to HDFS partition by event time
  • Conclusion: Taildir Source + custom Interceptor extract JSON timestamp to write Header, HDFS Sink routes and rolls based on Header
  • Output: A reusable “log collection→Header marking→partition by day landing→production nohup start” implementation template

Log Data Collection Summary

Summary First, Below is the Main Content

In Apache Flume, Interceptor is a key component of the data flow pipeline, allowing modification or filtering of events before they enter the Flume Channel. Through custom interceptors, you can implement specific business logic such as data filtering, field addition or modification, format conversion, etc. Custom Interceptor refers to user-written Java code to extend Flume’s functionality based on requirements, rather than using default interceptors.

  • Use taildir source to monitor specified multiple directories, can add different Headers to logs from different directories
  • On each directory can use regex to match multiple files
  • Use custom interceptor, main function is to get timestamp from JSON string, add to event’s header
  • hdfs sink uses information in event header to write data (control file writing location)
  • hdfs file rolling methods (based on file size, based on event count, based on time)
  • Adjust Flume JVM memory allocation

Working Principle

  • Event Generation (Source): Flume collects data from various external systems through Source component. Common Source types include Exec Source, Spooling Directory Source, NetCat Source, Kafka Source. Source encapsulates collected raw data as Flume Event object, containing event header and payload data in byte array form.

  • Interceptor (Interceptor): Before events enter Channel, can do preprocessing through chain interceptors: Timestamp Interceptor, Host Interceptor, Regex Filtering Interceptor, Search-and-Replace Interceptor, Custom Interceptor. Interceptors can be used singly or multiple in series, executed in configuration order.

  • Transmission (Channel): Acts as event buffer to ensure reliable transmission. Main types: Memory Channel, File Channel, JDBC Channel, Kafka Channel. Channel provides transaction support, ensuring reliable event delivery between Source and Sink.

  • Consumption (Sink): Responsible for writing events to target system, common implementations: HDFS Sink, HBase Sink, Kafka Sink, File Roll Sink, Avro Sink. Sink supports batch writing and failure retry mechanism, can configure corresponding parameters based on target system characteristics.

Development and Deployment Notes

  • Dependency Management: Developing custom interceptor requires depending on Flume’s core libraries, such as flume-ng-core and flume-ng-sdk.
  • Testing: Test interceptor logic locally, ensure functionality is correct and performance meets expectations.
  • Deployment: Upload JAR file to Flume Agent’s lib directory and restart Flume service.
  • Performance Monitoring: Custom interceptor may affect Flume performance, especially when interceptor logic is complex. Suggest monitoring resource usage in production environment.

Collect Startup Logs and Event Logs

Custom Interceptor

After coding and packaging, upload to server, place in $FLUME_HOME/lib

Write Code

package icu.wzk;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Get Event's body
        String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
        // Get Event's Header
        Map<String, String> headerMap = event.getHeaders();
        // Parse body to get JSON string
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            String timestampStr = "";
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            if (headerMap.getOrDefault("logtype", "").equals("start")) {
                // Get startup timestamp
                jsonObject.getJSONObject("app_active").getString("time");
            } else if (headerMap.getOrDefault("logtype", "").equals("event")) {
                // Get timestamp of first record in event log
                JSONArray jsonArray = jsonObject.getJSONArray("wzk_event");
                if (jsonArray.size() > 0) {
                    timestampStr = jsonArray.getJSONObject(0).getString("time");
                }
            }
            // Convert timestamp to yyyy-MM-dd
            long timestamp = Long.parseLong(timestampStr);
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timestamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = formatter.format(localDateTime);
            // Put converted string into Header
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "Unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> lstEvent = new ArrayList<>();
        for (Event event: events){
            Event outEvent = intercept(event);
            if (outEvent != null) {
                lstEvent.add(outEvent);
            }
        }
        return lstEvent;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}

Package Project

mvn clean package

Start Test

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console

Test Result

Write to log file:

vim /opt/wzk/logs/start/test.log

The written content is as follows:

2020-08-02 18:19:32.959 [main] INFO icu.wzk.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1596342840284},"attr":{"area":"大庆","uid":"2F10092A2","app_v":"1.1.15","event_type":"common","device_id":"1FB872-9A1002","os_type":"2.8","channel":"TB","language":"chinese","brand":"iphone-8"}}

Write to log file:

vim /opt/wzk/logs/event/test.log

The written content is as follows:

2020-08-02 18:20:11.877 [main] INFO icu.wzk.ecommerce.AppEvent - {"wzk_event":[{"name":"goods_detail_loading","json":{"entry":"1","goodsid":"0","loading_time":"93","action":"3","staytime":"56","showtype":"2"},"time":1596343881690},{"name":"loading","json":{"loading_time":"15","action":"3","loading_type":"3","type":"1"},"time":1596356988428},{"name":"notification","json":{"action":"1","type":"2"},"time":1596374167278},{"name":"favorites","json":{"course_id":1,"id":0,"userid":0},"time":1596350933962}],"attr":{"area":"长治","uid":"2F10092A4","app_v":"1.1.14","event_type":"common","device_id":"1FB872-9A1004","os_type":"0.5.0","channel":"QL","language":"chinese","brand":"xiaomi-0"}}

View Result

Console has output results, HDFS also output corresponding content.

Production Environment

In production environment, recommend using:

nohup flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console > dev/null 2>&1 &
  • nohup: This command allows user to exit account, close terminal, and corresponding process continues running
  • /dev/null: Represents Linux empty device file, all content written to this file is lost, also called black hole
  • Standard input 0, obtains input from keyboard /proc/self/fd/0
  • Standard output 1, outputs to screen (console) /proc/self/fd/1
  • Error output 2, outputs to screen (console) /proc/self/fd/2
  • /dev/null: Standard output 1 redirected to /dev/null, at this time standard output doesn’t exist, nowhere can find output content
  • 2>&1: Error output will output to same place as standard output
  • /dev/null 2>&1: Will not output any information to console, also won’t output any information to file

Error Quick Reference

SymptomRoot Cause AnalysisFix
HDFS path not partitioned by date, logtime empty or UnknownInterceptor didn’t write timestampStr correctly/exception caughtCheck Flume console, add logs in Interceptor, check if Header contains logtime; fix timestampStr assignment logic; don’t swallow key info in exceptions
Startup log branch always can’t get timestampStart branch only called getString but didn’t assign to timestampStrReview start branch code path in intercept; write startup timestamp to timestampStr
ArrayIndexOutOfBounds/JSON parsing failureAfter eventBody.split, fixed to take bodyArr[6], strongly coupled to log formatGrab one original log body, print split array length and content; use more robust extraction method
NumberFormatException (timestamp parsing failed)timestampStr empty/not pure number/field path inconsistentPrint timestampStr, JSON structure; confirm time field type; add null check and type compatibility
Date not as expected (cross timezone/cross day)ZoneId.systemDefault() affected by server timezoneCompare server timezone with business timezone; spot check against original timestamp; fix business timezone
Startup command parameters not taking effect/log level wrongJVM parameter spelling errorCompare with Flume official parameter names; observe if still outputs default level; correct JVM parameter key name
After nohup still has output or command reports file not foundRedirection path wrong (dev/null missing ”/“)Execute nohup command directly to observe error; change to >/dev/null 2>&1
Too many HDFS small files/rolling too frequentrollSize/rollCount/rollInterval configuration unreasonableCount HDFS directory file count and average size; set reasonable rolling threshold based on throughput
Memory spike/throughput dropMemory Channel or interceptor parsing overhead too largeObserve Flume JVM, GC, Channel backoff; adjust Channel type and capacity; optimize JSON parsing