This is article 20 in the Big Data series. It demonstrates Flume's replicating mode with a dual-Sink architecture: the same data is written to both HDFS and the local filesystem.

Complete illustrated version: CSDN Original | Juejin

Application Scenario

Production environments often need to write the same log data to multiple targets:

  • HDFS: For Hive / Spark offline analysis and reporting
  • Kafka / Local Files: For real-time stream processing or quick queries

Flume's Replicating Channel Selector copies each Event to multiple Channels, and each Channel feeds a different Sink, which naturally supports this multi-path distribution requirement.
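For reference, the core selector lines look like this (replicating is in fact the default selector type, so the explicit setting is optional). The `selector.optional` property shown at the end is a standard feature of the replicating selector that makes delivery to the listed channels best-effort; whether you want it depends on how critical each copy is:

```
# Replicating selector: every Event goes to every listed channel
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Optional: failures writing to c2 no longer fail the whole transaction
# (useful if the local-file copy is non-critical)
a1.sources.r1.selector.optional = c2
```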

Architecture Design

This example cascades three Agents:

[Agent 1] taildir source
    ↓ replicating channel selector
  ┌─────────────┐
  ↓             ↓
Channel c1    Channel c2
  ↓             ↓
Avro Sink     Avro Sink
(port 9091)   (port 9092)
  ↓             ↓
[Agent 2]    [Agent 3]
HDFS Sink   file_roll Sink
  • Agent 1: collects the logs and fans them out to both downstream Agents via the replicating selector
  • Agent 2: receives Agent 1's data and writes it to HDFS (partitioned by time)
  • Agent 3: receives Agent 1's data and writes it to a local file (file_roll)

Configuration Files

Agent 1: Collection + Replication Distribution

File: flume-taildir-avro.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source: monitor log file changes
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/root/.*log

# Replication mode: write each Event to both c1 and c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Channel c1 → Sink k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500

# Sink k1: Avro forward to Agent 2 (HDFS)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h121.wzk.icu
a1.sinks.k1.port = 9091
a1.sinks.k1.channel = c1

# Channel c2 → Sink k2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500

# Sink k2: Avro forward to Agent 3 (Local file)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h121.wzk.icu
a1.sinks.k2.port = 9092
a1.sinks.k2.channel = c2

Agent 2: Receive and Write to HDFS

File: flume-avro-hdfs.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = h121.wzk.icu
a2.sources.r1.port = 9091

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h121.wzk.icu:9000/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.batchSize = 500
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.minBlockReplicas = 1

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Agent 3: Receive and Write to Local File

File: flume-avro-file.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = h121.wzk.icu
a3.sources.r1.port = 9092

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 500

a3.sinks.k1.type = file_roll
# Note: file_roll does not create this directory; create it beforehand
# (mkdir -p /root/flume/output), otherwise the sink fails at startup
a3.sinks.k1.sink.directory = /root/flume/output
a3.sinks.k1.sink.rollInterval = 30

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Start Order

Start from downstream to upstream, so that each downstream receiver is listening before the upstream Agent tries to connect:

# Step 1: Start Agent 3 (local file Sink)
$FLUME_HOME/bin/flume-ng agent --name a3 \
  --conf-file flume-avro-file.conf \
  -Dflume.root.logger=INFO,console &

# Step 2: Start Agent 2 (HDFS Sink)
$FLUME_HOME/bin/flume-ng agent --name a2 \
  --conf-file flume-avro-hdfs.conf \
  -Dflume.root.logger=INFO,console &

# Step 3: Start Agent 1 (collection Source)
$FLUME_HOME/bin/flume-ng agent --name a1 \
  --conf-file flume-taildir-avro.conf \
  -Dflume.root.logger=INFO,console

If Agent 1 is started first, its Avro Sinks will log connection errors because the downstream Avro Sources are not yet listening.
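Before launching Agent 1, you can sketch a quick readiness check. The hostname and ports below match this article's configs (h121.wzk.icu, 9091/9092); adjust them for your environment. This uses bash's built-in /dev/tcp redirection, so no extra tools are needed:

```shell
# check_port: returns success if a TCP connection to host:port succeeds
check_port() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Both downstream Avro sources must be listening before Agent 1 starts
for port in 9091 9092; do
  if check_port h121.wzk.icu "$port"; then
    echo "port $port ready"
  else
    echo "port $port not listening - start the downstream agent first"
  fi
done
```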

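Once all three Agents are up, append a line that matches the taildir pattern (`/tmp/root/.*log` in Agent 1's config) to push an Event through the pipeline; the filename here is just an example:

```shell
# Create the monitored directory and append a test event
mkdir -p /tmp/root
echo "test event $(date +%s)" >> /tmp/root/replication-test.log
# Show what was written
tail -n 1 /tmp/root/replication-test.log
```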
Verify Results

After generating new logs, check both targets:

# Check HDFS
hdfs dfs -ls /flume2/20240717/

# Check local file
ls -la /root/flume/output/

Both locations should contain new files with identical content, confirming the dual-Sink architecture works.
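Beyond listing files, you can compare checksums of the two copies. In the real pipeline that means piping `hdfs dfs -cat /flume2/<date>/<hour>/*` and `cat /root/flume/output/*` into md5sum and comparing the results. The check itself, demonstrated below on two stand-in local files (the HDFS paths above depend on your run):

```shell
# Stand-ins for the HDFS copy and the local copy of the same events
printf 'event-1\nevent-2\n' > /tmp/copy_hdfs.txt
printf 'event-1\nevent-2\n' > /tmp/copy_local.txt

# Identical content must yield identical checksums
sum_a=$(md5sum < /tmp/copy_hdfs.txt | awk '{print $1}')
sum_b=$(md5sum < /tmp/copy_local.txt | awk '{print $1}')
if [ "$sum_a" = "$sum_b" ]; then echo "replicas match"; else echo "replicas differ"; fi
```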

taildir Source vs exec Source

Compared with the exec source used in the previous article, this example uses the taildir source. Its main advantages:

Feature                 | exec source                        | taildir source
------------------------|------------------------------------|-------------------------------------------
Resume from checkpoint  | Not supported                      | Supported (position file records offsets)
Multi-file monitoring   | Needs multiple sources             | One source handles multiple files/dirs
After Agent restart     | Re-reads from start or loses data  | Continues from the last recorded position

The taildir source is recommended for production environments.
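The resume-from-checkpoint behavior comes from the position file set via `positionFile` in Agent 1's config. It is a JSON array with one entry per tracked file; an illustrative entry (the inode and offset values here are made up) looks like:

```json
[{"inode": 2496275, "pos": 14, "file": "/tmp/root/replication-test.log"}]
```

On restart, the taildir source looks up each file's recorded `pos` and continues reading from that byte offset instead of from the beginning.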