本文是大数据系列第 19 篇,演示如何用 Flume 实时采集 Hive 运行日志并按时间分区写入 HDFS。

完整图文版(含截图):CSDN 原文 | 掘金

案例目标

将 Hive 运行时产生的日志文件实时采集到 HDFS,路径按日期和小时分区,方便后续用 Hive SQL 进行日志分析:

/flume/20240717/1430/logs-xxxxx

使用的组件:

  • Source:exec source,执行 tail -F 命令实时追踪日志文件
  • Channel:memory channel,高性能内存缓冲
  • Sink:HDFS sink,按时间分区写入 HDFS

配置文件

创建配置文件 /opt/wzk/flume_test/flume-exec-hdfs.conf

# Agent 组件声明
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Source:实时追踪 Hive 日志
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /tmp/root/hive.log

# Channel:内存缓冲
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 500

# Sink:写入 HDFS,按分钟分区
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://h121.wzk.icu:9000/flume/%Y%m%d/%H%M
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.batchSize = 500
a2.sinks.k2.hdfs.rollInterval = 60
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1

# 绑定
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

关键参数详解

HDFS 路径中的时间占位符

%Y%m%d/%H%M 会被替换为实际时间,如 20240717/1430。注意:需要在 Event 的 header 中有 timestamp 字段,或者在 Source 上配置 TimestampInterceptor。

若不想依赖 Event header,可以开启本地时间戳:

a2.sinks.k2.hdfs.useLocalTimeStamp = true

文件滚动策略

HDFS Sink 通过以下三个参数控制何时关闭当前文件、开启新文件:

参数说明
rollInterval60每 60 秒滚动一次(0 表示不按时间滚动)
rollSize134217700文件超过 128MB 时滚动(0 表示不按大小滚动)
rollCount0不按 Event 数量滚动

三个条件满足任一即触发滚动。本例采用纯按时间滚动策略。

副本数控制

minBlockReplicas = 1 设置 HDFS 写入时的最小副本数为 1(默认值跟随 HDFS 集群配置),在测试环境中减少写入等待时间。

batchSize

Source 每次从 Channel 取出并写入 HDFS 的 Event 数量,值越大吞吐量越高,但内存占用也越大。

启动采集任务

先确认 Hive 日志路径存在(执行一条 HQL 后会自动生成):

ls /tmp/root/hive.log

启动 Flume Agent:

$FLUME_HOME/bin/flume-ng agent \
  --name a2 \
  --conf-file /opt/wzk/flume_test/flume-exec-hdfs.conf \
  -Dflume.root.logger=INFO,console

验证结果

在 Hive 中执行几条 SQL 产生日志,然后查看 HDFS 上的文件:

hdfs dfs -ls /flume/
hdfs dfs -ls /flume/20240717/

等待滚动周期(60 秒)后,文件名从 .tmp 后缀变为正式文件,表示写入完成:

/flume/20240717/1430/logs-h121.wzk.icu-1721214600000.1721214660000

hdfs dfs -cat 可以查看文件内容,应与 Hive 日志一致。