本文是大数据系列第 18 篇,通过最简单的 Flume 案例演示完整的 Source→Channel→Sink 数据流管道。

完整图文版(含截图):CSDN 原文 | 掘金

案例目标

使用以下三个组件搭建最简单的 Flume Agent:

  • Source:netcat source,监听 TCP 端口,接收通过 telnet 发送的文本
  • Channel:memory channel,将 Event 缓存在 JVM 内存中
  • Sink:logger sink,将 Event 内容打印到控制台

这是验证 Flume 安装是否正常的标准 Hello World 场景。

组件说明

组件类型特点
netcat sourcenetcat监听 TCP 端口,每行输入作为一个 Event
memory channelmemory高性能,进程崩溃时数据可能丢失
logger sinklogger输出到控制台日志,仅用于调试

配置文件

创建目录和配置文件 /opt/wzk/flume_test/flume-netcat-logger.conf

mkdir -p /opt/wzk/flume_test

配置文件内容:

# 声明 Agent 的三个组件名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source 配置:监听 TCP 8888 端口
a1.sources.r1.type = netcat
a1.sources.r1.bind = h122.wzk.icu
a1.sources.r1.port = 8888

# Channel 配置:内存缓冲
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Sink 配置:输出到控制台日志
a1.sinks.k1.type = logger

# 绑定关系:Source 连接 Channel,Sink 连接 Channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

关键参数说明:

  • capacity:Channel 最多缓存的 Event 数量(10000)
  • transactionCapacity:每次事务最多处理的 Event 数量(100)
  • bind:netcat 监听的主机名或 IP

启动 Flume Agent

确认 8888 端口未被占用:

lsof -i:8888

启动 Agent,指定 Agent 名称和配置文件:

$FLUME_HOME/bin/flume-ng agent \
  --name a1 \
  --conf-file /opt/wzk/flume_test/flume-netcat-logger.conf \
  -Dflume.root.logger=INFO,console

参数说明:

  • --name a1:与配置文件中的 Agent 名称一致
  • --conf-file:配置文件路径
  • -Dflume.root.logger=INFO,console:将日志级别设为 INFO 并输出到控制台

发送测试数据

安装 telnet 客户端(如未安装):

sudo apt install telnet

在另一个终端连接到 Flume 监听端口:

telnet h122.wzk.icu 8888

连接成功后,输入任意文本并回车,例如:

hello flume
big data test

观察输出

回到 Flume 启动的终端,可以看到类似如下的日志输出:

INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65  hello flume }
INFO sink.LoggerSink: Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 74 65  big data te }

Event body 同时显示十六进制和可读文本,说明数据已成功经过 Source → Channel → Sink 完整链路。

小结

这个案例验证了 Flume 的基本工作原理:Source 接收数据封装为 Event,Channel 负责缓冲,Sink 消费 Event 并写出。后续案例将把 Sink 替换为 HDFS Sink,实现真实的日志采集落盘。