TL;DR

  • Scenario: Flume TAILDIR collects startup logs and event logs at the same time; data must be partitioned by the timestamp inside each log line and routed by source to different HDFS directories.
  • Conclusion: use a custom Interceptor to extract logtime, use filegroup headers (or the interceptor) to supply logtype, and use %{header} placeholders in the HDFS path for dynamic routing.
  • Output: a reusable configuration template plus a quick reference of common pitfalls (class loading, placeholders, rolling strategy, permissions, and the positionFile).

Custom Interceptor

Upload the packaged jar to Flume’s lib directory: /opt/servers/apache-flume-1.9.0-bin/lib/
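
The interceptor's job is to pull the event time out of each log line and expose it as a logtime header. The class name icu.wzk.LogTypeInterceptor appears in the configuration below, but the log layout and helper below are assumptions; this is a minimal, Flume-free sketch of the extraction logic only (in the real interceptor the result would go into event.getHeaders()):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogTimeExtractor {
    // Assumed log layout: the line starts with "yyyy-MM-dd HH:mm:ss".
    private static final Pattern TS =
            Pattern.compile("(\\d{4}-\\d{2}-\\d{2}) \\d{2}:\\d{2}:\\d{2}");

    /** Returns the date part for the dt= partition, or a fallback when parsing fails. */
    public static String extractLogTime(String body, String fallback) {
        Matcher m = TS.matcher(body);
        return m.find() ? m.group(1) : fallback;
    }

    public static void main(String[] args) {
        String line = "2023-07-21 10:15:32 INFO app start {...}";
        System.out.println(extractLogTime(line, "unknown"));      // 2023-07-21
        System.out.println(extractLogTime("garbled", "unknown")); // unknown
    }
}
```

The fallback matters: without it, events with an unparseable time end up in an empty dt= partition on HDFS.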

Test Effect

Create a configuration file, start the Flume agent, and use telnet to send test data through the pipeline.

Collect Startup Logs (Using Custom Interceptor)

Configuration File

Create a new configuration file that wires together a TAILDIR source, the custom interceptor, a memory channel, and an HDFS sink.

Key Configuration

  • Add the custom interceptor to the source
  • Do not use the local timestamp: drop a1.sinks.k1.hdfs.useLocalTimeStamp = true, so partitioning follows the log's own time rather than the collection time
  • Write files according to the logtime value in the event header

Collect Startup Logs and Event Logs

This system needs to collect two types of logs: startup logs and event logs. Because the two types are written to different directories, the source must monitor multiple directories to capture everything in one pass.

Overall Approach

  • taildir monitors multiple directories
  • Modify custom interceptor, add different markers to data from different sources
  • HDFS Sink writes files according to markers
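
Putting the three steps together: the filegroup sets the logtype marker, and the interceptor adds logtime. Treating the event headers as a plain map (an assumption made to keep this sketch free of Flume dependencies), the combined interceptor logic looks like:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogTypeInterceptorSketch {
    private static final Pattern TS = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");

    /**
     * Mirrors intercept(Event): the filegroup already put "logtype" into the
     * headers; here we only add "logtime" parsed from the event body.
     */
    public static Map<String, String> intercept(Map<String, String> headers, String body) {
        Matcher m = TS.matcher(body);
        headers.put("logtime", m.find() ? m.group() : "unknown");
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("logtype", "start"); // set by a1.sources.r1.headers.f1.logtype
        intercept(headers, "2023-07-21 10:15:32 start-log payload");
        System.out.println(headers.get("logtype") + " / " + headers.get("logtime")); // start / 2023-07-21
    }
}
```

Note that the interceptor must only add headers, never overwrite logtype; overwriting it is one of the failure modes listed in the quick reference below.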

Agent Introduction

Flume is a distributed, reliable system for collecting, aggregating and transmitting large amounts of log data.

Core Components of Agent

Source

Source is the starting point of the data flow and is responsible for receiving external data. Supported types include Avro Source, Syslog Source, Exec Source, HTTP Source, Spooling Directory Source, etc.

Channel

Channel is the Agent's data buffer, used to temporarily store events between Source and Sink. Common types: Memory Channel, File Channel, Kafka Channel.

Sink

Sink is the endpoint of the data flow, responsible for delivering events to a downstream storage or processing system. Supported types include HDFS Sink, Kafka Sink, Elasticsearch Sink, Logger Sink, Avro Sink.

Agent Configuration

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# Configure file rolling method (file size 32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to hdfs
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Explanation:

  • filegroups: declares the file groups; there can be several, separated by spaces (a TAILDIR source can monitor files in multiple directories at once)
  • headers.<filegroupName>.<headerKey>: adds a header key to every Event from that group; different filegroups can set different values (here, logtype = start vs. event).
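
The HDFS sink resolves the %{header} placeholders in hdfs.path per event, which is what lets one sink fan events out to different directories. The substitution is done inside Flume's HDFS sink, not in user code; the following is only a rough illustration of the mechanism:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HeaderPathDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("%\\{(\\w+)\\}");

    /** Replaces each %{key} in the path template with the matching header value. */
    public static String resolve(String template, Map<String, String> headers) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            m.appendReplacement(out, headers.getOrDefault(m.group(1), ""));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String path = resolve("/user/data/logs/%{logtype}/dt=%{logtime}/",
                Map.of("logtype", "start", "logtime", "2023-07-21"));
        System.out.println(path); // /user/data/logs/start/dt=2023-07-21/
    }
}
```

This also shows why a missing logtime header produces empty dt= partitions: the placeholder resolves to an empty string, which is the second symptom in the table below.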

Error Quick Reference

| Symptom | Root Cause Analysis | Fix |
|---|---|---|
| Flume starts with an "Unknown/Unrecognized option" error, or parameters do not take effect | Wrong agent startup parameters | Check the usage/Unknown option output of flume-ng agent; pass --name a1; for console logging use -Dflume.root.logger=INFO,console |
| Startup reports ClassNotFound for the custom interceptor | The jar is not on Flume's classpath, or its dependencies are missing | Confirm the jar is in $FLUME_HOME/lib, i.e. /opt/servers/apache-flume-1.9.0-bin/lib/; prefer a jar-with-dependencies build |
| The interceptor runs but %{logtime} is empty and the HDFS path partitioning is abnormal | The header was not written correctly (parsing failed / field missing / regex did not match) | Add a Logger Sink or debug logging before the sink to print headers; set a fallback value in the interceptor when the time is missing |
| HDFS files are not generated for a long time, or appear very slowly | rollInterval/idleTimeout/rollCount are configured so that only size-based rolling applies, which never triggers under low traffic | For low-traffic scenarios set hdfs.rollInterval=60 or hdfs.idleTimeout=60 |
| HDFS writes report Permission denied or lease errors | HDFS directory permission / user identity mismatch; Kerberos or proxy user not configured | Grant the Flume user access to the target directory; enable security parameters such as hdfs.kerberosPrincipal as required by the cluster |
| TAILDIR does not collect, collects only once, or restarts from the beginning after a restart | positionFile path not writable, corrupted, or shared by multiple agents | Check that the positionFile is updating; make its directory writable; give each agent its own positionFile |
| When monitoring multiple directories, logtype is lost or mixed up | headers configuration wrong, or the interceptor overwrites it | Compare the headers printed by a Logger Sink for start vs. event; make sure filegroups = f1 f2 matches headers.f1.* / headers.f2.* exactly |