概述

本文演示一套从日志采集到会员指标分析的完整离线数仓链路,通过 Flume Taildir 监听本地日志目录,将数据写入 HDFS 分区目录,再由 Hive 外部表加载至 ODS,并进一步清洗落入 DWD,最终支撑活跃会员、新增会员、会员留存等指标计算。

数据流

数据采集 => ODS => DWD => DWS => ADS => MySQL
活跃会员、新增会员、会员留存

脚本执行顺序

# 加载 ODS / DWD 层采集
ods_load_startlog.sh
dwd_load_startlog.sh
# 活跃会员
dws_load_member_start.sh
ads_load_member_active.sh
# 新增会员
dws_load_member_add_day.sh
ads_load_member_add.sh
# 会员留存
dws_load_member_retention_day.sh
ads_load_member_retention.sh

Flume 配置

监听日志目录:/opt/wzk/logs/start/.*log

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event

# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.batchSize = 1000

启动命令:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf -name a1 -Dflume.roog.logger=INFO,console

HDFS 数据清理

hadoop fs -rm -r /user/data/logs/start/
hadoop fs -rm -r /user/data/logs/event/

hadoop fs -mkdir /user/data/logs/start
hadoop fs -mkdir /user/data/logs/event

ODS 层

定义

ODS(Operational Data Store)是操作型数据存储层,用于存放从业务系统中抽取的原始数据,是数仓的”数据输入口”。

功能

  • 数据完整存储:保持数据原始性,便于追溯问题
  • 解耦:实现业务系统与数据仓库之间的隔离
  • 实时性支持:如果数据需要实时分析,ODS 可以支持数据的近实时同步

加载 ODS

sh ods_load_startlog.sh 2020-07-21

Hive 查询:

use ods;
select * from ods_start_log limit 3;

DWD 层

定义

DWD(Data Warehouse Detail)是明细数据层,存储经过清洗和轻度处理的宽表数据。

功能

  • 数据细化:清洗、标准化后的数据,供后续主题建模使用
  • 性能优化:减少后续查询时的数据清洗和转换开销
  • 维度宽表化:对多表进行宽表关联,便于业务分析

数据处理

  • 数据清洗:去除脏数据、空值处理、格式统一
  • 数据整合:按照业务逻辑关联多表,增加计算字段
  • 去重与规范化:保证数据唯一性

加载 DWD

sh dwd_load_startlog.sh 2020-07-21

Hive 查询:

use dwd;
select * from dwd.dwd_start_log limit 3;

错误速查

症状根因定位修复
Flume 启动后无预期日志输出启动参数拼写错误检查 -Dflume.roog.logger 改为标准 logger 参数
HDFS 清理后后续采集目录异常enent 与 event 拼写不一致对比删除命令与创建命令统一为 event
Flume 监听不到文件监听目录与复制目录不一致检查 filegroups 正则与 cp 目标目录
HDFS 有目录但 Hive 查不到数据分区目录日期字段依赖拦截器检查自定义拦截器 LogTypeInterceptor 输出 header
ODS 可查但 DWD 无数据DWD 脚本日期参数与 ODS 实际分区不匹配核对脚本参数与 ODS 分区统一业务日期口径
日志文件复制后无新增消费Taildir positionFile 记录了旧偏移检查 positionFile 测试前清理或重置
HDFS 文件滚动异常rollCount/rollInterval/idleTimeout 均设为 0检查 sink 滚动参数设置合理阈值
文章中的日期混乱样例日志时间跨 2020/2021/2024核对正文中的日期片段统一演示日期