本文是大数据系列第 20 篇,演示 Flume 复制模式下的双 Sink 架构——一份数据同时落盘 HDFS 和本地文件系统。
应用场景
生产环境中常需要将同一份日志数据写入多个目标:
- HDFS:供 Hive / Spark 进行离线分析和报表
- Kafka / 本地文件:供实时流处理或快速查询使用
Flume 通过 Replicating Channel Selector 将同一 Event 复制到多个 Channel,每个 Channel 对应不同的 Sink 目标,天然支持这种多路分发需求。
架构设计
本案例使用三个 Agent 级联:
[Agent 1] taildir source
↓ replicating channel selector
┌─────────────┐
↓ ↓
Channel c1 Channel c2
↓ ↓
Avro Sink Avro Sink
(port 9091) (port 9092)
↓ ↓
[Agent 2] [Agent 3]
HDFS Sink file_roll Sink
- Agent 1:负责日志采集,通过复制模式分发给两个下游 Agent
- Agent 2:接收 Agent 1 的数据,写入 HDFS(按时间分区)
- Agent 3:接收 Agent 1 的数据,写入本地文件(file_roll)
配置文件
Agent 1:采集 + 复制分发
文件:flume-taildir-avro.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# Source:监控日志文件变化
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# 复制模式:将每个 Event 同时写入 c1 和 c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
# Channel c1 → Sink k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
# Sink k1:Avro 转发给 Agent 2 (HDFS)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h121.wzk.icu
a1.sinks.k1.port = 9091
a1.sinks.k1.channel = c1
# Channel c2 → Sink k2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Sink k2:Avro 转发给 Agent 3 (本地文件)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h121.wzk.icu
a1.sinks.k2.port = 9092
a1.sinks.k2.channel = c2
Agent 2:接收并写入 HDFS
文件:flume-avro-hdfs.conf
a2.sources = r1
a2.channels = c1
a2.sinks = k1
a2.sources.r1.type = avro
a2.sources.r1.bind = h121.wzk.icu
a2.sources.r1.port = 9091
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h121.wzk.icu:9000/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.batchSize = 500
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.minBlockReplicas = 1
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
Agent 3:接收并写入本地文件
文件:flume-avro-file.conf
a3.sources = r1
a3.channels = c1
a3.sinks = k1
a3.sources.r1.type = avro
a3.sources.r1.bind = h121.wzk.icu
a3.sources.r1.port = 9092
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 500
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /root/flume/output
a3.sinks.k1.sink.rollInterval = 30
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
启动顺序
必须从下游到上游依次启动,确保下游接收端在上游连接前已就绪:
# 第一步:启动 Agent 3(本地文件 Sink)
$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file flume-avro-file.conf \
-Dflume.root.logger=INFO,console &
# 第二步:启动 Agent 2(HDFS Sink)
$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &
# 第三步:启动 Agent 1(采集 Source)
$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console
若先启动 Agent 1,Avro Sink 会因连接不到下游而报错。
验证结果
产生新日志后,分别检查两个目标:
# 查看 HDFS
hdfs dfs -ls /flume2/20240717/
# 查看本地文件
ls -la /root/flume/output/
两处均应有新文件生成,内容相同,即表示双 Sink 架构工作正常。
taildir Source vs exec Source
相比上一篇使用的 exec source,本例换用 taildir source,主要优势:
| 特性 | exec source | taildir source |
|---|---|---|
| 断点续传 | 不支持 | 支持(position 文件记录偏移) |
| 多文件监控 | 需多个 source | 单个 source 支持多文件/目录 |
| Agent 重启后 | 从头读取或丢失 | 从上次位置继续 |
生产环境推荐使用 taildir source。