本文是大数据系列第 19 篇,演示如何用 Flume 实时采集 Hive 运行日志并按时间分区写入 HDFS。
案例目标
将 Hive 运行时产生的日志文件实时采集到 HDFS,路径按日期和小时分区,方便后续用 Hive SQL 进行日志分析:
/flume/20240717/1430/logs-xxxxx
使用的组件:
- Source:exec source,执行
tail -F命令实时追踪日志文件 - Channel:memory channel,高性能内存缓冲
- Sink:HDFS sink,按时间分区写入 HDFS
配置文件
创建配置文件 /opt/wzk/flume_test/flume-exec-hdfs.conf:
# Agent 组件声明
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Source:实时追踪 Hive 日志
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /tmp/root/hive.log
# Channel:内存缓冲
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 500
# Sink:写入 HDFS,按分钟分区
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://h121.wzk.icu:9000/flume/%Y%m%d/%H%M
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.batchSize = 500
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# 绑定
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
关键参数详解
HDFS 路径中的时间占位符
%Y%m%d/%H%M 会被替换为实际时间,如 20240717/1430。注意:需要在 Event 的 header 中有 timestamp 字段,或者在 Source 上配置 TimestampInterceptor。
若不想依赖 Event header,可以开启本地时间戳:
a2.sinks.k2.hdfs.useLocalTimeStamp = true
文件滚动策略
HDFS Sink 通过以下三个参数控制何时关闭当前文件、开启新文件:
| 参数 | 值 | 说明 |
|---|---|---|
rollInterval | 60 | 每 60 秒滚动一次(0 表示不按时间滚动) |
rollSize | 134217700 | 文件超过 128MB 时滚动(0 表示不按大小滚动) |
rollCount | 0 | 不按 Event 数量滚动 |
三个条件满足任一即触发滚动。本例采用纯按时间滚动策略。
副本数控制
minBlockReplicas = 1 设置 HDFS 写入时的最小副本数为 1(默认值跟随 HDFS 集群配置),在测试环境中减少写入等待时间。
batchSize
Source 每次从 Channel 取出并写入 HDFS 的 Event 数量,值越大吞吐量越高,但内存占用也越大。
启动采集任务
先确认 Hive 日志路径存在(执行一条 HQL 后会自动生成):
ls /tmp/root/hive.log
启动 Flume Agent:
$FLUME_HOME/bin/flume-ng agent \
--name a2 \
--conf-file /opt/wzk/flume_test/flume-exec-hdfs.conf \
-Dflume.root.logger=INFO,console
验证结果
在 Hive 中执行几条 SQL 产生日志,然后查看 HDFS 上的文件:
hdfs dfs -ls /flume/
hdfs dfs -ls /flume/20240717/
等待滚动周期(60 秒)后,文件名从 .tmp 后缀变为正式文件,表示写入完成:
/flume/20240717/1430/logs-h121.wzk.icu-1721214600000.1721214660000
用 hdfs dfs -cat 可以查看文件内容,应与 Hive 日志一致。