Big Data 253 - Airflow Core Concepts

Task Integration Deployment

Basic Introduction to Airflow

Apache Airflow is an open-source task scheduling and workflow management tool for orchestrating complex data processing pipelines. Originally developed at Airbnb, it was donated to the Apache Software Foundation in 2016. Airflow's defining feature is that tasks and their dependencies are expressed as code, with built-in scheduling and monitoring, making it well suited to complex big data workloads.

Airflow Features

  • Code-Centric: Airflow uses Python to define DAGs, providing flexibility and programmability.
  • Highly Extensible: Users can customize Operators and Hooks to integrate various data sources and tools.
  • Powerful UI Interface: Provides a visual interface for monitoring task status, viewing logs, retrying failed tasks, and more.
  • Rich Scheduling Options: Supports both time-based and event-based scheduling.
  • High Availability: When combined with executors like Celery and Kubernetes, it supports distributed architecture, suitable for handling large-scale tasks.

Use Cases

  • Data Pipeline Scheduling: Used to manage ETL processes from source to destination.
  • Machine Learning Workflow Management: Schedules data preprocessing, model training, and model deployment tasks.
  • Data Validation: Automatically checks data quality and consistency.
  • Scheduled Task Automation: Timely log cleanup, data archiving, or report generation.

Airflow Core Concepts

DAGs

A Directed Acyclic Graph (DAG) organizes all the tasks that need to run according to their dependencies, thereby describing the execution order of the whole workflow.
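The core idea, deriving an execution order from task dependencies, can be sketched with Python's standard-library graphlib. This is a conceptual illustration only, not Airflow's actual scheduler; the task names are made up:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (a toy ETL pipeline)
dag = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

# A topological sort yields an order where every task
# appears only after all of its dependencies
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['extract', 'transform', 'load', 'report']
```

Airflow's scheduler does essentially this, but per scheduling interval and with parallel execution of tasks whose dependencies are already satisfied.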

Operators

Airflow has many built-in Operators:

  • BashOperator: Execute a Bash command
  • PythonOperator: Call arbitrary Python functions
  • EmailOperator: Send emails
  • SimpleHttpOperator: Send HTTP requests
  • SQL operators (e.g. MySqlOperator, PostgresOperator): Execute SQL commands
  • Custom Operators

Task

A Task is a parameterized instance of an Operator; it defines a single unit of work as a node in a DAG.

Task Instance

Since Tasks are scheduled repeatedly, each execution of a Task is a different Task Instance. Task Instances have their own states, including: success, running, failed, skipped, up_for_reschedule, up_for_retry, queued, no_status, etc.

Task Relationships

Different Tasks in a DAG can have dependencies on each other.
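In Airflow, dependencies are usually declared with the bit-shift operators: `t1 >> t2` makes t2 run after t1, and `t2 << t1` is equivalent. A toy standard-library sketch of how this operator overloading works (the `Task` class here is a hypothetical stand-in, not Airflow's):

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # t1 >> t2: record `other` as downstream of `self`
        self.downstream.append(other)
        return other  # returning `other` allows chaining: t1 >> t2 >> t3

t1, t2, t3 = Task("MyTask1"), Task("MyTask2"), Task("MyTask3")
t1 >> t2 >> t3  # MyTask1 -> MyTask2 -> MyTask3

print([t.task_id for t in t1.downstream])  # ['MyTask2']
```

Real Airflow operators additionally support `set_upstream`/`set_downstream` methods, which the bit-shift syntax delegates to.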

Executor

Airflow ships with several executors, including:

  • SequentialExecutor: Single-process sequential execution, the default executor, usually only for testing
  • LocalExecutor: Multi-process local execution
  • CeleryExecutor: Distributed scheduling, commonly used in production
  • DaskExecutor: Dynamic task scheduling, mainly used for data analysis
  • KubernetesExecutor: Runs each task in its own Kubernetes pod
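The executor is selected in airflow.cfg (or via the AIRFLOW__CORE__EXECUTOR environment variable). A minimal sketch switching to LocalExecutor; the connection string is an example, and note that non-sequential executors require a database backend that supports concurrent access (e.g. PostgreSQL or MySQL rather than the default SQLite). Depending on your Airflow version, the connection setting may live under a [database] section instead:

```ini
[core]
# SequentialExecutor is the default; LocalExecutor runs tasks in parallel processes
executor = LocalExecutor
# Example connection string for a PostgreSQL metadata database
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
```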

Getting Started Example

Writing the Script

from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Define default arguments
def default_options():
    default_args = {
        'owner': 'airflow',  # Owner name
        'start_date': dates.days_ago(1),  # First execution time
        'retries': 1,  # Retry count on failure
        'retry_delay': timedelta(seconds=5)  # Retry interval on failure
    }
    return default_args

# Define Bash task
def task1(dag):
    t = "pwd"
    task = BashOperator(
        task_id='MyTask1',  # task_id
        bash_command=t,  # Command to execute
        dag=dag  # DAG to belong to
    )
    return task

# Python task function
def hello_world():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))

# Define Python task
def task2(dag):
    task = PythonOperator(
        task_id='MyTask2',
        python_callable=hello_world,  # Function to execute
        dag=dag
    )
    return task

# Define another Bash task
def task3(dag):
    t = "date"
    task = BashOperator(
        task_id='MyTask3',
        bash_command=t,
        dag=dag
    )
    return task

# Define DAG
with DAG(
    'HelloWorldDag',  # dag_id
    default_args=default_options(),  # Default arguments
    schedule_interval="*/2 * * * *"  # Execution interval: every 2 minutes
) as d:
    t1 = task1(d)
    t2 = task2(d)
    t3 = task3(d)
    chain(t1, t2, t3)  # Define execution order: t1 -> t2 -> t3

Testing Execution

# Execute command to check if script has errors
python $AIRFLOW_HOME/dags/helloworld.py

# View active DAGs
airflow dags list --subdir $AIRFLOW_HOME/dags

# View tasks in a specific DAG
airflow tasks list HelloWorldDag

# Test a task in the DAG
airflow tasks test HelloWorldDag MyTask2 2020-08-01