本文是大数据系列第 20 篇,演示 Flume 复制模式下的双 Sink 架构——一份数据同时落盘 HDFS 和本地文件系统。

完整图文版(含截图):CSDN 原文 | 掘金

应用场景

生产环境中常需要将同一份日志数据写入多个目标:

  • HDFS:供 Hive / Spark 进行离线分析和报表
  • Kafka / 本地文件:供实时流处理或快速查询使用

Flume 通过 Replicating Channel Selector 将同一 Event 复制到多个 Channel,每个 Channel 对应不同的 Sink 目标,天然支持这种多路分发需求。

架构设计

本案例使用三个 Agent 级联:

[Agent 1] taildir source
    ↓ replicating channel selector
  ┌─────────────┐
  ↓             ↓
Channel c1    Channel c2
  ↓             ↓
Avro Sink     Avro Sink
(port 9091)   (port 9092)
  ↓             ↓
[Agent 2]    [Agent 3]
HDFS Sink   file_roll Sink
  • Agent 1:负责日志采集,通过复制模式分发给两个下游 Agent
  • Agent 2:接收 Agent 1 的数据,写入 HDFS(按时间分区)
  • Agent 3:接收 Agent 1 的数据,写入本地文件(file_roll)

配置文件

Agent 1:采集 + 复制分发

文件:flume-taildir-avro.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source:监控日志文件变化
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/root/.*log

# 复制模式:将每个 Event 同时写入 c1 和 c2
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Channel c1 → Sink k1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500

# Sink k1:Avro 转发给 Agent 2 (HDFS)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h121.wzk.icu
a1.sinks.k1.port = 9091
a1.sinks.k1.channel = c1

# Channel c2 → Sink k2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500

# Sink k2:Avro 转发给 Agent 3 (本地文件)
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = h121.wzk.icu
a1.sinks.k2.port = 9092
a1.sinks.k2.channel = c2

Agent 2:接收并写入 HDFS

文件:flume-avro-hdfs.conf

a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = h121.wzk.icu
a2.sources.r1.port = 9091

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500

a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://h121.wzk.icu:9000/flume2/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.batchSize = 500
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.minBlockReplicas = 1

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

Agent 3:接收并写入本地文件

文件:flume-avro-file.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1

a3.sources.r1.type = avro
a3.sources.r1.bind = h121.wzk.icu
a3.sources.r1.port = 9092

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 500

a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /root/flume/output
a3.sinks.k1.sink.rollInterval = 30

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

启动顺序

必须从下游到上游依次启动,确保下游接收端在上游连接前已就绪:

# 第一步:启动 Agent 3(本地文件 Sink)
$FLUME_HOME/bin/flume-ng agent --name a3 \
  --conf-file flume-avro-file.conf \
  -Dflume.root.logger=INFO,console &

# 第二步:启动 Agent 2(HDFS Sink)
$FLUME_HOME/bin/flume-ng agent --name a2 \
  --conf-file flume-avro-hdfs.conf \
  -Dflume.root.logger=INFO,console &

# 第三步:启动 Agent 1(采集 Source)
$FLUME_HOME/bin/flume-ng agent --name a1 \
  --conf-file flume-taildir-avro.conf \
  -Dflume.root.logger=INFO,console

若先启动 Agent 1,Avro Sink 会因连接不到下游而报错。

验证结果

产生新日志后,分别检查两个目标:

# 查看 HDFS
hdfs dfs -ls /flume2/20240717/

# 查看本地文件
ls -la /root/flume/output/

两处均应有新文件生成,内容相同,即表示双 Sink 架构工作正常。

taildir Source vs exec Source

相比上一篇使用的 exec source,本例换用 taildir source,主要优势:

特性exec sourcetaildir source
断点续传不支持支持(position 文件记录偏移)
多文件监控需多个 source单个 source 支持多文件/目录
Agent 重启后从头读取或丢失从上次位置继续

生产环境推荐使用 taildir source