TL;DR
- 场景:用启动日志/事件日志在离线数仓统计新增、活跃(DAU/WAU/MAU)、留存。
- 结论:采集侧用 Flume 1.8+ Taildir 监控日志落 HDFS 分区,数仓分层承接口径与聚合。
- 产出:一套可复用的 Taildir→HDFS Sink 配置模板 + 指标口径对齐要点 + 常见故障排查清单。
需求分析
会员数据是后期营销的很重要的数据,网店会专门针对会员进行一系列营销活动,电商会员一般门槛较低,注册网站即可加入,有些电商平台的高级会员具有时效性,需要购买的VIP会员卡或一年内消费达到多少才能成为高级会员。
计算指标
- 新增会员:每次新增会员数
- 活跃会员:每日、每周、每月的活跃会员数
- 会员留存:1、2、3日会员的留存数,1、2、3日的会员留存率
指标口径业务逻辑
会员识别标准
会员定义:以移动设备为唯一识别标准,每台独立设备视为一个会员。具体识别方式:
- Android系统:通过设备IMEI号(国际移动设备识别码)进行识别,每部Android设备具有唯一的15位IMEI编码
- iOS系统:通过OpenUDID(开放统一设备标识符)进行识别,这是iOS设备的通用唯一标识符
- 特殊情况处理:同一用户的多台设备会分别计算为不同会员;设备刷机或恢复出厂设置后仍视为同一会员
活跃会员定义
- 日活跃会员(DAU):在自然日内(00:00-23:59)至少打开应用一次的独立设备,无论打开次数多少都计为1个活跃会员
- 周活跃会员(WAU):在自然周(周一至周日)内至少启动应用一次的独立设备
- 月活跃会员(MAU):在自然月(1日至月末)内至少启动应用一次的独立设备
会员活跃率计算
- 日活跃率(DAU率):计算公式 = 日活跃会员数 ÷ 总注册会员数 × 100%
- 周活跃率(WAU率):计算公式 = 周活跃会员数 ÷ 总注册会员数 × 100%
- 月活跃率(MAU率):计算公式 = 月活跃会员数 ÷ 总注册会员数 × 100%,通常MAU率在10-20%为健康水平
新增会员统计
新增会员定义:设备首次安装并启动应用的会员
- 防重复计算机制:卸载后重装不计为新会员;换机但登录同一账号不计为新会员
- 统计维度:日新增、周新增、月新增
留存分析
- 留存会员:在初始时间段(如某日/周/月)新增的会员,在后续时间段仍保持活跃
- 留存率计算:计算公式 = 留存会员数 ÷ 初始新增会员数 × 100%
- 常见指标:次日留存、7日留存、30日留存
日志数据采集
原始日志数据(一条启动日志)
2020-07-30 14:18:47.339 [main] INFO com.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"1","error_code":"0"},"time":1596111888529},"attr":{"area":"泰安","uid":"2F10092A9","app_v":"1.1.13","event_type":"common","device_id":"1FB872-9A1009","os_type":"4.7.3","channel":"DK","language":"chinese","brand":"iphone-9"}}
taildir source特点
- 使用正则表达式匹配目录中的文件名
- 监控的文件中,一旦有数据写入,Flume就会将信息写入到指定的Sink
- 高可靠,不会丢失数据
- 不会对跟踪文件有任何处理,不会重命名也不会删除
- 不支持Windows,不能读二进制文件,支持按行读取文本文件
tail source配置
- positionFile:配置检查点文件的路径,检查点文件会以JSON格式保存已经读取文件的位置,解决断点续传的问题
- filegroups:指定filegroups,可以有多个,以空格分隔(taildir source可以同时监控多个目录中的文件)
- filegroups.f1:配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配
HDFS Sink配置
HDFS Sink都会采用滚动生成文件的方式,滚动生成文件的策略有:
- 基于时间:hdfs.rollInterval 30秒
- 基于文件大小:hdfs.rollSize 1024字节
- 基于Event数量:hdfs.rollCount 10个event
- 基于文件空闲时间:hdfs.idleTimeout 0
- 0 禁用
- minBlockReplicase,默认值与HDFS副本数一致。设为1是为了让Flume感知不到HDFS的块复制
Agent配置
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
# 配置文件滚动方式(文件大小32M)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# 向hdfs上刷新的event的个数
a1.sinks.k1.hdfs.batchSize = 1000
# 使用本地时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Flume的优化配置
flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs1.conf -name a1 -Dflume.root.logger=INFO,console
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| Taildir 不采集、HDFS 无新文件 | filegroups 正则没匹配到文件/路径写错 | Flume 日志搜 TaildirSource、看监控到的文件列表;核对路径是否命中 |
| positionFile 不更新/采集断续 | positionFile 无权限或目录不存在 | 看 positionFile 是否生成 JSON、是否增长;检查 Flume 运行用户权限 |
| HDFS 写入失败、反复重试 | HDFS 权限/NameNode 连接/路径不存在 | Flume 日志搜 hdfs / Permission denied / Failed;HDFS 侧看目标目录权限 |
| 文件迟迟不滚动、HDFS 目录空 | rollSize/rollInterval/rollCount 都被禁用或条件不触发 | 低流量测试加 rollInterval(如 60/300 秒)或调小 rollSize |
| 内存飙升/Channel OOM/吞吐抖动 | memory channel 容量过大、下游 HDFS 写慢导致堆积 | 降低 capacity;加大 Sink 并发/优化 HDFS;必要时换 file channel |
| 数据重复/缺失(断点后异常) | positionFile 被清空/回滚;日志轮转策略与 Taildir 追踪不一致 | 保持 positionFile 稳定持久;避免频繁删改;日志轮转用追加模式更稳 |
| 分区日期不对(按 UTC/时间错位) | 时间戳来源与 useLocalTimeStamp 不一致 | 开启 hdfs.useLocalTimeStamp=true |