Big Data 254 - Airflow Trade Task Scheduling

Task Integration Deployment

Basic Introduction to Airflow

Apache Airflow is an open-source task scheduling and workflow management tool for orchestrating complex data processing tasks. Originally developed by Airbnb, it was donated to the Apache Software Foundation in 2016. Airflow’s main feature is defining tasks and their dependencies as code, supporting task scheduling and monitoring, making it suitable for complex big data tasks.

Airflow Features

  • Code-Centric: Airflow uses Python to define DAGs, providing flexibility and programmability.
  • Highly Extensible: Users can customize Operators and Hooks to integrate various data sources and tools.
  • Powerful UI Interface: Provides a visual interface for monitoring task status, viewing logs, retrying failed tasks, and more.
  • Rich Scheduling Options: Supports both time-based and event-based scheduling.
  • High Availability: When combined with executors like Celery and Kubernetes, it supports distributed architecture, suitable for handling large-scale tasks.

Use Cases

Data Pipeline Scheduling

Used to manage ETL processes from source to destination. For example, daily data extraction from databases, cleaning, and storage into data warehouses.

Machine Learning Workflow Management

Schedules data preprocessing, model training, and model deployment tasks.

Data Validation

Automatically checks data quality and consistency.

Scheduled Task Automation

Timely log cleanup, data archiving, or report generation.

Airflow Core Concepts

DAGs

Directed Acyclic Graph - organizes all tasks that need to run based on their dependencies, describing the execution order of all tasks.
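The "acyclic" part matters: if dependencies formed a cycle, no valid execution order would exist, and Airflow would refuse to load the DAG. A minimal, framework-free sketch of the idea (the task names are illustrative, not part of Airflow):

```python
# Represent dependencies as a mapping: task -> set of downstream tasks.
deps = {
    "extract": {"clean"},
    "clean": {"load"},
    "load": set(),
}

def is_acyclic(graph):
    """Return True if the dependency graph contains no cycle (DFS with 3 colors)."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, ()):
            if color[nxt] == GRAY:        # back edge -> cycle found
                return False
            if color[nxt] == WHITE and not visit(nxt):
                return False
        color[node] = BLACK
        return True

    return all(visit(n) for n in graph if color[n] == WHITE)

print(is_acyclic(deps))      # True: extract -> clean -> load is a valid DAG
deps["load"].add("extract")  # introduce a cycle
print(is_acyclic(deps))      # False
```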

Operators

Airflow has many built-in Operators:
  • BashOperator: Execute a Bash command
  • PythonOperator: Call arbitrary Python functions
  • EmailOperator: Send emails
  • SimpleHttpOperator: Send HTTP requests
  • SQL operators (e.g. MySqlOperator, PostgresOperator): Execute SQL commands
  • Custom Operators
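An Operator is essentially a template: it encapsulates one kind of work, and each use with concrete parameters becomes a task. A rough, framework-free sketch of the pattern (the class names here are illustrative stand-ins, not Airflow's actual implementation):

```python
import subprocess

class BaseOp:
    """Minimal stand-in for an operator: holds parameters, knows how to execute."""
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self):
        raise NotImplementedError

class BashOp(BaseOp):
    """Runs a shell command, in the spirit of Airflow's BashOperator."""
    def __init__(self, task_id, bash_command):
        super().__init__(task_id)
        self.bash_command = bash_command

    def execute(self):
        result = subprocess.run(self.bash_command, shell=True,
                                capture_output=True, text=True)
        return result.stdout

class PythonOp(BaseOp):
    """Calls an arbitrary Python callable, like Airflow's PythonOperator."""
    def __init__(self, task_id, python_callable):
        super().__init__(task_id)
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable()

print(BashOp("say_hi", "echo hello").execute().strip())  # hello
print(PythonOp("add", lambda: 1 + 2).execute())          # 3
```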

Task

A Task is a parameterized instance of an Operator; each Task is a node in the DAG.

Task Instance

Since Tasks are scheduled repeatedly, each execution of a Task is a separate Task Instance. A Task Instance carries its own state, including: success, running, failed, skipped, up_for_reschedule, up_for_retry, queued, no_status, etc.

Task Relationships

Different Tasks in a DAG can have dependencies on each other.
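The scheduler runs a task only after all of its upstream tasks have succeeded; conceptually, execution follows a topological ordering of the graph. A small sketch using Kahn's algorithm (the layer names mirror the pipeline built later in this section):

```python
from collections import deque

# upstream -> downstream edges, as in ods -> dim/dwd -> dws -> ads
edges = {
    "ods": ["dim", "dwd"],
    "dim": ["dws"],
    "dwd": ["dws"],
    "dws": ["ads"],
    "ads": [],
}

def topo_order(graph):
    """Kahn's algorithm: repeatedly emit tasks whose upstreams are all done."""
    indegree = {n: 0 for n in graph}
    for downstreams in graph.values():
        for d in downstreams:
            indegree[d] += 1
    ready = deque(n for n, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for d in graph[node]:
            indegree[d] -= 1
            if indegree[d] == 0:   # all upstreams finished -> runnable
                ready.append(d)
    return order

print(topo_order(edges))   # ['ods', 'dim', 'dwd', 'dws', 'ads']
```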

Core Trade Task Scheduling Integration

Core Trade Analysis

The scripts written in earlier sections can now be scheduled here:

# Load ODS data (DataX migrated data)
/opt/wzk/hive/ods_load_trade.sh
# Load DIM layer data
/opt/wzk/hive/dim_load_product_cat.sh
/opt/wzk/hive/dim_load_shop_org.sh
/opt/wzk/hive/dim_load_payment.sh
/opt/wzk/hive/dim_load_product_info.sh
# Load DWD layer data
/opt/wzk/hive/dwd_load_trade_orders.sh
# Load DWS layer data
/opt/wzk/hive/dws_load_trade_orders.sh
# Load ADS layer data
/opt/wzk/hive/ads_load_trade_order_analysis.sh

Note: When depends_on_past is set to True, a task instance will only be triggered if the same task succeeded in the previous scheduled run.

Writing the Script

Create the file vim $AIRFLOW_HOME/dags/trade_test.py and write the following content:

import datetime
from datetime import timedelta, date

from airflow import DAG
# Airflow 2.x import path; on Airflow 1.x use: from airflow.operators.bash_operator import BashOperator
from airflow.operators.bash import BashOperator

# Define DAG default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 6, 20),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define DAG
coretradedag = DAG(
    'coretrade',
    default_args=default_args,
    description='Core trade analyze',
    schedule_interval='30 0 * * *',  # Run at 00:30 daily
)

# Get yesterday's date.
# Note: this is evaluated once, when the DAG file is parsed, not per run.
# For run-specific dates, Airflow's templates (e.g. {{ ds }}) in bash_command
# are generally preferred.
today = date.today()
oneday = timedelta(days=1)
yesterday = (today - oneday).strftime("%Y-%m-%d")

# Define tasks
odstask = BashOperator(
    task_id='ods_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/ods_load_trade.sh {yesterday}',
    dag=coretradedag,
)

dimtask1 = BashOperator(
    task_id='dimtask_product_cat',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_product_cat.sh {yesterday}',
    dag=coretradedag,
)

dimtask2 = BashOperator(
    task_id='dimtask_shop_org',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_shop_org.sh {yesterday}',
    dag=coretradedag,
)

dimtask3 = BashOperator(
    task_id='dimtask_payment',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_payment.sh {yesterday}',
    dag=coretradedag,
)

dimtask4 = BashOperator(
    task_id='dimtask_product_info',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dim_load_product_info.sh {yesterday}',
    dag=coretradedag,
)

dwdtask = BashOperator(
    task_id='dwd_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dwd_load_trade_orders.sh {yesterday}',
    dag=coretradedag,
)

dwstask = BashOperator(
    task_id='dws_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/dws_load_trade_orders.sh {yesterday}',
    dag=coretradedag,
)

adstask = BashOperator(
    task_id='ads_load_data',
    depends_on_past=False,
    bash_command=f'sh /opt/wzk/hive/ads_load_trade_order_analysis.sh {yesterday}',
    dag=coretradedag,
)

# Define task dependencies (list form sets several edges at once)
odstask >> [dimtask1, dimtask2, dimtask3, dimtask4, dwdtask]
[dimtask1, dimtask2, dimtask3, dimtask4, dwdtask] >> dwstask
dwstask >> adstask

Viewing Results

Check for script issues:

# Parse the DAG file; if the command produces no errors, the file is valid
python $AIRFLOW_HOME/dags/trade_test.py

List all DAGs:

airflow dags list

List tree structure:

airflow tasks list coretrade --tree

Task dependency tree structure:

<Task(BashOperator): ods_load_data>
    <Task(BashOperator): dimtask_payment>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_product_cat>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_product_info>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dimtask_shop_org>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>
    <Task(BashOperator): dwd_load_data>
        <Task(BashOperator): dws_load_data>
            <Task(BashOperator): ads_load_data>