Big Data 252 - Airflow Crontab Scheduling

Crontab Introduction

Basic Overview

Scheduled tasks on Linux are managed by the cron (crond) system service. Because Linux ships with many built-in scheduled tasks, this service is enabled by default. Linux also provides the crontab command so users can manage their own scheduled tasks.

  • Log files: ll /var/log/cron*
  • System-wide config file: vim /etc/crontab
  • Service process: ps -ef | grep crond => /etc/init.d/crond restart
  • Purpose: scheduled execution of tasks (commands), timed backups, etc.
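Besides editing /etc/crontab directly, per-user entries are usually managed with the crontab command. A minimal sketch (the script path /opt/scripts/backup.sh and the file name mycron.txt are placeholders for this example):

```shell
# Stage a cron entry in a file before installing it
echo '30 2 * * * /opt/scripts/backup.sh >> /var/log/backup.log 2>&1' > mycron.txt
cat mycron.txt

# To install it:       crontab mycron.txt
# To review entries:   crontab -l
# To remove them all:  crontab -r
```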

Format Specification

| Field    | Range                | Description                          |
|----------|----------------------|--------------------------------------|
| First *  | Minute (0-59)        | Which minute of the hour to execute  |
| Second * | Hour (0-23)          | Which hour of the day to execute     |
| Third *  | Day of month (1-31)  | Which day of the month to execute    |
| Fourth * | Month (1-12)         | Which month of the year to execute   |
| Fifth *  | Day of week (0-7)    | Which day of the week to execute     |

The following special characters can also be used in each field:

  • * Represents all values within the field's range. For example, * in the month field means months 1-12
  • / Represents a fixed step. For example, */10 in the minute field means execute every 10 minutes
  • - Represents an inclusive range. For example, 2-5 means 2, 3, 4, 5. A range can be combined with a step: 0-23/2 in the hour field means every 2 hours
  • , Represents a list of discrete (non-consecutive) values, such as 1,2,3,6,8,9
  • In the day-of-week field, both 0 and 7 mean Sunday, since which day starts the week varies by region
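To make these rules concrete, here is a minimal Python sketch (an illustration, not part of cron itself; the function name expand_field and its bounds arguments are invented for this example) that expands a single cron field into the concrete values it matches:

```python
def expand_field(field, lo, hi):
    """Expand one cron field (e.g. '*/10', '2-5', '1,3,8') into a sorted list."""
    values = set()
    for part in field.split(','):
        step = 1
        if '/' in part:               # step notation: base/step
            part, step = part.split('/')
            step = int(step)
        if part == '*':               # wildcard: full range of the field
            start, end = lo, hi
        elif '-' in part:             # inclusive range
            start, end = map(int, part.split('-'))
        else:                         # single value
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)

print(expand_field('*/10', 0, 59))        # minutes -> [0, 10, 20, 30, 40, 50]
print(expand_field('0-23/2', 0, 23))      # hours, every 2 within 0-23
print(expand_field('1,2,3,6,8,9', 1, 31)) # discrete days of the month
```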

Configuration Examples

# Execute command every minute (cron evaluates the table once a minute, so an all-* schedule fires every minute)
* * * * * command

# Execute command at minute 3 and 15 of every hour
3,15 * * * * command

# Execute command at minute 3 and 15, between 8-11 AM every day
3,15 8-11 * * * command

# Execute command at minute 3 and 15, between 8-11 AM, every 2 days
3,15 8-11 */2 * * command

# Execute command at minute 3 and 15, between 8-11 AM, every Monday
3,15 8-11 * * 1 command

# Execute command at 21:30 every evening
30 21 * * * command

# Execute command at 4:45 on the 1st, 10th, and 22nd of each month
45 4 1,10,22 * * command

# Execute command at 1:10 every Saturday and Sunday
10 1 * * 6,0 command

# Execute command every hour
0 */1 * * * command

# Between 11 PM and 7 AM, execute command once every hour
# (a descending range like 23-7 is not portable, so use a list of ranges;
#  an all-* minute field would fire every minute, not every hour)
0 0-7,23 * * * command
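The examples above can be sanity-checked with a small Python sketch. This is an illustration only, not a full cron implementation: it supports *, lists, ranges, and steps, treats only 0 as Sunday, and ANDs all five fields (real cron ORs day-of-month and day-of-week when both are restricted). The name cron_matches is invented for this example:

```python
from datetime import datetime

def cron_matches(expr, dt):
    """Check whether datetime `dt` matches a 5-field cron expression."""
    fields = expr.split()
    bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
    # Map dt onto cron's field order; Python's weekday() is Monday=0,
    # so shift it to cron's Sunday=0 convention.
    actual = [dt.minute, dt.hour, dt.day, dt.month, (dt.weekday() + 1) % 7]
    for field, (lo, hi), value in zip(fields, bounds, actual):
        matched = set()
        for part in field.split(','):
            step = 1
            if '/' in part:
                part, step = part.split('/')
                step = int(step)
            if part == '*':
                start, end = lo, hi
            elif '-' in part:
                start, end = map(int, part.split('-'))
            else:
                start = end = int(part)
            matched.update(range(start, end + 1, step))
        if value not in matched:
            return False
    return True

print(cron_matches('30 21 * * *', datetime(2020, 8, 1, 21, 30)))  # True
print(cron_matches('30 21 * * *', datetime(2020, 8, 1, 21, 31)))  # False
```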

Task Integration Deployment

Airflow Core Concepts

DAGs

Directed Acyclic Graph - organizes all tasks that need to run based on their dependencies, describing the execution order of all tasks.

Operators

Airflow has many built-in Operators:

  • BashOperator: Execute a Bash command
  • PythonOperator: Call arbitrary Python functions
  • EmailOperator: Send emails
  • SimpleHttpOperator: Send HTTP requests
  • SQL operators (e.g. MySqlOperator, PostgresOperator): Execute SQL commands
  • Custom Operators

Task

A Task is an instance of an Operator.

Task Instance

Since Tasks are scheduled repeatedly, each execution of a Task is a different Task Instance. Task Instances have their own states, including: success, running, failed, skipped, up_for_reschedule, up_for_retry, queued, no_status, etc.

Task Relationships

Different Tasks in a DAG can depend on each other. Dependencies are declared with the >> and << bitshift operators, the set_upstream/set_downstream methods, or the chain helper.

Executor

Airflow supports several executors; four common ones are:

  • SequentialExecutor: Single-process sequential execution, default executor, usually only for testing
  • LocalExecutor: Multi-process local execution
  • CeleryExecutor: Distributed scheduling, commonly used in production. Celery is a distributed scheduling framework requiring third-party components like RabbitMQ
  • DaskExecutor: Dynamic task scheduling, mainly used for data analysis
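The executor is selected in airflow.cfg. A sketch, with placeholder connection strings (CeleryExecutor additionally needs a broker and result backend configured in the [celery] section):

```ini
[core]
# Which executor Airflow uses; the default is SequentialExecutor
executor = CeleryExecutor

[celery]
# Required for CeleryExecutor: a message broker such as RabbitMQ or Redis
broker_url = amqp://guest:guest@localhost:5672//
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
```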

Getting Started Example

Writing the Script

Create the DAG directory and write the script:

mkdir -p $AIRFLOW_HOME/dags
vim $AIRFLOW_HOME/dags/helloworld.py

Write the content:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Define default arguments
def default_options():
    default_args = {
        'owner': 'airflow',  # Owner name
        'start_date': dates.days_ago(1),  # First execution time
        'retries': 1,  # Retry count on failure
        'retry_delay': timedelta(seconds=5)  # Retry interval on failure
    }
    return default_args

# Define Bash task
def task1(dag):
    t = "pwd"
    task = BashOperator(
        task_id='MyTask1',  # task_id
        bash_command=t,  # Command to execute
        dag=dag  # DAG to belong to
    )
    return task

# Python task function
def hello_world():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))

# Define Python task
def task2(dag):
    task = PythonOperator(
        task_id='MyTask2',
        python_callable=hello_world,  # Function to execute
        dag=dag
    )
    return task

# Define another Bash task
def task3(dag):
    t = "date"
    task = BashOperator(
        task_id='MyTask3',
        bash_command=t,
        dag=dag
    )
    return task

# Define DAG
with DAG(
    'HelloWorldDag',  # dag_id
    default_args=default_options(),  # Default arguments
    schedule_interval="*/2 * * * *"  # Execution interval: every 2 minutes
) as d:
    t1 = task1(d)  # new names avoid shadowing the factory functions above
    t2 = task2(d)
    t3 = task3(d)
    chain(t1, t2, t3)  # Define execution order: t1 -> t2 -> t3

Testing Execution

# Execute command to check if script has errors
python $AIRFLOW_HOME/dags/helloworld.py

# View active DAGs
airflow dags list --subdir $AIRFLOW_HOME/dags

# View tasks in a specific DAG
airflow tasks list HelloWorldDag

# Test a task in the DAG
airflow tasks test HelloWorldDag MyTask2 2020-08-01