Overview

This article demonstrates a complete offline data warehouse pipeline, from log collection to member-metric analysis. A Flume Taildir source monitors local log directories and writes the data into HDFS partition directories; Hive external tables then load it into the ODS layer, where it is cleaned into DWD, ultimately supporting the calculation of active-member, new-member, and member-retention metrics.

Data Flow

Data Collection => ODS => DWD => DWS => ADS => MySQL
Active Members, New Members, Member Retention

Script Execution Order

# Load the ODS and DWD layers
ods_load_startlog.sh
dwd_load_startlog.sh
# Active Members
dws_load_member_start.sh
ads_load_member_active.sh
# New Members
dws_load_member_add_day.sh
ads_load_member_add.sh
# Member Retention
dws_load_member_retention_day.sh
ads_load_member_retention.sh
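The scripts above all take a single business-date argument. The order can be captured in one driver; a minimal sketch (the wrapper itself and its default-to-yesterday rule are assumptions, not from the original):

```shell
#!/usr/bin/env bash
# Hypothetical driver: run every layer script for one business date.
set -euo pipefail

# Default to yesterday (the usual T+1 offline-warehouse convention)
# when no date is passed; otherwise use the first argument as-is.
resolve_date() {
  if [ -n "${1:-}" ]; then
    echo "$1"
  else
    date -d "-1 day" +%F
  fi
}

do_date=$(resolve_date "${1:-}")
echo "loading all layers for dt=${do_date}"

# Execution order matters: each layer reads the previous layer's output.
for script in \
    ods_load_startlog.sh dwd_load_startlog.sh \
    dws_load_member_start.sh ads_load_member_active.sh \
    dws_load_member_add_day.sh ads_load_member_add.sh \
    dws_load_member_retention_day.sh ads_load_member_retention.sh; do
  if [ -x "$script" ]; then
    sh "$script" "$do_date"
  else
    echo "skip: $script not found or not executable"
  fi
done
```

Running it as `sh run_all.sh 2020-07-21` reloads every layer for the demo date in one pass.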

Flume Configuration

Monitor log directory: /opt/wzk/logs/start/.*log

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/wzk/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/wzk/logs/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /opt/wzk/logs/event/.*log
a1.sources.r1.headers.f2.logtype = event

# custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = icu.wzk.LogTypeInterceptor$Builder

# memorychannel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
a1.sinks.k1.hdfs.filePrefix = startlog.
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.batchSize = 1000

# bind source and sink to the channel (required for data to flow)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start command:

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs2.conf --name a1 -Dflume.root.logger=INFO,console
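Once the agent is running and a log file lands in the monitored directory, the sink should create a dated partition directory. A small sketch of the path the `%{logtype}` / `%{logtime}` headers expand to (the helper function is hypothetical; the date is the demo date):

```shell
#!/usr/bin/env bash
# Mirror the sink's path template:
#   a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}/
# logtype comes from the filegroup header, logtime from the interceptor.
partition_path() {
  local logtype=$1 logtime=$2
  echo "/user/data/logs/${logtype}/dt=${logtime}/"
}

p=$(partition_path start 2020-07-21)
echo "checking $p"
# hdfs dfs -ls "$p"   # uncomment on the cluster to list the rolled files
```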

HDFS Data Cleanup

hadoop fs -rm -r /user/data/logs/start/
hadoop fs -rm -r /user/data/logs/event/

hadoop fs -mkdir /user/data/logs/start
hadoop fs -mkdir /user/data/logs/event
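Re-running the cleanup above fails when a directory is already gone. An idempotent variant uses -rm -r -f and -mkdir -p; a sketch (the DRY_RUN guard is added here only so the commands can be previewed off-cluster):

```shell
#!/usr/bin/env bash
# Idempotent reset of one log directory: -f tolerates a missing path,
# -p creates parents and tolerates an existing directory.
set -euo pipefail

reset_log_dir() {
  local dir=$1
  if [ "${DRY_RUN:-0}" = "1" ]; then
    # Preview mode: print the commands instead of running them.
    echo "hadoop fs -rm -r -f $dir"
    echo "hadoop fs -mkdir -p $dir"
  else
    hadoop fs -rm -r -f "$dir"
    hadoop fs -mkdir -p "$dir"
  fi
}

DRY_RUN=1 reset_log_dir /user/data/logs/start
```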

ODS Layer

Definition

ODS (Operational Data Store) is the operational data storage layer, used to store raw data extracted from business systems, serving as the “data input” of the data warehouse.

Functions

  • Complete raw storage: preserves data in its original form for problem tracing
  • Decoupling: isolates the data warehouse from the business systems
  • Near-real-time support: when analysis needs fresher data, ODS can support near-real-time synchronization

Load ODS

sh ods_load_startlog.sh 2020-07-21
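The loader script itself is not shown in the article; a minimal sketch of what such a script typically does, assuming ods.ods_start_log is an external table partitioned by dt over the Flume output directory (the table DDL is an assumption):

```shell
#!/usr/bin/env bash
# Hypothetical ODS loader: register the day's HDFS directory as a partition.
set -euo pipefail

# Default to the demo date for illustration; normally the date is required.
do_date=${1:-2020-07-21}

# Mapping the partition onto the Flume sink path copies no data;
# only Hive metadata is added.
hql="alter table ods.ods_start_log add if not exists
partition (dt='${do_date}')
location '/user/data/logs/start/dt=${do_date}';"

echo "$hql"
# hive -e "$hql"   # uncomment on a node with the Hive client installed
```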

Hive query:

use ods;
select * from ods_start_log limit 3;

DWD Layer

Definition

DWD (Data Warehouse Detail) is the detail data layer, storing cleaned and lightly processed wide table data.

Functions

  • Data Refinement: cleaned and standardized data, ready for subsequent subject-area modeling
  • Performance Optimization: reduces cleaning and transformation overhead for downstream queries
  • Dimension Wide Tables: joins multiple tables into wide tables for business analysis

Data Processing

  • Data Cleaning: Remove dirty data, handle null values, unify formats
  • Data Integration: Join multiple tables according to business logic, add computed fields
  • Deduplication and Normalization: Ensure data uniqueness
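The deduplication step can be sketched with a row_number() window in HiveQL; the column names device_id and start_time below are hypothetical, not from the article:

```shell
#!/usr/bin/env bash
# Hypothetical dedup from ODS into DWD: keep one row per
# (device_id, start_time) pair, dropping exact duplicates.
set -euo pipefail
do_date=${1:-2020-07-21}

hql="insert overwrite table dwd.dwd_start_log partition (dt='${do_date}')
select device_id, start_time
from (
  select device_id, start_time,
         row_number() over (partition by device_id, start_time
                            order by start_time) as rn
  from ods.ods_start_log
  where dt = '${do_date}'
) t
where rn = 1;"

echo "$hql"
# hive -e "$hql"
```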

Load DWD

sh dwd_load_startlog.sh 2020-07-21

Hive query:

use dwd;
select * from dwd.dwd_start_log limit 3;

Error Troubleshooting

Symptom | Root Cause | Fix
No expected log output after Flume starts | Misspelled startup parameter | Change -Dflume.roog.logger to the standard -Dflume.root.logger
HDFS cleanup causes exceptions in the subsequent collection directory | Inconsistent spelling between enent and event | Compare the delete and create commands; unify on event
Flume does not pick up files | Monitored directory differs from the copy target | Check the filegroups regex against the cp target directory
Directory exists on HDFS but Hive finds no data | The partition-directory date field depends on the interceptor | Check the header output of the custom LogTypeInterceptor
ODS is queryable but DWD is empty | The DWD script's date parameter does not match the actual ODS partition | Verify the script parameters and ODS partitions use the same business-date definition
No new consumption after a log file is copied | The Taildir positionFile recorded an old offset | Clean up or reset the positionFile before the test
Abnormal HDFS file rolling | rollCount/rollInterval/idleTimeout all set to 0 | Set reasonable rolling thresholds on the sink
Confusing dates in the article | Sample log times span 2020/2021/2024 | Unify the demo date across the text
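For the Taildir offset pitfall, a reset sketch (stop the agent first; the path comes from the Flume configuration above, and with default Taildir settings the source then re-reads matching files from the beginning):

```shell
#!/usr/bin/env bash
# Reset Taildir state before re-running a test: once the recorded
# offsets are gone, the source treats matching files as new.
set -euo pipefail
position_file=/opt/wzk/conf/startlog_position.json

reset_positions() {
  if [ -f "$position_file" ]; then
    rm -f "$position_file"
    echo "removed $position_file"
  else
    echo "no position file at $position_file"
  fi
}

reset_positions
```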