TL;DR
- 场景:Flume TAILDIR 同时采集启动日志与事件日志,要求按日志内时间分区、按来源分目录落 HDFS。
- 结论:用自定义 Interceptor 抽取 logtime,并用 filegroups headers 或拦截器补齐 logtype,HDFS path 通过 %{header} 动态路由。
- 产出:一套可复用配置模板 + 常见坑位速查(类加载、占位符、滚动策略、权限与 positionFile)。
自定义拦截器
将打包的jar上传到Flume的lib目录:/opt/servers/apache-flume-1.9.0-bin/lib/
测试效果
创建配置文件并启动Flume agent,使用telnet测试数据传入。
采集启动日志(使用自定义拦截器)
配置文件
新建配置文件,配置TAILDIR source、自定义拦截器、memory channel和HDFS sink。
关键配置
- 给source增加自定义拦截器
- 去掉时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
- 根据header中的logtime写文件
采集启动日志和事件日志
本系统中要采集两种日志:启动日志和事件日志。不同的日志放置在不同的目录下,要想一次拿到全部日志需要监控多个目录。
总体思路
- taildir 监控多个目录
- 修改自定义拦截器,不同来源的数据加上不同标志
- HDFS、Sink 根据标志写文件
Agent介绍
Flume 是一个分布式、高可靠、可用来收集、聚合和传输大量日志数据的系统。
Agent的核心组成部分
Source (源)
Source 是数据流起点,负责接收外部数据。支持:Avro Source、Syslog Source、Exec Source、HTTP Source、Spooling Directory Source等。
Channel (通道)
Channel 是 Agent 的数据缓冲区域,用于在 Source 和 Sink 之间暂存事件。常见类型:Memory Channel、File Channel、Kafka Channel。
Sink (接收器)
Sink 是数据流的终点,负责将事件传递到下游存储或处理系统。支持:HDFS Sink、Kafka Sink、Elasticsearch Sink、Logger Sink、Avro Sink。
Agent配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
说明:
- filegroups:指定filegroups,可以有多个,以空格分割(taildir source可以同时监控多个目录中的文件)
- headers.filegroups.headerKey:给Event增加header Key,不同的filegroup,可配置不同的value。
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| Flume 启动直接报 “Unknown/Unrecognized option” 或参数不生效 | agent 启动参数写错 | 看 flume-ng agent 输出的 usage/Unknown option;用 --name a1;logger 用 -Dflume.root.logger=INFO,console |
| 启动报 ClassNotFound:自定义拦截器找不到 | jar 未放到 Flume classpath 或未包含依赖 | 确认 jar 是否在 $FLUME_HOME/lib;放入 /opt/servers/apache-flume-1.9.0-bin/lib/;优先使用 jar-with-dependencies |
Interceptor 生效但 %{logtime} 为空,HDFS 路径分区异常 | 未正确写 header(解析失败/字段缺失/正则不匹配) | 在 Sink 前加 LoggerSink 或 Debug 日志打印 headers;在拦截器里对缺失时间设置兜底 |
| HDFS 上迟迟不生成文件,或生成很慢 | rollInterval/idleTimeout/rollCount 配置导致”只按大小滚动”,低流量不触发 | 低流量场景设置 hdfs.rollInterval=60 或 hdfs.idleTimeout=60 |
| HDFS 写入报 Permission denied / lease 错误 | HDFS 目录权限/用户身份不匹配;Kerberos/代理用户未配置 | 给 Flume 运行用户授权目标目录;按集群启用 hdfs.kerberosPrincipal 等安全参数 |
| TAILDIR 不采集或只采到一次,重启后从头/不续采 | positionFile 路径不可写/格式损坏/多 agent 共用同一文件 | 检查 positionFile 是否更新;确保目录可写;每个 agent 独立 positionFile |
| 多目录采集时 logtype 丢失或混写 | headers 配置写错或拦截器覆盖 | 在 LoggerSink 输出 headers 对比 start/event;保证 filegroups = f1 f2 与 headers.f1.* / headers.f2.* 完全一致 |