Big Data 254 - Airflow Trade Task Scheduling
Task Integration Deployment
Basic Introduction to Airflow
Apache Airflow is an open-source task scheduling and workflow management tool for orchestrating complex data processing tasks. Originally developed by Airbnb, it was donated to the Apache Software Foundation in 2016. Airflow's defining characteristic is that workflows are expressed as code: tasks and their dependencies are defined in Python, and Airflow handles their scheduling and monitoring, which makes it well suited to complex big data pipelines.
Airflow Features
- Code-Centric: Airflow uses Python to define DAGs, providing flexibility and programmability.
- Highly Extensible: Users can customize Operators and Hooks to integrate various data sources and tools.
- Powerful UI Interface: Provides a visual interface for monitoring task status, viewing logs, retrying failed tasks, and more.
- Rich Scheduling Options: Supports both time-based and event-based scheduling.
- High Availability: When combined with executors like Celery and Kubernetes, it supports distributed architecture, suitable for handling large-scale tasks.
Use Cases
Data Pipeline Scheduling
Used to manage ETL processes from source to destination. For example, daily data extraction from databases, cleaning, and storage into data warehouses.
Machine Learning Workflow Management
Schedules data preprocessing, model training, and model deployment tasks.
Data Validation
Automatically checks data quality and consistency.
Scheduled Task Automation
Timely log cleanup, data archiving, or report generation.
Airflow Core Concepts
DAGs
Directed Acyclic Graph - organizes all tasks that need to run based on their dependencies, describing the execution order of all tasks.
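As a hedged, minimal sketch (the DAG id, schedule, and commands here are illustrative, not part of the trade pipeline built later), a DAG is just Python code:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A DAG is a container that groups tasks and fixes their execution order
# via directed edges between them.
dag = DAG(
    'minimal_example',                 # illustrative DAG id
    start_date=datetime(2020, 6, 20),
    schedule_interval='@daily',
)

extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

# Directed edge: extract must succeed before load runs. Edges may not form
# a cycle, which keeps the graph acyclic (the "A" in DAG).
extract >> load
```

When the scheduler parses this file, it builds the graph and runs tasks in dependency order.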
Operators
Airflow has many built-in Operators:
- BashOperator: Execute a Bash command
- PythonOperator: Call arbitrary Python functions
- EmailOperator: Send emails
- SimpleHttpOperator: Send HTTP requests
- SQL operators (e.g. MySqlOperator, PostgresOperator): Execute SQL statements
- Custom Operators
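For instance, a PythonOperator turns an ordinary function into a task; a hedged sketch (the DAG id and function name are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('python_op_example', start_date=datetime(2020, 6, 20),
          schedule_interval=None)

def greet():
    # Any Python callable can serve as a task body.
    print('hello from a task')

greet_task = PythonOperator(
    task_id='greet',
    python_callable=greet,  # invoked each time the task instance runs
    dag=dag,
)
```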
Task
A Task is an instance of an Operator.
Task Instance
Since Tasks are scheduled repeatedly, each execution of a Task is a different Task Instance. Task Instances have their own states, including: success, running, failed, skipped, up_for_reschedule, up_for_retry, queued, no_status, etc.
Task Relationships
Different Tasks in a DAG can have dependencies on each other.
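Dependencies are declared with the `>>` / `<<` operators (or `set_downstream` / `set_upstream`); lists express fan-out and fan-in, the same pattern the trade DAG below uses. A small sketch with illustrative task ids:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('deps_example', start_date=datetime(2020, 6, 20),
          schedule_interval=None)

a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)

a >> [b, c]   # fan-out: b and c both wait for a (same as a.set_downstream)
[b, c] >> d   # fan-in: d waits for both b and c
```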
Core Trade Task Scheduling Integration
Core Trade Analysis
In earlier parts of this series we wrote a number of load scripts; Airflow can now schedule them:
# Load ODS data (DataX migrated data)
/opt/wzk/hive/ods_load_trade.sh
# Load DIM layer data
/opt/wzk/hive/dim_load_product_cat.sh
/opt/wzk/hive/dim_load_shop_org.sh
/opt/wzk/hive/dim_load_payment.sh
/opt/wzk/hive/dim_load_product_info.sh
# Load DWD layer data
/opt/wzk/hive/dwd_load_trade_orders.sh
# Load DWS layer data
/opt/wzk/hive/dws_load_trade_orders.sh
# Load ADS layer data
/opt/wzk/hive/ads_load_trade_order_analysis.sh
Note: When depends_on_past is set to True, a task instance is only triggered if the same task succeeded in the previous DAG run.
Writing the Script
Create the file with vim $AIRFLOW_HOME/dags/trade_test.py and add the following content:
import datetime
from datetime import timedelta, date

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Define DAG default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 6, 20),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
coretradedag = DAG(
    'coretrade',
    default_args=default_args,
    description='Core trade analyze',
    schedule_interval='30 0 * * *',  # Run at 00:30 daily
)

# Get yesterday's date.
# Note: this value is computed when the DAG file is parsed, not when a task
# runs, so backfilled runs all see the same date; Airflow's {{ ds }} template
# macro is the more robust alternative.
today = date.today()
oneday = timedelta(days=1)
yesterday = (today - oneday).strftime("%Y-%m-%d")

# Define tasks
odstask = BashOperator(
    task_id='ods_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/ods_load_trade.sh {yesterday}',
    dag=coretradedag,
)
dimtask1 = BashOperator(
    task_id='dimtask_product_cat',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_product_cat.sh {yesterday}',
    dag=coretradedag,
)
dimtask2 = BashOperator(
    task_id='dimtask_shop_org',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_shop_org.sh {yesterday}',
    dag=coretradedag,
)
dimtask3 = BashOperator(
    task_id='dimtask_payment',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_payment.sh {yesterday}',
    dag=coretradedag,
)
dimtask4 = BashOperator(
    task_id='dimtask_product_info',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_product_info.sh {yesterday}',
    dag=coretradedag,
)
dwdtask = BashOperator(
    task_id='dwd_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dwd_load_trade_orders.sh {yesterday}',
    dag=coretradedag,
)
dwstask = BashOperator(
    task_id='dws_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dws_load_trade_orders.sh {yesterday}',
    dag=coretradedag,
)
adstask = BashOperator(
    task_id='ads_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/ads_load_trade_order_analysis.sh {yesterday}',
    dag=coretradedag,
)
# Define task dependencies
odstask >> [dimtask1, dimtask2, dimtask3, dimtask4, dwdtask]
[dimtask1, dimtask2, dimtask3, dimtask4, dwdtask] >> dwstask
dwstask >> adstask
Viewing Results
Check for script issues:
# Parse the DAG file; if it exits without errors, the definition is valid
python $AIRFLOW_HOME/dags/trade_test.py
List all DAGs:
airflow dags list
List the DAG's tasks as a tree:
airflow tasks list coretrade --tree
Task dependency tree structure:
<Task(BashOperator): ods_load_data>
    <Task(BashOperator): dimtask_payment>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_product_cat>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_product_info>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_shop_org>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dwd_load_data>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
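Before enabling the schedule, a single task can also be test-run for a given execution date; this ignores dependencies and does not record state in the metadata database (the date below is illustrative):

```shell
# Test-run one task of the coretrade DAG for execution date 2020-06-20.
airflow tasks test coretrade ods_load_data 2020-06-20
```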