TL;DR
- 场景:离线数仓采集日志写入 HDFS,Flume Agent 吞吐、稳定性与落盘分区时间准确性需要同时保证
- 结论:优先用批量参数与 Channel 容量/事务配合提升吞吐,用 JVM 堆参数解决 OOM,用拦截器把业务时间写入 header 修正分区路径
- 产出:可复用的 Flume 1.9.0 调优清单 + OOM 速修命令 + 自定义 Interceptor(logtime header)落地模板
Flume的优化配置
Flume 是一种分布式、可靠且高效的数据收集、聚合和传输系统,广泛应用于大数据生态系统中。为了提升 Flume 的性能和稳定性,优化配置至关重要。
批量处理
- 参数:batchSize
- 作用:控制 Flume 在批量传输时每次传输的事件数量。
- 配置建议:
- Source 到 Channel:根据 Source 的吞吐量和 Channel 的吞吐能力调整,推荐值为 100-1000
- Channel 到 Sink:根据 Sink 的处理能力和目标系统的写入性能调整,推荐值为 500-5000
压缩传输
- 参数:compressionType
- 作用:对事件进行压缩后传输,减少网络带宽消耗。
- 支持的压缩类型:gzip、snappy、lz4 等
- 配置建议:根据目标系统是否支持解压缩功能选择合适的压缩类型
Source 优化
Taildir Source
- 参数:batchSize 和 fileHeader
- batchSize:设置单次从文件中读取的事件数量
- fileHeader:是否在事件头部添加文件名,推荐开启以便于后续处理
Kafka Source
- 参数:kafka.consumer.timeout.ms 和 fetch.message.max.bytes
- kafka.consumer.timeout.ms:设置 Kafka 消费者读取数据的超时时间,通常为 100-500ms
- fetch.message.max.bytes:设置每次读取的最大消息大小,默认值通常为 1MB,可以根据业务场景适当调整
Channel 优化
Memory Channel
- 参数:capacity 和 transactionCapacity
- capacity:Channel 中允许的最大事件数
- transactionCapacity:单次事务中允许的最大事件数
File Channel
- 参数:checkpointDir 和 dataDirs
- checkpointDir:存储 Channel 状态的目录
- dataDirs:存储事件数据的目录,建议设置多个磁盘路径以提升 IO 性能
- 配置建议:确保磁盘 IO 性能足够,避免瓶颈
Flume报错解决
向 logs 目录中存放入日志文件,此时如果出现OOM的日志,是因为缺省情况下FlumeJVM的最大分配20M,这个值太小,需要调整。
解决方案:在 $FLUME_HOME/conf/flume-env.sh 中增加以下内容:
export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"
Flume内存参数设置及优化:
- 根据日志数据量大小,JVM堆一般要设置为4G或者更高
- -Xms -Xmx最好设置一致,减少内存抖动带来的性能影响
自定义拦截器
前面FlumeAgent的配置使用了本地时间,可能导致数据存放的路径不正确。要解决上面的问题就需要使用自定义拦截器。
自定义拦截器原理
自定义拦截器的原理:
- 自定义拦截器要集成 Flume 的 Interceptor
- Event 分为 header 和 body (接收的字符串)
- 获取 header 和 body
- 从 body 中 获取 time,并将时间戳转换为字符串 yyyy-MM-dd
- 将转换后的字符串放置到header中
自定义拦截器实现
自定义拦截器的实现:
- 获取event的header
- 获取event的body
- 解析body获取json串
- 解析json串获取时间戳
- 将时间戳转换为字符串 yyyy-MM-dd
- 将转换后的字符串放置header中
- 返回event
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.1.23</version>
</dependency>
</dependencies>
编写代码
package icu.wzk;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class CustomerInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 这里是逐条处理
String eventBody = new String(event.getBody(), StandardCharsets.UTF_8);
// 获取Event的Header
Map<String, String> headerMap = event.getHeaders();
// 解析Body获取JSON字符串
String[] bodyArr = eventBody.split("\\s+");
try {
String jsonStr = bodyArr[6];
// 解析JSON字符串获取时间戳
JSONObject jsonObject = JSON.parseObject(jsonStr);
String timestampStr = jsonObject.getJSONObject("app_active").getString("time");
// 将时间戳转换字符串 yyyy-MM-dd
// 将字符串转换为Long
long timestampLong = Long.parseLong(timestampStr);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
Instant instant = Instant.ofEpochMilli(timestampLong);
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String date = formatter.format(localDateTime);
// 将转换后的字符串放置header中
headerMap.put("logtime", date);
event.setHeaders(headerMap);
} catch (Exception e) {
headerMap.put("logtime", "Unknown");
event.setHeaders(headerMap);
}
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
List<Event> lstEvent = new ArrayList<>();
for (Event event : list) {
Event outEvent = intercept(event);
if (outEvent != null) {
lstEvent.add(outEvent);
}
}
return lstEvent;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomerInterceptor();
}
@Override
public void configure(Context context) {
}
}
}