Big Data 243 - Offline Data Warehouse: E-commerce Core Transaction Incremental Import

TL;DR

  • Scenario: Three core e-commerce transaction tables are imported incrementally each day into the offline warehouse's ODS layer, partitioned by dt
  • Conclusion: DataX uses MySQLReader + HDFSWriter, extracts by a time field, writes to date-partitioned HDFS directories, and Hive only mounts the partitions
  • Output: 3 reusable JSON jobs + one execution order + common pitfalls quick reference card

Business Requirements

Core transactions are the most critical business in an e-commerce system, and operational activities revolve around them. Selected metrics include order count, product count, and payment amount, analyzed by sales region and product type.

Requirement Areas

Core e-commerce transactions can be divided into the following main stages:

  • Product Browsing: User behavior data for browsing products
  • Add to Cart: User behavior of adding products to shopping cart
  • Order Placement: Order generation behavior completed by users on the e-commerce platform
  • Payment: Payment is a crucial part of transactions
  • Shipping: Shipping data records merchant shipping time, logistics company, tracking number, etc.
  • Receipt and Review: User reviews, returns, and exchanges after receiving products

Incremental Data Import

Three incremental tables:

  • Order table wzk_trade_orders
  • Order product table wzk_order_product
  • Product information table wzk_product_info

Order Table

wzk_trade_orders => ods.ods_trade_orders

JSON configuration example:

{
  "job": {
    "setting": {
      "speed": { "channel": 1 },
      "errorLimit": { "record": 0 }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "hive",
            "password": "hive@wzk.icu",
            "connection": [
              {
                "querySql": [
                  "select orderId, orderNo, userId, status, productMoney, totalMoney, payMethod, isPay, areaId, tradeSrc, tradeType, isRefund, dataFlag, createTime, payTime, modifiedTime from wzk_trade_orders where date_format(modifiedTime, '%Y-%m-%d')='$do_date'"
                ],
                "jdbcUrl": ["jdbc:mysql://h122.wzk.icu:3306/ebiz"]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://h121.wzk.icu:9000",
            "fileType": "text",
            "path": "/user/data/trade.db/orders/dt=$do_date",
            "fileName": "orders_$do_date",
            "column": [
              { "name": "orderId", "type": "INT" },
              { "name": "orderNo", "type": "STRING" },
              { "name": "userId", "type": "BIGINT" },
              { "name": "status", "type": "TINYINT" },
              { "name": "productMoney", "type": "FLOAT" },
              { "name": "totalMoney", "type": "FLOAT" },
              { "name": "payMethod", "type": "TINYINT" },
              { "name": "isPay", "type": "TINYINT" },
              { "name": "areaId", "type": "INT" },
              { "name": "tradeSrc", "type": "TINYINT" },
              { "name": "tradeType", "type": "INT" },
              { "name": "isRefund", "type": "TINYINT" },
              { "name": "dataFlag", "type": "TINYINT" },
              { "name": "createTime", "type": "STRING" },
              { "name": "payTime", "type": "STRING" },
              { "name": "modifiedTime", "type": "STRING" }
            ],
            "writeMode": "append",
            "fieldDelimiter": ","
          }
        }
      }
    ]
  }
}
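DataX performs the `$do_date` substitution itself via the `-p "-Ddo_date=..."` flag shown below, but a malformed date silently produces an empty extract or a bad HDFS path. As a pre-flight sketch, a small hypothetical helper (`render_job`, not part of DataX) can validate the date and the rendered job before submission:

```python
import json
import re

def render_job(template_str, do_date):
    """Substitute $do_date into a DataX job template and sanity-check it.

    Rejects a malformed date up front so the job fails fast instead of
    extracting zero rows or writing to a wrong partition directory.
    """
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", do_date):
        raise ValueError(f"do_date must be YYYY-MM-DD, got: {do_date}")
    rendered = template_str.replace("$do_date", do_date)
    job = json.loads(rendered)  # fails if the template JSON is malformed
    writer = job["job"]["content"][0]["writer"]["parameter"]
    # The HDFS path must carry the same day as the extract query.
    assert f"dt={do_date}" in writer["path"], "partition path mismatch"
    return job
```

This is only a guard around the template file; the actual substitution at run time is still done by `datax.py`.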

Execute command:

do_date='2020-07-12'
# Create directory
hdfs dfs -mkdir -p /user/data/trade.db/orders/dt=$do_date
# Data migration
python $DATAX_HOME/bin/datax.py -p "-Ddo_date=$do_date" /opt/wzk/datax/orders.json
# Load data
hive -e "alter table ods.ods_trade_orders add partition(dt='$do_date')"
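Because the writer runs with writeMode=append, rerunning the same day duplicates rows. The three steps above can be wrapped in a script that clears the partition directory first, making a rerun idempotent. A hedged sketch (the DRY_RUN switch, the defaults, and the `if not exists` clause are illustrative additions, not part of the original commands):

```shell
#!/usr/bin/env bash
# Hypothetical daily wrapper for the orders job (sketch, not from the source).
set -euo pipefail

do_date="${DO_DATE:-2020-07-12}"
datax_home="${DATAX_HOME:-/opt/datax}"
target_dir="/user/data/trade.db/orders/dt=${do_date}"

# DRY_RUN=1 (the default here) prints each command instead of executing it,
# so the sequencing can be checked without a live cluster.
run() { if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi; }

# writeMode=append duplicates rows on a same-day rerun, so clear the
# partition directory before writing to keep the job idempotent.
run hdfs dfs -rm -r -f "$target_dir"
run hdfs dfs -mkdir -p "$target_dir"
run python "$datax_home/bin/datax.py" -p "-Ddo_date=${do_date}" /opt/wzk/datax/orders.json
run hive -e "alter table ods.ods_trade_orders add if not exists partition(dt='${do_date}')"
```

Set DRY_RUN=0 and export DO_DATE to actually run it against the cluster.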

Order Detail Table

wzk_order_product => ods.ods_trade_order_product

The JSON configuration is similar, using createTime as the incremental field.
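The deltas from the orders job are only the extract query, the target path, and the file name; the column list must match wzk_order_product's actual schema and is elided here. A sketch of just the changed keys (the path and file name follow the orders job's naming pattern and are assumptions):

```json
{
  "querySql": [
    "select ... from wzk_order_product where date_format(createTime, '%Y-%m-%d')='$do_date'"
  ],
  "path": "/user/data/trade.db/order_product/dt=$do_date",
  "fileName": "order_product_$do_date"
}
```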

Product Information Table

wzk_product_info => ods.ods_trade_product_info

The JSON configuration is similar, using modifyTime as the incremental field.

Error Quick Reference

| Symptom | Root cause | Diagnosis | Fix |
| --- | --- | --- | --- |
| DataX reads 0 records; task succeeds but no data | Wrong time field for the increment, or no changes that day | Run the same SQL directly in MySQL first | Pin down the "incremental definition": orders use modifiedTime, order details use createTime, products use modifyTime |
| MySQLReader very slow / database under heavy load | `date_format(col, '%Y-%m-%d')` prevents any index on the column from being used | `EXPLAIN` in MySQL to check whether the index is hit | Use a range filter: `col >= '$do_date 00:00:00' and col < date_add('$do_date', interval 1 day)` (a closed upper bound of `23:59:59` would drop the final second) |
| HDFSWriter reports path does not exist / insufficient permissions | Directory not created, or HDFS permissions forbid writing | `hdfs dfs -ls` to check the directory and its owner | Pre-create the directory and grant permissions |
| File written normally but Hive queries return nothing | Hive partition not mounted, or partition path mismatch | `show partitions` / `describe formatted` | Run `alter table … add partition(dt='…')` |
| Amount precision abnormal (0.1 becomes 0.10000001) | Float storage loses precision | Compare MySQL vs HDFS/Hive results | Store amounts as DECIMAL |
| Duplicate or missing incremental rows | writeMode=append plus a same-day rerun duplicates data | Compare the partition's file line count with MySQL's daily count | Design for idempotency: clean the partition directory before writing |
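The range-filter fix for the slow-reader pitfall can be sketched as a small hypothetical helper (`incremental_where`, not part of DataX) that builds a sargable half-open predicate for the querySql:

```python
from datetime import date, timedelta

def incremental_where(column, do_date):
    """Build a half-open one-day range filter for an incremental extract.

    date_format(col, '%Y-%m-%d') = '...' defeats any index on the column;
    a plain range on the raw column keeps the index usable and the
    exclusive upper bound still covers the whole day, including the final
    second that a closed bound of 23:59:59 would drop.
    """
    day = date.fromisoformat(do_date)
    next_day = (day + timedelta(days=1)).isoformat()
    return f"{column} >= '{do_date} 00:00:00' and {column} < '{next_day} 00:00:00'"

# e.g. for the orders table:
# incremental_where("modifiedTime", "2020-07-12")
# → "modifiedTime >= '2020-07-12 00:00:00' and modifiedTime < '2020-07-13 00:00:00'"
```

The same predicate can be spliced into each table's querySql with the appropriate incremental column (modifiedTime, createTime, or modifyTime).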