概述

本文介绍基于埋点日志中的广告事件,完成离线数仓从 ODS 到 DWD 的解析、清洗与明细建模。核心链路可落地,但当前脚本与字段设计存在一些风险点。

需求分析

事件日志数据样例

{
  "wzk_event": [{
    "name": "goods_detail_loading",
    "json": {
      "entry": "3",
      "goodsid": "0",
      "loading_time": "80",
      "action": "4",
      "staytime": "68",
      "showtype": "4"
    },
    "time": 1596225273755
  }, {
    "name": "loading",
    "json": {
      "loading_time": "18",
      "action": "1",
      "loading_type": "2",
      "type": "3"
    },
    "time": 1596231657803
  }, ...

采集的事件类型

  • 商品详情页加载:goods_detail_loading
  • 商品列表:loading
  • 消息通知:notification
  • 商品评论:comment
  • 收藏:favorites
  • 点赞:praise
  • 广告:ad

广告字段说明

  • action:用户行为(0 曝光、1 曝光后点击、2 购买)
  • duration:停留时长
  • shop_id:商家 id
  • event_type:“ad”
  • ad_type:1 JPG、2 PNG、3 GIF、4 SWF
  • show_style:0 静态图、1 动态图
  • product_id:产品 id
  • place:广告位置(1 首页、2 左侧、3 右侧、4 列表页)
  • sort:排序位置

需求指标

点击次数统计(分时统计)

  • 曝光次数、不同用户 ID 数、不同用户数
  • 点击次数、不同用户 ID 数、不同用户数
  • 购买次数、不同用户 ID 数、不同用户数

转化率-漏斗分析

  • 点击率 = 点击次数/曝光次数
  • 购买率 = 点击次数/点击次数

活动曝光效果评估

  • 行为(曝光、点击、购买)、时间段、广告位、产品、统计对应的次数
  • 时间段、广告位、商品,曝光次数最多的前 N 个

ODS 层

建立外部表

use ods;
drop table if exists ods.ods_log_event;
CREATE EXTERNAL TABLE ods.ods_log_event(
  `str` string
) PARTITIONED BY (`dt` string)
STORED AS TEXTFILE
LOCATION '/user/data/logs/event';

分区加载脚本

vim /opt/wzk/hive/ods_load_event_log.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
  do_date=$1
else
  do_date=`date -d "-1 day" +%F`
fi
sql="
alter table ods.ods_log_event add partition (dt='$do_date');
"
hive -e "$sql"

DWD 层

建立事件明细表

use dwd;
-- 所有事件明细
drop table if exists dwd.dwd_event_log;
CREATE EXTERNAL TABLE dwd.dwd_event_log(
  `device_id` string,
  `uid` string,
  `app_v` string,
  `os_type` string,
  `event_type` string,
  `language` string,
  `channel` string,
  `area` string,
  `brand` string,
  `name` string,
  `event_json` string,
  `report_time` string)
PARTITIONED BY (`dt` string)
stored as parquet;

建立广告明细表

use dwd;
-- 广告点击明细
drop table if exists dwd.dwd_ad;
CREATE TABLE dwd.dwd_ad(
  `device_id` string,
  `uid` string,
  `app_v` string,
  `os_type` string,
  `event_type` string,
  `language` string,
  `channel` string,
  `area` string,
  `brand` string,
  `report_time` string,
  `duration` int,
  `ad_action` int,
  `shop_id` int,
  `ad_type` int,
  `show_style` smallint,
  `product_id` int,
  `place` string,
  `sort` int,
  `hour` string
)
PARTITIONED BY (`dt` string)
stored as parquet;

UDF 开发

UDF 作用

  • 实现复杂业务逻辑封装
  • 提高代码复用性
  • 扩展 SQL 功能
  • 优化查询性能

JSON 数组解析 UDF

public class ParseJsonArray extends UDF {
    public ArrayList<String> evaluate(String jsonStr) {
        if (Strings.isNullOrEmpty(jsonStr)) {
            return null;
        }
        try{
            // 获取 jsonArray
            JSONArray jsonArray = JSON.parseArray(jsonStr);
            ArrayList<String> lst = new ArrayList<>();
            for(Object o: jsonArray) {
                lst.add(o.toString());
            }
            return lst;
        }catch (JSONException e){
            return null;
        }
    }
}

数据加载脚本

事件日志加载脚本

vim /opt/wzk/hive/dwd_load_event_log.sh

USE dwd;
add jar /opt/wzk/hive-parse-json-array-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE temporary function wzk_json_array AS 'icu.wzk.ParseJsonArray';

WITH tmp_start AS (
  SELECT SPLIT(str, ' ')[7] AS line
  FROM ods.ods_log_event
  WHERE dt='$do_date'
)

-- 插入数据到目标表 dwd_event_log
INSERT OVERWRITE TABLE dwd.dwd_event_log
PARTITION (dt='$do_date')
SELECT
  device_id,
  uid,
  app_v,
  os_type,
  event_type,
  language,
  channel,
  area,
  brand,
  get_json_object(k, '$.name') AS name,
  get_json_object(k, '$.json') AS json,
  get_json_object(k, '$.time') AS time
FROM (
  SELECT
    get_json_object(line, '$.attr.device_id') AS device_id,
    get_json_object(line, '$.attr.uid') AS uid,
    get_json_object(line, '$.attr.app_v') AS app_v,
    get_json_object(line, '$.attr.os_type') AS os_type,
    get_json_object(line, '$.attr.event_type') AS event_type,
    get_json_object(line, '$.attr.language') AS language,
    get_json_object(line, '$.attr.channel') AS channel,
    get_json_object(line, '$.attr.area') AS area,
    get_json_object(line, '$.attr.brand') AS brand,
    get_json_object(line, '$.wzk_event') AS wzk_event,
    line
  FROM tmp_start
) A
LATERAL VIEW EXPLODE(wzk_json_array(line, 'wzk_event')) B AS k;

hive -e "$sql"

广告日志加载脚本

vim /opt/wzk/hive/dwd_load_ad_log.sh

#!/bin/bash
source /etc/profile
if [ -n "$1" ]; then
  do_date=$1
else
  do_date=$(date -d "-1 day" +%F)
fi

sql="
INSERT OVERWRITE TABLE dwd.dwd_ad
PARTITION (dt='$do_date')
SELECT
  device_id,
  uid,
  app_v,
  os_type,
  event_type,
  language,
  channel,
  area,
  brand,
  report_time,
  get_json_object(event_json, '$.duration') AS duration,
  get_json_object(event_json, '$.ad_action') AS ad_action,
  get_json_object(event_json, '$.shop_id') AS shop_id,
  get_json_object(event_json, '$.ad_type') AS ad_type,
  get_json_object(event_json, '$.show_style') AS show_style,
  get_json_object(event_json, '$.product_id') AS product_id,
  get_json_object(event_json, '$.place') AS place,
  get_json_object(event_json, '$.sort') AS sort,
  from_unixtime(ceil(report_time/1000), 'HH') AS report_hour
FROM dwd.dwd_event_log
WHERE dt='$do_date' AND name='ad';
"

hive -e "$sql"

错误速查卡

症状根因定位修复
UDF 调用报参数不匹配Java UDF evaluate(String) 只接收 1 个参数,但 SQL 写成 wzk_json_array(line, ‘wzk_event’)检查 UDF 类签名与调用参数个数
广告字段解析后大量为空需求字段是 action,脚本却取 $.ad_action对照原始埋点 JSON 将 $.ad_action 改为 $.action
place、action 字段类型混乱表结构与埋点字段类型设计不一致统一数值型字段类型
小时字段无法正确生成report_time 是 string 类型直接参与计算显式转 bigint:from_unixtime(cast(report_time as bigint)/1000, ‘HH’)
事件拆分结果异常SPLIT(str, ’ ’)[7] 依赖固定空格位置改为更稳妥的正则提取或直接按 JSON 原文入湖
字段含义混淆SELECT 别名与目标表字段名不一致显式使用与表结构一致的别名

数据流转链路

日志 → Flume → ODS 层 → 清洗、转换 → DWD 层(事件明细) → 广告事件过滤 → 广告宽表

通过本方案可实现广告曝光、点击、购买的分时统计、漏斗转化率分析与广告位效果评估。