Overview
This article walks through a complete offline data-warehouse pipeline, from log collection to member-metric analysis. A Flume Taildir source monitors local log directories and writes the data into partitioned HDFS directories; Hive external tables load it into the ODS layer, it is then cleaned into DWD, and the result ultimately supports calculating active-member, new-member, and member-retention metrics.
Data Flow
Data Collection => ODS => DWD => DWS => ADS => MySQL
Active Members, New Members, Member Retention
Script Execution Order
# Load the ODS and DWD layers
ods_load_startlog.sh
dwd_load_startlog.sh
# Active Members
dws_load_member_start.sh
ads_load_member_active.sh
# New Members
dws_load_member_add_day.sh
ads_load_member_add.sh
# Member Retention
dws_load_member_retention_day.sh
ads_load_member_retention.sh
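Each script above takes the business date as its first argument (as in sh ods_load_startlog.sh 2020-07-21 later in the article). A minimal sketch of the date-resolution logic such scripts commonly start with; defaulting to yesterday when no argument is given is an assumption, not something the article shows:

```shell
#!/bin/bash
# Resolve the business date for a load script:
# use the first argument if given, otherwise default to yesterday.
resolve_date() {
  if [ -n "$1" ]; then
    echo "$1"
  else
    date -d "-1 day" +%F
  fi
}

do_date=$(resolve_date "$1")
echo "loading partition dt=${do_date}"
```

Run as sh ods_load_startlog.sh 2020-07-21, such a script would then operate on the dt=2020-07-21 partition.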
Flume Configuration
Monitored log directories: /opt/wzk/logs/start/.*log and /opt/wzk/logs/event/.*log
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event
# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder
# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000
# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.batchSize = 1000
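The config above sets only rollSize and batchSize; the troubleshooting table at the end notes that rolling misbehaves when the other thresholds go unexamined. A commonly used addition (an assumption, not part of the original config) is to disable time- and count-based rolling explicitly so files roll purely by size:

```properties
# roll by size only: disable time- and event-count-based rolling
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
# close idle files after 30 s so half-written files are not held open
a1.sinks.k1.hdfs.idleTimeout = 30
```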
Start command:
flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf --name a1 -Dflume.root.logger=INFO,console
HDFS Data Cleanup
hadoop fs -rm -r /user/data/logs/start/
hadoop fs -rm -r /user/data/logs/event/
hadoop fs -mkdir /user/data/logs/start
hadoop fs -mkdir /user/data/logs/event
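The four commands above can be wrapped in a small reset script; looping over one list of log types guarantees the delete and create paths match (the troubleshooting table later flags a spelling mismatch between them as an error source). A sketch:

```shell
#!/bin/bash
# Reset the HDFS collection directories for both log types.
# Iterating over a single list keeps the rm and mkdir paths consistent.
for logtype in start event; do
  hadoop fs -rm -r "/user/data/logs/${logtype}/"
  hadoop fs -mkdir -p "/user/data/logs/${logtype}"
done
```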
ODS Layer
Definition
ODS (Operational Data Store) is the operational data storage layer, used to store raw data extracted from business systems, serving as the “data input” of the data warehouse.
Functions
- Complete Raw Storage: keeps data in its original form so problems can be traced back to the source
- Decoupling: isolates the business systems from the data warehouse
- Near-Real-Time Support: when analysis must be near real time, ODS can be synchronized from the source in near real time
Load ODS
sh ods_load_startlog.sh 2020-07-21
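The script body is not shown in the article; a minimal sketch of what an ODS load typically does here is to attach the day's HDFS directory as a Hive partition, assuming ods_start_log is partitioned by dt and the paths match the Flume sink configured above:

```shell
#!/bin/bash
# ods_load_startlog.sh (sketch): register the day's collected start logs
# as a partition of the ODS external table.
do_date=$1
hive -e "
alter table ods.ods_start_log
  add if not exists partition (dt='${do_date}')
  location '/user/data/logs/start/dt=${do_date}';
"
```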
Hive query:
use ods;
select * from ods_start_log limit 3;
DWD Layer
Definition
DWD (Data Warehouse Detail) is the detail data layer, storing cleaned and lightly processed wide table data.
Functions
- Data Refinement: cleaned and standardized data ready for subsequent subject-area modeling
- Performance Optimization: reduces cleaning and transformation overhead for downstream queries
- Wide Dimension Tables: multiple tables are joined into wide tables for business analysis
Data Processing
- Data Cleaning: Remove dirty data, handle null values, unify formats
- Data Integration: Join multiple tables according to business logic, add computed fields
- Deduplication and Normalization: Ensure data uniqueness
Load DWD
sh dwd_load_startlog.sh 2020-07-21
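As with the ODS script, the DWD load body is not shown. A sketch of the usual shape, cleaning one day's raw partition into the detail table; the raw column name (line) and the JSON fields (device_id, app_version) are hypothetical, since the article does not show the log schema:

```shell
#!/bin/bash
# dwd_load_startlog.sh (sketch): clean one day of ODS data into DWD.
do_date=$1
hive -e "
insert overwrite table dwd.dwd_start_log partition (dt='${do_date}')
select get_json_object(line, '\$.device_id'),
       get_json_object(line, '\$.app_version')
from ods.ods_start_log
where dt = '${do_date}'
  and line is not null;    -- drop dirty / empty records
"
```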
Hive query:
use dwd;
select * from dwd.dwd_start_log limit 3;
Error Troubleshooting
| Symptom | Root Cause | Fix |
|---|---|---|
| No expected log output after Flume starts | Typo in the startup parameter | Change -Dflume.roog.logger to -Dflume.root.logger=INFO,console |
| HDFS cleanup breaks the subsequent collection directory | Inconsistent spelling: enent vs. event | Compare the delete and create commands and unify the spelling to event |
| Flume does not pick up files | The monitored directory differs from the directory the test logs are copied into | Check the filegroups regex against the cp target directory |
| HDFS has the directory but Hive finds no data | The dt= partition value depends on the logtime header set by the interceptor | Check the header output of the custom LogTypeInterceptor |
| ODS is queryable but DWD has no data | The DWD script's date parameter does not match the actual ODS partition | Verify that the script parameter and the ODS partitions use the same business-date convention |
| No new data consumed after a log file is copied in | The Taildir positionFile still holds the old offset | Clear or reset the positionFile before re-testing |
| HDFS files roll abnormally | rollCount/rollInterval/idleTimeout all set to 0 | Set reasonable rolling thresholds on the sink |
| Dates in the article are confusing | Sample log times span 2020/2021/2024 | Unify the dates in the text to a single demo date |
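For the Taildir row above: stopping the agent and removing the position file forces Flume to re-read the monitored logs from the start on its next launch, which is the usual reset before re-running a collection test:

```shell
# stop the agent first, then reset the recorded offsets
rm -f /opt/wzk/conf/startlog_position.json
```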