本文是大数据系列第 18 篇,通过最简单的 Flume 案例演示完整的 Source→Channel→Sink 数据流管道。
案例目标
使用以下三个组件搭建最简单的 Flume Agent:
- Source:netcat source,监听 TCP 端口,接收通过 telnet 发送的文本
- Channel:memory channel,将 Event 缓存在 JVM 内存中
- Sink:logger sink,将 Event 内容打印到控制台
这是验证 Flume 安装是否正常的标准 Hello World 场景。
组件说明
| 组件 | 类型 | 特点 |
|---|---|---|
| netcat source | netcat | 监听 TCP 端口,每行输入作为一个 Event |
| memory channel | memory | 高性能,进程崩溃时数据可能丢失 |
| logger sink | logger | 输出到控制台日志,仅用于调试 |
配置文件
创建目录和配置文件 /opt/wzk/flume_test/flume-netcat-logger.conf:
mkdir -p /opt/wzk/flume_test
配置文件内容:
# 声明 Agent 的三个组件名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source 配置:监听 TCP 8888 端口
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 8888
# Channel 配置:内存缓冲
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
# Sink 配置:输出到控制台日志
a1.sinks.k1.type = logger
# 绑定关系:Source 连接 Channel,Sink 连接 Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
关键参数说明:
capacity:Channel 最多缓存的 Event 数量(10000)transactionCapacity:每次事务最多处理的 Event 数量(100)bind:netcat 监听的主机名或 IP
启动 Flume Agent
确认 8888 端口未被占用:
lsof -i:8888
启动 Agent,指定 Agent 名称和配置文件:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf-file /opt/wzk/flume_test/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console
参数说明:
--name a1:与配置文件中的 Agent 名称一致--conf-file:配置文件路径-Dflume.root.logger=INFO,console:将日志级别设为 INFO 并输出到控制台
发送测试数据
安装 telnet 客户端(如未安装):
sudo apt install telnet
在另一个终端连接到 Flume 监听端口:
telnet h122.wzk.icu 8888
连接成功后,输入任意文本并回车,例如:
hello flume
big data test
观察输出
回到 Flume 启动的终端,可以看到类似如下的日志输出:
INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 hello flume }
INFO sink.LoggerSink: Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 74 65 big data te }
Event body 同时显示十六进制和可读文本,说明数据已成功经过 Source → Channel → Sink 完整链路。
小结
这个案例验证了 Flume 的基本工作原理:Source 接收数据封装为 Event,Channel 负责缓冲,Sink 消费 Event 并写出。后续案例将把 Sink 替换为 HDFS Sink,实现真实的日志采集落盘。