大数据-243 离线数仓 - 实战电商核心交易增量导入(DataX - HDFS - Hive 分区)
TL;DR
- 场景:电商核心交易三张表做每日增量,落地离线数仓 ODS,按 dt 分区
- 结论:DataX 用 MySQLReader + HDFSWriter,按时间字段抽取,HDFS 目录分区化,Hive 只做分区挂载
- 产出:3 份可复用 JSON 任务 + 一套执行顺序 + 常见坑位速查卡
业务需求
电商系统业务中最关键的业务,电商的运营活动都是围绕这个主题展开。 选取的指标包括:订单数、商品数、支付金额,对这些指标按销售区域、商品类型分析。
需求板块
电商平台的核心交易可以分为以下几个主要环节:
- 商品浏览:用户浏览商品的行为数据
- 加入购物车:用户将商品添加到购物车中的行为
- 下单:用户在电商平台上完成的订单生成行为
- 支付:支付是交易中至关重要的环节
- 发货:商品发货数据记录了商家发货的时间、物流公司、物流单号等信息
- 收货和评价:用户收到商品后的评价、退换货行为等
增量数据导入
三张增量表:
- 订单表 wzk_trade_orders
- 订单产品表 wzk_order_produce
- 产品信息表 wzk_product_info
订单表
wzk_trade_orders => ods.ods_trade_orders
JSON 配置示例:
{
"job": {
"setting": {
"speed": { "channel": 1 },
"errorLimit": { "record": 0 }
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "hive",
"password": "hive@wzk.icu",
"connection": [
{
"querySql": [
"select orderId, orderNo, userId, status, productMoney, totalMoney, payMethod, isPay, areaId, tradeSrc, tradeType, isRefund, dataFlag, createTime, payTime, modifiedTime from wzk_trade_orders where date_format(modifiedTime, '%Y-%m-%d')='$do_date'"
],
"jdbcUrl": ["jdbc:mysql://h122.wzk.icu:3306/ebiz"]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://h121.wzk.icu:9000",
"fileType": "text",
"path": "/user/data/trade.db/orders/dt=$do_date",
"fileName": "orders_$do_date",
"column": [
{ "name": "orderId", "type": "INT" },
{ "name": "orderNo", "type": "STRING" },
{ "name": "userId", "type": "BIGINT" },
{ "name": "status", "type": "TINYINT" },
{ "name": "productMoney", "type": "Float" },
{ "name": "totalMoney", "type": "Float" },
{ "name": "payMethod", "type": "TINYINT" },
{ "name": "isPay", "type": "TINYINT" },
{ "name": "areaId", "type": "INT" },
{ "name": "tradeSrc", "type": "TINYINT" },
{ "name": "tradeType", "type": "INT" },
{ "name": "isRefund", "type": "TINYINT" },
{ "name": "dataFlag", "type": "TINYINT" },
{ "name": "createTime", "type": "STRING" },
{ "name": "payTime", "type": "STRING" },
{ "name": "modifiedTime", "type": "STRING" }
],
"writeMode": "append",
"fieldDelimiter": ","
}
}
}
]
}
}
执行命令:
do_date='2020-07-12'
# 创建目录
hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
# 数据迁移
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/orders.json
# 加载数据
hive -e "alter table ods.ods_trade_orders add partition(dt='$do_date')"
订单明细表
wzk_order_product => ods.ods_trade_order_product
JSON 配置类似,使用 createTime 作为增量字段
产品明细表
wzk_product_info => ods.ods_trade_product_info
JSON 配置类似,使用 modifyTime 作为增量字段
错误速查
| 症状 | 根因 | 定位 | 修复 |
|---|---|---|---|
| DataX 读取 0 条,任务成功但没数据 | 增量条件用错时间字段或当天无变更 | 先在 MySQL 直接执行同款 SQL | 明确”增量口径”:订单用 modifiedTime、明细用 createTime、产品用 modifyTime |
| MySQLReader 很慢/数据库压力大 | date_format(col,‘%Y-%m-%d’) 让索引失效 | MySQL EXPLAIN 看 key 是否命中 | 改为范围过滤:col >= ‘$do_date 00:00:00’ and col < ‘$do_date 23:59:59’ |
| HDFSWriter 报路径不存在/权限不足 | 目录未创建或 HDFS 权限不允许写入 | hdfs dfs -ls 看目录/owner | 预创建目录并授权 |
| 写入文件格式正常但 Hive 查询不到数据 | Hive 分区未挂载或分区路径不一致 | show partitions / describe formatted | 执行 alter table … add partition(dt=’…’) |
| 金额精度异常(0.1 变 0.10000001) | Float 存储导致精度问题 | 对比 MySQL 与 HDFS/Hive 结果 | 金额改用 DECIMAL |
| 增量重复/漏数 | writeMode=append 且同一天重跑会重复 | 对比分区文件行数与 MySQL 当天 count | 设计幂等:分区目录按天先清理再写 |