概述
本文演示一套从日志采集到会员指标分析的完整离线数仓链路,通过 Flume Taildir 监听本地日志目录,将数据写入 HDFS 分区目录,再由 Hive 外部表加载至 ODS,并进一步清洗落入 DWD,最终支撑活跃会员、新增会员、会员留存等指标计算。
数据流
数据采集 => ODS => DWD => DWS => ADS => MySQL
活跃会员、新增会员、会员留存
脚本执行顺序
# 加载 ODS / DWD 层采集
ods_load_startlog.sh
dwd_load_startlog.sh
# 活跃会员
dws_load_member_start.sh
ads_load_member_active.sh
# 新增会员
dws_load_member_add_day.sh
ads_load_member_add.sh
# 会员留存
dws_load_member_retention_day.sh
ads_load_member_retention.sh
Flume 配置
监听日志目录:/opt/wzk/logs/start/.*log
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.batchSize = 1000
启动命令:
flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console
HDFS 数据清理
hadoop fs -rm -r /user/data/logs/start/
hadoop fs -rm -r /user/data/logs/event/
hadoop fs -mkdir /user/data/logs/start
hadoop fs -mkdir /user/data/logs/event
ODS 层
定义
ODS(Operational Data Store)是操作型数据存储层,用于存放从业务系统中抽取的原始数据,是数仓的”数据输入口”。
功能
- 数据完整存储:保持数据原始性,便于追溯问题
- 解耦:实现业务系统与数据仓库之间的隔离
- 实时性支持:如果数据需要实时分析,ODS 可以支持数据的近实时同步
加载 ODS
sh ods_load_startlog.sh 2020-07-21
Hive 查询:
use ods;
select * from ods_start_log limit 3;
DWD 层
定义
DWD(Data Warehouse Detail)是明细数据层,存储经过清洗和轻度处理的宽表数据。
功能
- 数据细化:清洗、标准化后的数据,供后续主题建模使用
- 性能优化:减少后续查询时的数据清洗和转换开销
- 维度宽表化:对多表进行宽表关联,便于业务分析
数据处理
- 数据清洗:去除脏数据、空值处理、格式统一
- 数据整合:按照业务逻辑关联多表,增加计算字段
- 去重与规范化:保证数据唯一性
加载 DWD
sh dwd_load_startlog.sh 2020-07-21
Hive 查询:
use dwd;
select * from dwd.dwd_start_log limit 3;
错误速查
| 症状 | 根因 | 定位修复 |
|---|---|---|
| Flume 启动后无预期日志输出 | 启动参数拼写错误 | 检查 -Dflume.roog.logger 改为标准 logger 参数 |
| HDFS 清理后后续采集目录异常 | enent 与 event 拼写不一致 | 对比删除命令与创建命令统一为 event |
| Flume 监听不到文件 | 监听目录与复制目录不一致 | 检查 filegroups 正则与 cp 目标目录 |
| HDFS 有目录但 Hive 查不到数据 | 分区目录日期字段依赖拦截器 | 检查自定义拦截器 LogTypeInterceptor 输出 header |
| ODS 可查但 DWD 无数据 | DWD 脚本日期参数与 ODS 实际分区不匹配 | 核对脚本参数与 ODS 分区统一业务日期口径 |
| 日志文件复制后无新增消费 | Taildir positionFile 记录了旧偏移 | 检查 positionFile 测试前清理或重置 |
| HDFS 文件滚动异常 | rollCount/rollInterval/idleTimeout 均设为 0 | 检查 sink 滚动参数设置合理阈值 |
| 文章中的日期混乱 | 样例日志时间跨 2020/2021/2024 | 核对正文中的日期片段统一演示日期 |