TL;DR
- Scenario: a Flume TAILDIR source tails startup logs and event logs at the same time; data must land on HDFS partitioned by the timestamp inside each log line and routed to different directories by source.
- Conclusion: use a custom Interceptor to extract logtime, use filegroup headers (or the interceptor) to supply logtype, and use %{header} placeholders in the HDFS path for dynamic routing.
- Output: a reusable configuration template plus a quick reference of common pitfalls (class loading, placeholders, rolling strategy, permissions, and positionFile).
Custom Interceptor
Upload the packaged jar to Flume’s lib directory: /opt/servers/apache-flume-1.9.0-bin/lib/
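The article ships the interceptor only as a packaged jar; its core job here is to pull the event time out of the log body and put it into the headers as logtime. Below is a minimal, stdlib-only sketch of that parsing logic. The field name "time" and the 13-digit epoch-millis format are assumptions, not confirmed by the article; the real class would wrap this in Flume's Interceptor/Builder interfaces and call it from intercept():

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Core parsing logic a LogTypeInterceptor would call from intercept():
// extract the event time from the log body and format it as the value
// of the "logtime" header, which the HDFS sink consumes as dt=%{logtime}.
class LogTimeParser {

    // Assumption: each log line carries a 13-digit epoch-millis field
    // named "time"; adjust the pattern to the real log format.
    private static final Pattern TIME = Pattern.compile("\"time\"\\s*:\\s*(\\d{13})");

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());

    static String logtime(String body) {
        Matcher m = TIME.matcher(body);
        if (m.find()) {
            return DAY.format(Instant.ofEpochMilli(Long.parseLong(m.group(1))));
        }
        // Fallback for missing/unparseable time: use the current day so
        // %{logtime} is never empty and the HDFS path stays valid.
        return DAY.format(Instant.now());
    }
}
```

The fallback branch matters: if parsing fails and logtime is left empty, the HDFS sink path `dt=%{logtime}` degenerates into a broken partition directory (see the error quick reference below).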
Test Effect
Create a configuration file, start the Flume agent, and use telnet to send test data.
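A minimal smoke-test configuration for this step might look as follows (the port 44444 and file name are assumptions; a netcat source is used so telnet can feed lines in). Start it with `flume-ng agent --conf-file test.conf --name a1 -Dflume.root.logger=INFO,console`, then run `telnet localhost 44444` and type a sample log line:

```properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# netcat source so telnet can feed in test lines
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# custom interceptor under test
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder

a1.channels.c1.type = memory

# logger sink prints events (including headers) to the console
a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

The logger sink prints each event's headers, which is enough to confirm the interceptor is writing logtime correctly before wiring up HDFS.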
Collect Startup Logs (Using Custom Interceptor)
Configuration File
Create a new configuration file that wires together a TAILDIR source, the custom interceptor, a memory channel, and an HDFS sink.
Key Configuration
- Add the custom interceptor to the source
- Remove `a1.sinks.k1.hdfs.useLocalTimeStamp = true`, so the sink no longer partitions by the local timestamp
- Write files into partitions according to the logtime value in the event header
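In sink terms, the bullets above mean the HDFS path is driven by the header rather than the local clock; a sketch (the path follows the template later in the article):

```properties
# partition by the logtime header written by the interceptor,
# not by the agent's local time (do NOT set useLocalTimeStamp = true)
a1.sinks.k1.hdfs.path = /user/data/logs/start/dt=%{logtime}/
```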
Collect Startup Logs and Event Logs
The system collects two kinds of logs, startup logs and event logs, which land in different directories; to pick everything up in one pass, the source must monitor multiple directories at once.
Overall Approach
- The TAILDIR source monitors multiple directories
- The custom interceptor is extended to tag data from different sources with different markers
- The HDFS sink writes files to different paths according to those markers
Agent Introduction
Flume is a distributed, reliable system for collecting, aggregating and transmitting large amounts of log data.
Core Components of Agent
Source
Source is the starting point of data flow, responsible for receiving external data. Supports: Avro Source, Syslog Source, Exec Source, HTTP Source, Spooling Directory Source, etc.
Channel
Channel is the data buffer area of Agent, used to temporarily store events between Source and Sink. Common types: Memory Channel, File Channel, Kafka Channel.
Sink
Sink is the endpoint of data flow, responsible for passing events to downstream storage or processing system. Supports: HDFS Sink, Kafka Sink, Elasticsearch Sink, Logger Sink, Avro Sink.
Agent Configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# Configure file rolling method (file size 32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to hdfs
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Explanation:
- filegroups: declares the file groups; there can be multiple, separated by spaces (a TAILDIR source can monitor files in several directories at once)
- headers.<filegroupName>.<headerKey>: adds a header key/value to each Event; different filegroups can be given different values (here, logtype = start vs. event).
Error Quick Reference
| Symptom | Root Cause Analysis | Fix |
|---|---|---|
| Flume starts with an "Unknown/Unrecognized option" error, or parameters do not take effect | Wrong agent startup parameters | Check the flume-ng agent usage output for the unknown option; pass --name a1; for console logging use -Dflume.root.logger=INFO,console |
| Startup reports ClassNotFound for the custom interceptor | jar not on the Flume classpath, or dependencies missing | Confirm the jar is in $FLUME_HOME/lib (here /opt/servers/apache-flume-1.9.0-bin/lib/); prefer building a jar-with-dependencies |
| Interceptor works but %{logtime} is empty and the HDFS path partitioning is wrong | Header not written correctly (parsing failed / field missing / regex not matched) | Add a logger sink or debug logging before the sink to print headers; set a fallback in the interceptor for missing time |
| HDFS files not generated for a long time, or generated very slowly | With rollCount=0, rollInterval=0, and idleTimeout=0, only size-based rolling remains, and it is never triggered under low traffic | Set hdfs.rollInterval=60 or hdfs.idleTimeout=60 in low-traffic scenarios |
| HDFS writes report Permission denied / lease errors | HDFS directory permissions or user identity mismatch; Kerberos/proxy user not configured | Grant the Flume user access to the target directory; enable security parameters such as hdfs.kerberosPrincipal per the cluster setup |
| TAILDIR collects nothing, or collected once and then starts from the beginning after a restart instead of resuming | positionFile path not writable, corrupted, or shared by multiple agents | Check whether the positionFile is being updated; make sure its directory is writable; give each agent its own positionFile |
| When collecting multiple directories, logtype is lost or mixed up | headers configuration wrong, or the header overwritten by the interceptor | Compare the headers printed by a logger sink for start vs. event data; make sure filegroups = f1 f2 matches headers.f1.* / headers.f2.* exactly |
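Two of the fixes above rely on printing event headers. A throwaway debugging variant of the agent can swap the HDFS sink for a logger sink while keeping the source and channel from the template unchanged (a sketch):

```properties
# temporary debug sink: prints each event's headers and body to the console
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

Once the logtype and logtime headers look right in the console output, switch back to the HDFS sink.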