Offline Data Warehouse - ADS Layer & Airflow Task Scheduling System

ADS Layer Development

Requirements

Calculate for the current day:

  • All order information nationwide
  • Order information by primary product category nationwide
  • Order information by secondary product category nationwide
  • All order information by region
  • Order information by primary product category by region
  • Order information by secondary product category by region
  • All order information by city
  • Order information by primary product category by city
  • Order information by secondary product category by city

Table used: dws.dws_trade_orders_w

ADS Layer Table Creation

-- ADS Layer Order Analysis Table
DROP TABLE IF EXISTS ads.ads_trade_order_analysis;
create table if not exists ads.ads_trade_order_analysis(
  areatype string, -- Region type: 全国 (national), 大区 (region), 城市 (city)
  regionname string, -- Region name
  cityname string, -- City name
  categorytype string, -- Product category type: 一级 (primary), 二级 (secondary)
  category1 string, -- Primary category name
  category2 string, -- Secondary category name
  totalcount bigint, -- Order count
  total_productnum bigint, -- Product count
  totalmoney double -- Payment amount
)
partitioned by (dt string)
row format delimited fields terminated by ',';

ADS Layer Data Loading

vim ads_load_trade_order_analysis.sh

One order may contain multiple products, and different products in the same order may belong to different categories. As a result, a single order can produce multiple category-level records in the DWS table, and those records are aggregated separately:

#!/bin/bash
source /etc/profile
if [ -n "$1" ]
then
  do_date=$1
else
  do_date=`date -d "-1 day" +%F`
fi
sql="
with mid_orders as (
  select regionname,
  cityname,
  firstname category1,
  secondname category2,
  count(distinct orderid) as totalcount,
  sum(productsnum) as total_productnum,
  sum(paymoney) as totalmoney
  from dws.dws_trade_orders_w
  where dt='$do_date'
  group by regionname, cityname, firstname, secondname
)
insert overwrite table ads.ads_trade_order_analysis
partition(dt='$do_date')
select '全国' as areatype,
'' as regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category1
union all
select '全国' as areatype,
'' as regionname,
'' as cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by category2
union all
select '大区' as areatype,
regionname,
'' as cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname
union all
select '大区' as areatype,
regionname,
'' as cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category1
union all
select '大区' as areatype,
regionname,
'' as cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by regionname, category2
union all
select '城市' as areatype,
'' as regionname,
cityname,
'' as categorytype,
'' as category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname
union all
select '城市' as areatype,
'' as regionname,
cityname,
'一级' as categorytype,
category1,
'' as category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category1
union all
select '城市' as areatype,
'' as regionname,
cityname,
'二级' as categorytype,
'' as category1,
category2,
sum(totalcount),
sum(total_productnum),
sum(totalmoney)
from mid_orders
group by cityname, category2;
"
hive -e "$sql"

Note: Since one order may have multiple records in dws.dws_trade_orders_w, use count(distinct orderid) when counting orders.
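
To see why the note above matters, here is a tiny plain-Python illustration (the sample rows are hypothetical, not real data): an order spanning two categories appears twice in the DWS table, so a naive row count double-counts it, while a distinct count of order IDs does not.

```python
# Hypothetical sample rows mirroring dws.dws_trade_orders_w:
# (orderid, category1, productsnum, paymoney)
rows = [
    ("o1", "Food",   2, 30.0),
    ("o1", "Drinks", 1, 10.0),  # same order, second category record
    ("o2", "Food",   1, 15.0),
]

naive_count = len(rows)                               # counts o1 twice -> 3
order_count = len({orderid for orderid, *_ in rows})  # distinct orders -> 2
total_money = sum(r[3] for r in rows)                 # summing rows is safe -> 55.0

print(naive_count, order_count, total_money)
```

This is exactly the situation count(distinct orderid) guards against, while sum(productsnum) and sum(paymoney) remain correct over the duplicated rows because each row carries only its own category's share.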

Summary

# Load ODS data (including DataX migrated data)
/data/lagoudw/script/trade/ods_load_trade.sh
# Load DIM layer data
/data/lagoudw/script/trade/dim_load_product_cat.sh
/data/lagoudw/script/trade/dim_load_shop_org.sh
/data/lagoudw/script/trade/dim_load_payment.sh
/data/lagoudw/script/trade/dim_load_product_info.sh
# Load DWD layer data
/data/lagoudw/script/trade/dwd_load_trade_orders.sh
# Load DWS layer data
/data/lagoudw/script/trade/dws_load_trade_orders.sh
# Load ADS layer data
/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh
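
Running these scripts by hand is error-prone because the order matters. As a minimal sketch (assuming each script takes the date as its first argument, as the ADS script above does), the chain can be driven from Python; this fixed ordering is exactly what a scheduler such as Airflow automates.

```python
# Hedged sketch, not part of the course scripts: run the layer scripts in
# their required order. dry_run=True only records the commands.
import subprocess

SCRIPTS = [
    "/data/lagoudw/script/trade/ods_load_trade.sh",
    "/data/lagoudw/script/trade/dim_load_product_cat.sh",
    "/data/lagoudw/script/trade/dim_load_shop_org.sh",
    "/data/lagoudw/script/trade/dim_load_payment.sh",
    "/data/lagoudw/script/trade/dim_load_product_info.sh",
    "/data/lagoudw/script/trade/dwd_load_trade_orders.sh",
    "/data/lagoudw/script/trade/dws_load_trade_orders.sh",
    "/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh",
]

def run_pipeline(do_date, dry_run=True):
    """Run each layer script in order; a failure stops the whole chain."""
    executed = []
    for script in SCRIPTS:
        executed.append(f"{script} {do_date}")
        if not dry_run:
            subprocess.run([script, do_date], check=True)
    return executed
```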

Airflow Introduction

Airflow is an open-source scheduling tool originally developed at Airbnb and written in Python. The project was started in 2014, open-sourced in 2015, and entered the Apache Software Foundation's incubation program in March 2016.

Airflow defines a workflow as a directed acyclic graph (DAG) of tasks and assigns them to a set of compute nodes, executing them in order based on their dependencies.

Airflow advantages:

  • Flexible and easy to use: Airflow is written in Python, workflow definitions are also in Python
  • Powerful: Supports multiple types of jobs, can customize different types of jobs such as Shell, Python, MySQL, Oracle, Hive, etc.
  • Simple and elegant: Job definitions are clear and straightforward
  • Easily extensible: Provides various base classes for extension, with multiple executors to choose from

Apache Airflow is an open-source task scheduling and workflow management platform, primarily used for developing, debugging, and monitoring data pipelines. Airflow uses Python scripts to define tasks and dependencies, helping users programmatically build dynamic, visualized workflows.

Architecture

  • WebServer: A daemon process that serves Airflow's web UI, a Python Flask application that handles HTTP requests. Through the WebServer you can pause, resume, and trigger tasks, monitor running tasks, re-run or resume failed runs, query task status, and view logs and other details.
  • Scheduler: A daemon process that periodically scans DAG definitions and task states to decide whether any task instances should be triggered.
  • Worker: A daemon process that executes the tasks dispatched to it by the executor. With CeleryExecutor, Worker services can be deployed across multiple machines.

Core Features

Code-Based Workflow Definition

Workflows are defined as Python scripts called DAGs (Directed Acyclic Graphs). Each DAG contains a set of tasks and their dependencies. Parameterization and dynamic DAG generation are supported, which suits complex workflow scenarios.

Task Scheduling

Supports flexible task scheduling. Users can define task execution cycles through intervals, specific times, etc. The scheduler automatically triggers tasks in order based on dependencies.

Distributed Task Execution

Tasks can be executed in a distributed fashion, and the system scales horizontally: adding Worker nodes increases task-processing capacity.

Visualization Interface

Airflow provides a powerful Web UI to view DAG running status, logs, and historical records. Graphical display of DAG structure helps understand workflow execution at a glance.

High Extensibility

Supports extending functionality through plugin mechanisms, such as custom Operators or Hooks. Built-in various operators and connectors (like BashOperator, PythonOperator, MySqlOperator, etc.) for integrating various tasks and external systems.

Dependency Management

Airflow supports scheduling based on dependencies between tasks. Users can explicitly define task sequence and dependencies.

Key Concepts

DAG

DAG (Directed Acyclic Graph):

  • In Airflow, a DAG defines a complete job. All Tasks in the same DAG share the same schedule.
  • Parameters: dag_id (unique identifier for the DAG), default_args (default arguments applied to every task in the DAG unless the task overrides them)
  • schedule_interval: configures the DAG's execution cycle; accepts crontab syntax
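
Putting these parameters together, a minimal DAG definition might look like the following. This is a sketch: the dag_id, owner, and schedule values are illustrative, and on Airflow 2.4+ `schedule_interval` is superseded by `schedule`.

```python
from datetime import datetime, timedelta

from airflow import DAG

# Defaults inherited by every task in the DAG unless overridden per task.
default_args = {
    "owner": "dw_admin",                  # illustrative owner
    "start_date": datetime(2020, 7, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    dag_id="ads_trade_order_analysis",    # unique identifier for the DAG
    default_args=default_args,
    schedule_interval="30 2 * * *",       # crontab syntax: 02:30 every day
)
```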

Task

  • A Task is a concrete unit of work in a DAG; every Task must belong to a DAG, and dependencies between Tasks are configured within that DAG.
  • dag: The job belongs to the corresponding DAG
  • task_id: Task identifier
  • owner: Task owner
  • start_date: Task start time
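
Tying these concepts together, here is a hedged sketch of a DAG that wires two of this project's load scripts as dependent tasks. The dag_id and task_id values are illustrative; on Airflow 1.x the import path is airflow.operators.bash_operator instead.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

default_args = {"owner": "dw_admin", "start_date": datetime(2020, 7, 1)}

dag = DAG(
    dag_id="trade_daily",
    default_args=default_args,
    schedule_interval="@daily",
)

# Note the trailing space in bash_command: without it, Airflow treats a
# command ending in ".sh" as a Jinja template file to be rendered.
dws_load = BashOperator(
    task_id="dws_load_trade_orders",
    bash_command="/data/lagoudw/script/trade/dws_load_trade_orders.sh ",
    dag=dag,
)
ads_load = BashOperator(
    task_id="ads_load_trade_order_analysis",
    bash_command="/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh ",
    dag=dag,
)

dws_load >> ads_load  # the ADS load runs only after the DWS load succeeds
```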