TL;DR

  • 场景:Flume TAILDIR 同时采集启动日志与事件日志,要求按日志内时间分区、按来源分目录落 HDFS。
  • 结论:用自定义 Interceptor 抽取 logtime,并用 filegroups headers 或拦截器补齐 logtype,HDFS path 通过 %{header} 动态路由。
  • 产出:一套可复用配置模板 + 常见坑位速查(类加载、占位符、滚动策略、权限与 positionFile)。

自定义拦截器

将打包的jar上传到Flume的lib目录:/opt/servers/apache-flume-1.9.0-bin/lib/

测试效果

创建配置文件并启动Flume agent,使用telnet测试数据传入。

采集启动日志(使用自定义拦截器)

配置文件

新建配置文件,配置TAILDIR source、自定义拦截器、memory channel和HDFS sink。

关键配置

  • 给source增加自定义拦截器
  • 去掉时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true
  • 根据header中的logtime写文件

采集启动日志和事件日志

本系统中要采集两种日志:启动日志和事件日志。不同的日志放置在不同的目录下,要想一次拿到全部日志需要监控多个目录。

总体思路

  • taildir 监控多个目录
  • 修改自定义拦截器,不同来源的数据加上不同标志
  • HDFS、Sink 根据标志写文件

Agent介绍

Flume 是一个分布式、高可靠、可用来收集、聚合和传输大量日志数据的系统。

Agent的核心组成部分

Source (源)

Source 是数据流起点,负责接收外部数据。支持:Avro Source、Syslog Source、Exec Source、HTTP Source、Spooling Directory Source等。

Channel (通道)

Channel 是 Agent 的数据缓冲区域,用于在 Source 和 Sink 之间暂存事件。常见类型:Memory Channel、File Channel、Kafka Channel。

Sink (接收器)

Sink 是数据流的终点,负责将事件传递到下游存储或处理系统。支持:HDFS Sink、Kafka Sink、Elasticsearch Sink、Logger Sink、Avro Sink。

Agent配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

说明

  • filegroups:指定filegroups,可以有多个,以空格分割(taildir source可以同时监控多个目录中的文件)
  • headers.filegroups.headerKey:给Event增加header Key,不同的filegroup,可配置不同的value。

错误速查

症状根因定位修复
Flume 启动直接报 “Unknown/Unrecognized option” 或参数不生效agent 启动参数写错看 flume-ng agent 输出的 usage/Unknown option;用 --name a1;logger 用 -Dflume.root.logger=INFO,console
启动报 ClassNotFound:自定义拦截器找不到jar 未放到 Flume classpath 或未包含依赖确认 jar 是否在 $FLUME_HOME/lib;放入 /opt/servers/apache-flume-1.9.0-bin/lib/;优先使用 jar-with-dependencies
Interceptor 生效但 %{logtime} 为空,HDFS 路径分区异常未正确写 header(解析失败/字段缺失/正则不匹配)在 Sink 前加 LoggerSink 或 Debug 日志打印 headers;在拦截器里对缺失时间设置兜底
HDFS 上迟迟不生成文件,或生成很慢rollInterval/idleTimeout/rollCount 配置导致”只按大小滚动”,低流量不触发低流量场景设置 hdfs.rollInterval=60hdfs.idleTimeout=60
HDFS 写入报 Permission denied / lease 错误HDFS 目录权限/用户身份不匹配;Kerberos/代理用户未配置给 Flume 运行用户授权目标目录;按集群启用 hdfs.kerberosPrincipal 等安全参数
TAILDIR 不采集或只采到一次,重启后从头/不续采positionFile 路径不可写/格式损坏/多 agent 共用同一文件检查 positionFile 是否更新;确保目录可写;每个 agent 独立 positionFile
多目录采集时 logtype 丢失或混写headers 配置写错或拦截器覆盖在 LoggerSink 输出 headers 对比 start/event;保证 filegroups = f1 f2 与 headers.f1.* / headers.f2.* 完全一致