Offline Data Warehouse - ADS Layer & Airflow Task Scheduling System
ADS Layer Development
Requirements
Calculate for the current day:
- All order information nationwide
- Order information by primary product category nationwide
- Order information by secondary product category nationwide
- All order information by region
- Order information by primary product category by region
- Order information by secondary product category by region
- All order information by city
- Order information by primary product category by city
- Order information by secondary product category by city
Table used: dws.dws_trade_orders_w
ADS Layer Table Creation
-- ADS Layer Order Analysis Table
DROP TABLE IF EXISTS ads.ads_trade_order_analysis;
create table if not exists ads.ads_trade_order_analysis(
areatype string, -- Region type: national, region, city
regionname string, -- Region name
cityname string, -- City name
categorytype string, -- Product category type: primary, secondary
category1 string, -- Primary category name
category2 string, -- Secondary category name
totalcount bigint, -- Order count
total_productnum bigint, -- Product count
totalmoney double -- Payment amount
)
partitioned by (dt string)
row format delimited fields terminated by ',';
ADS Layer Data Loading
vim ads_load_trade_order_analysis.sh
One order may contain multiple products, and different products in the same order can belong to different categories. A single order therefore produces one record per category in dws.dws_trade_orders_w, and category-level aggregates count it once per category, which is why the script counts orders with count(distinct orderid):
#!/bin/bash
source /etc/profile

# Target date: first argument if given, otherwise yesterday
if [ -n "$1" ]
then
  do_date=$1
else
  do_date=`date -d "-1 day" +%F`
fi
sql="
with mid_orders as (
select regionname,
cityname,
firstname category1,
secondname category2,
count(distinct orderid) as totalcount,
sum(productsnum) as total_productnum,
sum(paymoney) as totalmoney
from dws.dws_trade_orders_w
where dt='$do_date'
group by regionname, cityname, firstname, secondname
)
insert overwrite table ads.ads_trade_order_analysis
partition(dt='$do_date')
select '全国' as areatype,  -- '全国' = nationwide
'' as regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'一级' as categorytype,  -- '一级' = primary category
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category1
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'二级' as categorytype,  -- '二级' = secondary category
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category2
union all
select '大区' as areatype,  -- '大区' = region
regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname
union all
select '大区' as areatype,
regionname,
'' as cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category1
union all
select '大区' as areatype,
regionname,
'' as cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category2
union all
select '城市' as areatype,  -- '城市' = city
'' as regionname,
cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname
union all
select '城市' as areatype,
'' as regionname,
cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category1
union all
select '城市' as areatype,
'' as regionname,
cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category2;
"
hive -e "$sql"
Note: Since one order may have multiple records in dws.dws_trade_orders_w, use count(distinct orderid) when counting orders.
Summary
# Load ODS data (including DataX migrated data)
/data/lagoudw/script/trade/ods_load_trade.sh
# Load DIM layer data
/data/lagoudw/script/trade/dim_load_product_cat.sh
/data/lagoudw/script/trade/dim_load_shop_org.sh
/data/lagoudw/script/trade/dim_load_payment.sh
/data/lagoudw/script/trade/dim_load_product_info.sh
# Load DWD layer data
/data/lagoudw/script/trade/dwd_load_trade_orders.sh
# Load DWS layer data
/data/lagoudw/script/trade/dws_load_trade_orders.sh
# Load ADS layer data
/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
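These layer loads must run strictly in order, since each layer reads the output of the previous one. Before handing the chain to a scheduler, a minimal sequential-runner sketch (script paths as listed above; the stop-on-first-failure behavior is an assumption, not from the source):

```python
import subprocess

# Layer scripts in dependency order: ODS -> DIM -> DWD -> DWS -> ADS
PIPELINE = [
    "/data/lagoudw/script/trade/ods_load_trade.sh",
    "/data/lagoudw/script/trade/dim_load_product_cat.sh",
    "/data/lagoudw/script/trade/dim_load_shop_org.sh",
    "/data/lagoudw/script/trade/dim_load_payment.sh",
    "/data/lagoudw/script/trade/dim_load_product_info.sh",
    "/data/lagoudw/script/trade/dwd_load_trade_orders.sh",
    "/data/lagoudw/script/trade/dws_load_trade_orders.sh",
    "/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh",
]

def run_pipeline(commands, do_date=None):
    """Run each command in order, passing the optional date argument;
    abort on the first non-zero exit code."""
    for cmd in commands:
        args = [cmd] + ([do_date] if do_date else [])
        if subprocess.run(args).returncode != 0:
            return False
    return True
```

For example, `run_pipeline(PIPELINE, "2020-06-20")` would rerun the whole chain for a given day. Managing retries, alerts, and dependencies by hand is exactly the pain point the Airflow section below addresses.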
Airflow Introduction
Airflow is an open-source scheduling tool developed at Airbnb and written in Python. The project began in 2014, was open-sourced in 2015, and entered the Apache Software Foundation's incubator program in 2016 (it graduated to a top-level Apache project in 2019).
Airflow defines a workflow as a directed acyclic graph (DAG) of tasks and assigns them to a set of compute nodes, executing them in order based on their dependencies.
Airflow advantages:
- Flexible and easy to use: Airflow is written in Python, workflow definitions are also in Python
- Powerful: Supports multiple types of jobs, can customize different types of jobs such as Shell, Python, MySQL, Oracle, Hive, etc.
- Simple and elegant: Job definitions are clear and straightforward
- Easily extensible: Provides various base classes for extension, with multiple executors to choose from
Apache Airflow is an open-source task scheduling and workflow management platform, primarily used for developing, debugging, and monitoring data pipelines. Airflow uses Python scripts to define tasks and dependencies, helping users programmatically build dynamic, visualized workflows.
Architecture
- WebServer: A daemon that serves HTTP requests through a Python Flask web application. Through the WebServer you can pause, resume, and trigger tasks, monitor running tasks, rerun from a point of failure, query task status, and view logs and other details.
- Scheduler: A daemon that periodically polls the scheduling metadata to decide whether to trigger task execution.
- Worker: A daemon that runs the Executor on its machine to execute task instances. With CeleryExecutor, Worker services can be deployed across multiple machines.
Core Features
Code-Based Workflow Definition
Workflows are defined as Python scripts, called DAGs (Directed Acyclic Graphs). Each DAG comprises a set of tasks and their dependencies. Airflow supports parameterization and dynamic DAG generation, which makes it suitable for complex workflow scenarios.
Task Scheduling
Supports flexible task scheduling: users can define execution cycles via fixed intervals, cron expressions, or specific times. The scheduler automatically triggers tasks in dependency order.
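For interval-based schedules it helps to see which run times a given start date and interval produce. A pure-Python illustration (no Airflow required), including the one scheduling quirk worth remembering:

```python
from datetime import datetime, timedelta

def schedule_points(start, interval, until):
    """Return the schedule points in [start, until).
    Note: Airflow fires the run for a period only after that period
    ends, i.e. the run stamped `t` actually starts at t + interval.
    """
    points = []
    t = start
    while t < until:
        points.append(t)
        t += interval
    return points

runs = schedule_points(datetime(2020, 6, 1), timedelta(days=1),
                       datetime(2020, 6, 4))
# three daily schedule points: June 1, 2 and 3
```

So a daily DAG with start_date 2020-06-01 gets its first run (stamped 2020-06-01) triggered early on 2020-06-02, once that day's data is complete.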
Distributed Task Execution
Tasks can run distributedly, supporting horizontal scaling. Multiple Worker nodes can improve task processing capability.
Visualization Interface
Airflow provides a powerful Web UI to view DAG running status, logs, and historical records. Graphical display of DAG structure helps understand workflow execution at a glance.
High Extensibility
Supports extending functionality through plugin mechanisms, such as custom Operators or Hooks. Built-in various operators and connectors (like BashOperator, PythonOperator, MySqlOperator, etc.) for integrating various tasks and external systems.
Dependency Management
Airflow supports scheduling based on dependencies between tasks. Users can explicitly define task sequence and dependencies.
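Airflow expresses these dependencies with the `>>` operator (`t1 >> t2` means t2 runs after t1). A toy stand-in, using the stdlib graphlib to recover the execution order, shows the idea without Airflow installed (Task here is a hypothetical illustration class, not Airflow's):

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

class Task:
    """Toy stand-in for an Airflow task: `a >> b` means b depends on a."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        other.upstream.add(self)
        return other  # returning `other` allows chaining: a >> b >> c

extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract >> transform >> load

# Topological order: every task appears after all of its upstream tasks
order = list(TopologicalSorter(
    {t: t.upstream for t in (extract, transform, load)}
).static_order())
# order of task_ids: extract, transform, load
```

The scheduler does essentially this, but per run and with parallelism: a task becomes eligible only once all of its upstream tasks have succeeded.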
Key Concepts
DAG
DAG (Directed Acyclic Graph):
- In Airflow, a DAG defines a complete job. All Tasks in the same DAG share the same schedule.
- Parameters: dag_id (unique identifier of the DAG), default_args (default parameters applied to every task in the DAG unless a task overrides them)
- schedule_interval: the DAG's execution cycle; accepts crontab syntax (and presets such as @daily)
Task
- A Task is a single unit of work in a DAG; every Task must belong to a DAG, and Tasks can declare dependencies on other Tasks within the same DAG.
- dag: the DAG this task belongs to
- task_id: unique identifier of the task within its DAG
- owner: the task owner
- start_date: the task's start date