Big Data 249 - Offline Data Warehouse DWD and DWS Layer
Basic Introduction
There are two tables to process: Order Table and Order Product Table.
- Order Table: This is a periodic fact table. To preserve order status history, we can use a zipper table (slowly changing dimension) for processing.
- Order Product Table: A regular fact table, processed using standard methods.
Order Status Codes:
- -3: User rejected delivery
- -2: Unpaid order
- -1: User cancelled
- 0: Awaiting shipment
- 1: In delivery
- 2: User confirmed receipt
Role of the DWD Layer
The DWD layer can be understood as the data warehouse’s “detail layer” or “refinement layer.” Its core function is to transform raw data from the ODS layer into higher quality and greater business value:
- Positioned in the middle of the data warehouse’s layered architecture
- Performs business logic processing, data cleaning, deduplication, and normalization on ODS layer data to form detailed business fact tables
- Provides standardized data sources for the DWS layer and other upper-layer applications
DWD Layer Table Creation
Order Fact Table (Zipper Table):
-- Order Fact Table (Zipper Table)
DROP TABLE IF EXISTS dwd.dwd_trade_orders;
create table dwd.dwd_trade_orders(
`orderId` int,
`orderNo` string,
`userId` bigint,
`status` tinyint,
`productMoney` decimal,
`totalMoney` decimal,
`payMethod` tinyint,
`isPay` tinyint,
`areaId` int,
`tradeSrc` tinyint,
`tradeType` int,
`isRefund` tinyint,
`dataFlag` tinyint,
`createTime` string,
`payTime` string,
`modifiedTime` string,
`start_date` string,
`end_date` string
) COMMENT 'Order Fact Zipper Table'
partitioned by (dt string)
STORED AS PARQUET;
DWD Layer Data Loading Script
dwd_load_trade_orders.sh:
#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
INSERT OVERWRITE TABLE dwd.dwd_trade_orders
partition(dt)
SELECT orderId,
orderNo,
userId,
status,
productMoney,
totalMoney,
payMethod,
isPay,
areaId,
tradeSrc,
tradeType,
isRefund,
dataFlag,
createTime,
payTime,
modifiedTime,
case when modifiedTime is not null
then from_unixtime(unix_timestamp(modifiedTime, 'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd')
else from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd')
end as start_date,
'9999-12-31' as end_date,
from_unixtime(unix_timestamp(createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') as dt
FROM ods.ods_trade_orders
WHERE dt='$do_date'
union all
SELECT A.orderId,
A.orderNo,
A.userId,
A.status,
A.productMoney,
A.totalMoney,
A.payMethod,
A.isPay,
A.areaId,
A.tradeSrc,
A.tradeType,
A.isRefund,
A.dataFlag,
A.createTime,
A.payTime,
A.modifiedTime,
A.start_date,
CASE WHEN B.orderid IS NOT NULL AND A.end_date > '$do_date'
THEN date_add('$do_date', -1)
ELSE A.end_date END AS end_date,
from_unixtime(unix_timestamp(A.createTime, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd') as dt
FROM (SELECT * FROM dwd.dwd_trade_orders WHERE dt>date_add('$do_date', -15)) A
left outer join (SELECT * FROM ods.ods_trade_orders WHERE dt='$do_date') B
ON A.orderId = B.orderId;
"
hive -e "$sql"
DWS Layer Table Creation
Order Detail Table (Lightweight Summary Fact Table):
-- Order Detail Table (Lightweight Summary Fact Table). Each order's details
DROP TABLE IF EXISTS dws.dws_trade_orders;
create table if not exists dws.dws_trade_orders(
orderid string, -- Order ID
cat_3rd_id string, -- Product third-level category ID
shopid string, -- Shop ID
paymethod tinyint, -- Payment method
productsnum bigint, -- Product quantity
paymoney double, -- Order product detail amount
paytime string -- Order time
)
partitioned by (dt string)
STORED AS PARQUET;
Order Detail Wide Table:
-- Order Detail Wide Table
DROP TABLE IF EXISTS dws.dws_trade_orders_w;
create table if not exists dws.dws_trade_orders_w(
orderid string, -- Order ID
cat_3rd_id string, -- Product third-level category ID
thirdname string, -- Product third-level category name
secondname string, -- Product second-level category name
firstname string, -- Product first-level category name
shopid string, -- Shop ID
shopname string, -- Shop name
regionname string, -- Shop region
cityname string, -- Shop city
paymethod tinyint, -- Payment method
productsnum bigint, -- Product quantity
paymoney double, -- Order detail amount
paytime string -- Order time
)
partitioned by (dt string)
STORED AS PARQUET;
DWS Layer Data Loading Script
dws_load_trade_orders.sh:
#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
insert overwrite table dws.dws_trade_orders
partition(dt='$do_date')
select t1.orderid as orderid,
t3.categoryid as cat_3rd_id,
t3.shopid as shopid,
t1.paymethod as paymethod,
t2.productnum as productsnum,
t2.productnum*t2.productprice as pay_money,
t1.paytime as paytime
from (select orderid, paymethod, paytime
from dwd.dwd_trade_orders
where dt='$do_date') T1
left join
(select orderid, productid, productnum, productprice
from ods.ods_trade_order_product
where dt='$do_date') T2
on t1.orderid = t2.orderid
left join
(select productid, shopid, categoryid
from dim.dim_trade_product_info
where start_dt <= '$do_date'
and end_dt >= '$do_date' ) T3
on t2.productid=t3.productid;
insert overwrite table dws.dws_trade_orders_w
partition(dt='$do_date')
select t1.orderid,
t1.cat_3rd_id,
t2.thirdname,
t2.secondname,
t2.firstname,
t1.shopid,
t3.shopname,
t3.regionname,
t3.cityname,
t1.paymethod,
t1.productsnum,
t1.paymoney,
t1.paytime
from (select orderid,
cat_3rd_id,
shopid,
paymethod,
productsnum,
paymoney,
paytime
from dws.dws_trade_orders
where dt='$do_date') T1
join
(select thirdid, thirdname, secondid, secondname,
firstid, firstname
from dim.dim_trade_product_cat
where dt='$do_date') T2
on T1.cat_3rd_id = T2.thirdid
join
(select shopid, shopname, regionname, cityname
from dim.dim_trade_shops_org
where dt='$do_date') T3
on T1.shopid = T3.shopid
"
hive -e "$sql"
Data Source Tables
dwd.dwd_trade_orders(Zipper table, partitioned)ods.ods_trade_product(Partitioned table)dim.dim_trade_product_info(Dimension table, zipper table)dim.dim_trade_product_cat(Partitioned table)dim.dim_trade_shops_org(Partitioned table)