Overview
This article walks through parsing, cleaning, and detail modeling from the ODS layer to the DWD layer of an offline data warehouse, using advertising events from tracking logs. The core pipeline works end to end, but the current scripts and field design contain several risk points, summarized in the troubleshooting table at the end.
Requirements Analysis
Event Log Data Sample
{
"wzk_event": [{
"name": "goods_detail_loading",
"json": {
"entry": "3",
"goodsid": "0",
"loading_time": "80",
"action": "4",
"staytime": "68",
"showtype": "4"
},
"time": 1596225273755
}, {
"name": "loading",
"json": {
"loading_time": "18",
"action": "1",
"loading_type": "2",
"type": "3"
},
"time": 1596231657803
}, ...
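The DWD load later in this article flattens each log line's wzk_event array into one detail row per event. A minimal local sketch of that explode step in Python (the event payloads follow the sample above; the attr values such as device_id and uid are made up for illustration):

```python
import json

# One tracking-log line, shaped like the sample above (two events shown).
line = json.dumps({
    "attr": {"device_id": "d-001", "uid": "u-42"},  # hypothetical attr values
    "wzk_event": [
        {"name": "goods_detail_loading",
         "json": {"entry": "3", "goodsid": "0", "loading_time": "80",
                  "action": "4", "staytime": "68", "showtype": "4"},
         "time": 1596225273755},
        {"name": "loading",
         "json": {"loading_time": "18", "action": "1",
                  "loading_type": "2", "type": "3"},
         "time": 1596231657803},
    ],
})

def explode_events(raw_line, arr_key="wzk_event"):
    """Yield one (name, event_json, time) tuple per element of the event
    array, mirroring wzk_json_array + LATERAL VIEW EXPLODE in the DWD load."""
    record = json.loads(raw_line)
    for event in record.get(arr_key, []):
        yield event["name"], json.dumps(event["json"]), event["time"]

rows = list(explode_events(line))
print(len(rows), rows[0][0])  # → 2 goods_detail_loading
```

Two events in one log line become two detail rows, which is exactly what the dwd_event_log table stores.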
Collected Event Types
- Goods detail page loading: goods_detail_loading
- Goods list: loading
- Message notification: notification
- Product comment: comment
- Favorites: favorites
- Like/Praise: praise
- Advertisement: ad
Advertising Field Description
- action: User behavior (0 impression, 1 click after impression, 2 purchase)
- duration: Stay duration
- shop_id: Merchant id
- event_type: "ad" (constant for advertising events)
- ad_type: 1 JPG, 2 PNG, 3 GIF, 4 SWF
- show_style: 0 static image, 1 dynamic image
- product_id: Product id
- place: Ad placement (1 homepage, 2 left, 3 right, 4 list page)
- sort: Sort position
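The numeric codes above can be translated into readable labels with simple lookup tables, which is handy when spot-checking parsed rows. A sketch using only the mappings from the field description (remember that tracking values arrive as strings):

```python
# Code-to-label mappings taken from the advertising field description above.
ACTION = {0: "impression", 1: "click", 2: "purchase"}
AD_TYPE = {1: "JPG", 2: "PNG", 3: "GIF", 4: "SWF"}
SHOW_STYLE = {0: "static image", 1: "dynamic image"}
PLACE = {1: "homepage", 2: "left", 3: "right", 4: "list page"}

def decode_ad_event(raw):
    """Translate the numeric codes of one ad event into readable labels.
    Tracking values arrive as strings, so cast before looking them up."""
    return {
        "action": ACTION.get(int(raw["action"])),
        "ad_type": AD_TYPE.get(int(raw["ad_type"])),
        "place": PLACE.get(int(raw["place"])),
    }

print(decode_ad_event({"action": "1", "ad_type": "3", "place": "1"}))
# → {'action': 'click', 'ad_type': 'GIF', 'place': 'homepage'}
```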
Requirements Metrics
Impression/Click/Purchase Statistics (Hourly)
- Impression count, distinct device count, distinct user count
- Click count, distinct device count, distinct user count
- Purchase count, distinct device count, distinct user count
Conversion Rate - Funnel Analysis
- Click-through rate = Click count / Impression count
- Purchase rate = Purchase count / Click count
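The two funnel ratios are a few lines of arithmetic; the only trap is a zero denominator when an hour has no impressions or no clicks. A minimal sketch:

```python
def funnel(impressions, clicks, purchases):
    """Click-through rate = clicks / impressions;
    purchase rate = purchases / clicks. Guard against division by zero."""
    ctr = clicks / impressions if impressions else 0.0
    purchase_rate = purchases / clicks if clicks else 0.0
    return {"ctr": ctr, "purchase_rate": purchase_rate}

print(funnel(impressions=10000, clicks=500, purchases=25))
# → {'ctr': 0.05, 'purchase_rate': 0.05}
```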
Campaign Impression Effect Evaluation
- Count events by behavior (impression, click, purchase), time period, ad placement, and product
- Top N products with the most impressions per time period and ad placement
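The top-N metric can be prototyped over the flattened detail rows with a counter keyed by (hour, place, product_id). A local sketch with made-up rows:

```python
from collections import Counter

# Hypothetical flattened ad-impression rows: (hour, place, product_id).
rows = [
    ("10", 1, 101), ("10", 1, 101), ("10", 1, 102),
    ("10", 2, 103), ("11", 1, 101),
]

def top_n_impressions(rows, n=2):
    """Count impressions per (hour, place, product_id) key and return the n
    most frequent keys, mirroring the 'top N with most impressions' metric."""
    return Counter(rows).most_common(n)

print(top_n_impressions(rows, n=1))  # → [(('10', 1, 101), 2)]
```

In the warehouse this becomes a GROUP BY with a rank over the count; the sketch only checks the aggregation logic.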
ODS Layer
Create External Table
use ods;
drop table if exists ods.ods_log_event;
CREATE EXTERNAL TABLE ods.ods_log_event(
`str` string
) PARTITIONED BY (`dt` string)
STORED AS TEXTFILE
LOCATION '/user/data/logs/event';
Partition Loading Script
vim /opt/wzk/hive/ods_load_event_log.sh
#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
alter table ods.ods_log_event add partition (dt='$do_date');
"
hive -e "$sql"
DWD Layer
Create Event Detail Table
use dwd;
-- All event details
drop table if exists dwd.dwd_event_log;
CREATE EXTERNAL TABLE dwd.dwd_event_log(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`event_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`name` string,
`event_json` string,
`report_time` string)
PARTITIONED BY (`dt` string)
stored as parquet;
Create Advertising Detail Table
use dwd;
-- Advertising click detail
drop table if exists dwd.dwd_ad;
CREATE TABLE dwd.dwd_ad(
`device_id` string,
`uid` string,
`app_v` string,
`os_type` string,
`event_type` string,
`language` string,
`channel` string,
`area` string,
`brand` string,
`report_time` string,
`duration` int,
`ad_action` int,
`shop_id` int,
`ad_type` int,
`show_style` smallint,
`product_id` int,
`place` string,
`sort` int,
`hour` string
)
PARTITIONED BY (`dt` string)
stored as parquet;
UDF Development
UDF Purpose
- Implement complex business logic encapsulation
- Improve code reusability
- Extend SQL functionality
- Optimize query performance
JSON Array Parsing UDF
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.util.ArrayList;

public class ParseJsonArray extends UDF {
    // Extract the JSON array stored under arrKey (e.g. "wzk_event") from a
    // full log line; return each array element as a JSON string, or null
    // (Hive NULL) for empty or malformed input. The two-argument signature
    // matches the SQL call wzk_json_array(line, 'wzk_event') used below.
    public ArrayList<String> evaluate(String jsonStr, String arrKey) {
        if (Strings.isNullOrEmpty(jsonStr) || Strings.isNullOrEmpty(arrKey)) {
            return null;
        }
        try {
            JSONObject obj = JSON.parseObject(jsonStr);
            JSONArray jsonArray = obj.getJSONArray(arrKey);
            if (jsonArray == null) {
                return null;
            }
            ArrayList<String> lst = new ArrayList<>();
            for (Object o : jsonArray) {
                lst.add(o.toString());
            }
            return lst;
        } catch (JSONException e) {
            return null;
        }
    }
}
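The UDF's contract — NULL on empty or malformed input, otherwise one string per array element — is easy to sanity-check locally before deploying the jar. A Python stand-in for the core array-parsing step (parse_json_array is a hypothetical helper, not Hive code):

```python
import json

def parse_json_array(json_str):
    """Mimic the ParseJsonArray contract: return a list with one JSON string
    per array element, or None (Hive NULL) for empty/invalid input."""
    if not json_str:
        return None
    try:
        arr = json.loads(json_str)
        if not isinstance(arr, list):
            return None
        return [json.dumps(el) for el in arr]
    except json.JSONDecodeError:
        return None

print(parse_json_array('[{"a": 1}, {"b": 2}]'))  # → ['{"a": 1}', '{"b": 2}']
print(parse_json_array("not json"))              # → None
```

Returning NULL instead of throwing keeps one bad log line from failing the whole DWD load.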
Data Loading Scripts
Event Log Loading Script
vim /opt/wzk/hive/dwd_load_event_log.sh
#!/bin/bash
source /etc/profile
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +%F)
fi
sql="
USE dwd;
add jar /opt/wzk/hive-parse-json-array-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE temporary function wzk_json_array AS 'icu.wzk.ParseJsonArray';
WITH tmp_start AS (
SELECT SPLIT(str, ' ')[7] AS line
FROM ods.ods_log_event
WHERE dt='$do_date'
)
-- Insert data into target table dwd_event_log
INSERT OVERWRITE TABLE dwd.dwd_event_log
PARTITION (dt='$do_date')
SELECT
device_id,
uid,
app_v,
os_type,
event_type,
language,
channel,
area,
brand,
get_json_object(k, '$.name') AS name,
get_json_object(k, '$.json') AS json,
get_json_object(k, '$.time') AS time
FROM (
SELECT
get_json_object(line, '$.attr.device_id') AS device_id,
get_json_object(line, '$.attr.uid') AS uid,
get_json_object(line, '$.attr.app_v') AS app_v,
get_json_object(line, '$.attr.os_type') AS os_type,
get_json_object(line, '$.attr.event_type') AS event_type,
get_json_object(line, '$.attr.language') AS language,
get_json_object(line, '$.attr.channel') AS channel,
get_json_object(line, '$.attr.area') AS area,
get_json_object(line, '$.attr.brand') AS brand,
get_json_object(line, '$.wzk_event') AS wzk_event,
line
FROM tmp_start
) A
LATERAL VIEW EXPLODE(wzk_json_array(line, 'wzk_event')) B AS k;
"
hive -e "$sql"
Advertising Log Loading Script
vim /opt/wzk/hive/dwd_load_ad_log.sh
#!/bin/bash
source /etc/profile
if [ -n "$1" ]; then
do_date=$1
else
do_date=$(date -d "-1 day" +%F)
fi
sql="
INSERT OVERWRITE TABLE dwd.dwd_ad
PARTITION (dt='$do_date')
SELECT
device_id,
uid,
app_v,
os_type,
event_type,
language,
channel,
area,
brand,
report_time,
get_json_object(event_json, '$.duration') AS duration,
get_json_object(event_json, '$.action') AS ad_action,
get_json_object(event_json, '$.shop_id') AS shop_id,
get_json_object(event_json, '$.ad_type') AS ad_type,
get_json_object(event_json, '$.show_style') AS show_style,
get_json_object(event_json, '$.product_id') AS product_id,
get_json_object(event_json, '$.place') AS place,
get_json_object(event_json, '$.sort') AS sort,
from_unixtime(cast(report_time as bigint) DIV 1000, 'HH') AS report_hour
FROM dwd.dwd_event_log
WHERE dt='$do_date' AND name='ad';
"
hive -e "$sql"
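The hour bucket comes from the millisecond report_time: the string must be cast before dividing, and the division must truncate (ceil would shift events near a second boundary). The equivalent computation in Python, using the first sample timestamp from the log above:

```python
from datetime import datetime, timezone

def report_hour(report_time_ms, tz=timezone.utc):
    """Millisecond epoch string -> two-digit hour bucket, matching
    from_unixtime(cast(report_time as bigint) DIV 1000, 'HH') in Hive."""
    seconds = int(report_time_ms) // 1000  # integer division, not ceil
    return datetime.fromtimestamp(seconds, tz).strftime("%H")

print(report_hour("1596225273755"))  # → 19 (2020-07-31T19:54:33 UTC)
```

Note that Hive's from_unixtime uses the session time zone, so the hour buckets depend on the cluster's time zone setting; the sketch pins UTC for reproducibility.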
Error Troubleshooting
| Symptom | Root Cause | Fix |
|---|---|---|
| UDF call reports a parameter mismatch | The UDF's evaluate signature and the SQL call disagree on argument count, e.g. evaluate(String) called as wzk_json_array(line, 'wzk_event') | Keep the UDF signature and the call site in sync (here, a two-argument evaluate(String jsonStr, String arrKey)) |
| Many advertising fields parse as null | The tracking JSON uses the field action, but the script reads $.ad_action | Read $.action and alias it as ad_action to match the table column |
| place/action field types are inconsistent | Table schema and tracking field types were designed independently | Unify the numeric field types across the schema and the tracking spec |
| Hour field is not generated correctly | report_time is a string, and ceil over a plain division mis-rounds | Cast explicitly and truncate: from_unixtime(cast(report_time as bigint) DIV 1000, 'HH') |
| Event split result is abnormal | SPLIT(str, ' ')[7] depends on a fixed space position in the raw line | Extract with a more robust regex, or write the raw JSON directly to the lake |
| Field meanings get confused | SELECT aliases do not match target table column names | Use aliases consistent with the table schema (Hive inserts map by position, so mismatched aliases silently misplace data) |
Data Flow Pipeline
Log => Flume => ODS Layer => Cleaning, Transformation => DWD Layer (Event Detail) => Advertising Event Filtering => Advertising Wide Table
With this pipeline in place, hourly statistics, funnel conversion analysis, and ad placement effect evaluation for impressions, clicks, and purchases can all be built on top of the DWD advertising wide table.