This is article 20 in the Big Data series. It demonstrates Flume's replicating mode with a dual-Sink architecture: the same data is written to both HDFS and the local filesystem.
Complete illustrated version: CSDN Original | Juejin
Application Scenario
Production environments often need to write the same log data to multiple targets:
- HDFS: For Hive / Spark offline analysis and reporting
- Kafka / Local Files: For real-time stream processing or quick queries
Flume's Replicating Channel Selector copies each Event to multiple Channels, and each Channel feeds a different Sink, which naturally supports this multi-path distribution requirement.
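A variation worth knowing: with the replicating selector, any channel listed under `selector.optional` is written on a best-effort basis, so a failure there does not fail the Source's transaction. A minimal sketch (channel names match the config used later in this article):

```properties
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# c2 is optional: if it is full or failing, the Event is still
# committed as long as the required channel c1 accepts it
a1.sources.r1.selector.optional = c2
```

This is useful when one branch (e.g. the local-file copy) is less critical than the other.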
Architecture Design
This example cascades three Agents:
```
[Agent 1] taildir source
     ↓ replicating channel selector
   ┌──────────────┐
   ↓              ↓
Channel c1    Channel c2
   ↓              ↓
Avro Sink     Avro Sink
(port 9091)   (port 9092)
   ↓              ↓
[Agent 2]     [Agent 3]
HDFS Sink     file_roll Sink
```
- Agent 1: collects the logs and fans them out to the two downstream Agents via the replicating selector
- Agent 2: receives Agent 1's data and writes it to HDFS (partitioned by time)
- Agent 3: receives Agent 1's data and writes it to local files (file_roll)
Configuration Files
Agent 1: Collection + Replication Distribution
File: flume-taildir-avro.conf
```properties
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source: monitor log file changes (the type alias must be uppercase TAILDIR)
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/root/.*log

# Replication mode: write each Event to both c1 and c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Channel c1 → Sink k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500

# Sink k1: Avro forward to Agent 2 (HDFS)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h121.wzk.icu
a1.sinks.k1.port = 9091
a1.sinks.k1.channel = c1

# Channel c2 → Sink k2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500

# Sink k2: Avro forward to Agent 3 (local file)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h121.wzk.icu
a1.sinks.k2.port = 9092
a1.sinks.k2.channel = c2
```
Agent 2: Receive and Write to HDFS
File: flume-avro-hdfs.conf
```properties
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Source: receive Avro events from Agent 1
a2.sources.r1.type = avro
a2.sources.r1.bind = h121.wzk.icu
a2.sources.r1.port = 9091

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500

# Sink: write to HDFS, partitioned by date and hour
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h121.wzk.icu:9000/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.batchSize = 500
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.minBlockReplicas = 1

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
```
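The seemingly arbitrary `rollSize = 134217700` is chosen to sit just under one default 128 MiB HDFS block, so each rolled file fits in a single block:

```shell
# Default HDFS block size: 128 MiB = 134217728 bytes.
# rollSize = 134217700 leaves a small safety margin below it.
echo $((134217728 - 134217700))   # headroom in bytes → 28
```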
Agent 3: Receive and Write to Local File
File: flume-avro-file.conf
```properties
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Source: receive Avro events from Agent 1
a3.sources.r1.type = avro
a3.sources.r1.bind = h121.wzk.icu
a3.sources.r1.port = 9092

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 500

# Sink: roll a new local file every 30 seconds
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /root/flume/output
a3.sinks.k1.sink.rollInterval = 30

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
```
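One pitfall: the file_roll sink expects `sink.directory` to already exist; it does not create it for you. Create the directory before starting Agent 3:

```shell
# file_roll writes into an existing directory; create it up front
mkdir -p /root/flume/output
```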
Start Order
Start from downstream to upstream, so that each downstream receiver is listening before its upstream Avro Sink tries to connect:
```shell
# Step 1: Start Agent 3 (local file Sink)
$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf \
  --name a3 \
  --conf-file flume-avro-file.conf \
  -Dflume.root.logger=INFO,console &

# Step 2: Start Agent 2 (HDFS Sink)
$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf \
  --name a2 \
  --conf-file flume-avro-hdfs.conf \
  -Dflume.root.logger=INFO,console &

# Step 3: Start Agent 1 (collection Source)
$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf \
  --name a1 \
  --conf-file flume-taildir-avro.conf \
  -Dflume.root.logger=INFO,console
```
If Agent 1 is started first, its Avro Sinks will repeatedly log connection errors because the downstream Avro Sources are not yet listening.
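If you script the startup, you can poll the downstream ports before launching Agent 1. A sketch using bash's built-in `/dev/tcp` redirection; the helper name `wait_for_port` and the host/ports are assumptions matching the configs above:

```shell
# Hypothetical helper: poll a TCP port until it accepts connections,
# retrying once per second up to a limit; returns non-zero on timeout.
wait_for_port() {
  local host=$1 port=$2 retries=${3:-30}
  local i
  for ((i = 0; i < retries; i++)); do
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      exec 3>&- 3<&-   # close the probe socket
      return 0
    fi
    sleep 1
  done
  return 1
}

# Usage: block until both downstream Avro Sources are up, then start Agent 1:
#   wait_for_port h121.wzk.icu 9092 && wait_for_port h121.wzk.icu 9091
```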
Verify Results
After generating new logs, check both targets:
```shell
# Check HDFS (date directory follows the %Y%m%d pattern)
hdfs dfs -ls /flume2/20240717/

# Check local file
ls -la /root/flume/output/
```
Both locations should contain new files with identical content, confirming that the dual-Sink replication works.
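If you need sample input for this verification, append a few lines to a file matching the taildir pattern `/tmp/root/.*log`; the file name `flume-test.log` here is an arbitrary choice:

```shell
# Generate test events for the taildir source to pick up
mkdir -p /tmp/root
for i in 1 2 3; do
  echo "test event $i at $(date '+%F %T')" >> /tmp/root/flume-test.log
done
```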
taildir Source vs exec Source
Compared with the exec source used in the previous article, this example uses the taildir source. Its main advantages:
| Feature | exec source | taildir source |
|---|---|---|
| Resume from checkpoint | Not supported | Supported (position file records offset) |
| Multi-file monitoring | Need multiple sources | Single source supports multiple files/dirs |
| After Agent restart | Read from beginning or lose | Continue from last position |
taildir source is recommended for production environments.
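For reference, the position file maintained by the taildir source is a JSON array recording the inode, byte offset, and path of each tracked file, which is how it resumes after a restart. The values below are illustrative:

```json
[{"inode": 2496001, "pos": 1024, "file": "/tmp/root/flume-test.log"}]
```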