Big Data 252 - Airflow Crontab Scheduling
Crontab Introduction
Basic Overview
Scheduled tasks on Linux are managed by the cron (crond) system service. Because Linux itself relies on many scheduled tasks, this service is enabled by default. Linux also provides the crontab command so users can manage their own scheduled tasks.
- Log file: ll /var/log/cron*
- Edit file: vim /etc/crontab
- Process: check with ps -ef | grep crond; restart with /etc/init.d/crond restart
- Purpose: Task (command) scheduled execution, timed backup, etc.
Format Specification
| Field | Range | Description |
|---|---|---|
| First * | Minute (0-59) | Which minute of the hour to execute |
| Second * | Hour (0-23) | Which hour of the day to execute |
| Third * | Day of month (1-31) | Which day of the month to execute |
| Fourth * | Month (1-12) | Which month of the year to execute |
| Fifth * | Day of week (0-7) | Which day of the week to execute (0 and 7 both mean Sunday) |
The following special characters can also be used in each field:
- * : all values within the field's range. For example, * in the month field means months 1-12.
- / : a fixed step interval. For example, */10 in the minute field means execute every 10 minutes.
- - : an inclusive range. For example, 2-5 means 2, 3, 4, 5; in the hour field, 0-23/2 means execute every 2 hours within the 0-23 range.
- , : a list of non-consecutive values, such as 1,2,3,6,8,9.
- Since the first day of the week varies by region, Sunday can be written as either 0 (first day) or 7 (last day).
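The rules above can be made concrete with a small sketch. The function below (a hypothetical helper, not part of cron itself) expands a single crontab field into the set of values it matches, covering all four special characters:

```python
def expand_field(spec: str, lo: int, hi: int) -> set:
    """Expand one crontab field (e.g. '*/10', '2-5', '1,3,8') into a set of ints."""
    values = set()
    for part in spec.split(','):            # ',' separates non-consecutive values
        rng, _, step = part.partition('/')  # '/' gives a step within a range
        step = int(step) if step else 1
        if rng == '*':                      # '*' means the field's whole range
            start, end = lo, hi
        elif '-' in rng:                    # 'a-b' is an inclusive range
            start, end = map(int, rng.split('-'))
        else:                               # a single literal value
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return values

print(sorted(expand_field('*/10', 0, 59)))  # minutes: [0, 10, 20, 30, 40, 50]
print(sorted(expand_field('2-5', 0, 23)))   # hours:   [2, 3, 4, 5]
```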
Configuration Examples
# Execute command every minute (cron's scheduling granularity is one minute, so all-* fires on every scan)
* * * * * command
# Execute command at minute 3 and 15 of every hour
3,15 * * * * command
# Execute command at minute 3 and 15, between 8-11 AM every day
3,15 8-11 * * * command
# Execute command at minute 3 and 15, between 8-11 AM, every 2 days
3,15 8-11 */2 * * command
# Execute command at minute 3 and 15, between 8-11 AM, every Monday
3,15 8-11 * * 1 command
# Execute command at 21:30 every evening
30 21 * * * command
# Execute command at 4:45 on the 1st, 10th, and 22nd of each month
45 4 1,10,22 * * command
# Execute command at 1:10 every Saturday and Sunday
10 1 * * 6,0 command
# Execute command every hour
0 */1 * * * command
# Between 11 PM and 7 AM, execute command on the hour
# (note: some cron implementations reject the wrap-around range 23-7; 0 23,0-7 * * * is the portable form)
0 23-7/1 * * * command
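To check expressions like those above against a concrete time, here is a simplified matcher (the function names are hypothetical). Note it simply ANDs all five fields, whereas real cron ORs day-of-month and day-of-week when both are restricted:

```python
from datetime import datetime

def _expand(spec, lo, hi):
    """Expand one crontab field into the set of values it matches."""
    vals = set()
    for part in spec.split(','):
        rng, _, step = part.partition('/')
        step = int(step) if step else 1
        if rng == '*':
            start, end = lo, hi
        elif '-' in rng:
            start, end = map(int, rng.split('-'))
        else:
            start = end = int(rng)
        vals.update(range(start, end + 1, step))
    return vals

def cron_matches(expr: str, when: datetime) -> bool:
    """True if the 5-field cron expression fires at the given datetime."""
    minute, hour, dom, month, dow = expr.split()
    dow_vals = _expand(dow, 0, 7)
    if 7 in dow_vals:                # cron allows Sunday as both 0 and 7
        dow_vals.add(0)
    return (when.minute in _expand(minute, 0, 59)
            and when.hour in _expand(hour, 0, 23)
            and when.day in _expand(dom, 1, 31)
            and when.month in _expand(month, 1, 12)
            # Python: Monday=0..Sunday=6; cron: Sunday=0, Monday=1, ...
            and (when.weekday() + 1) % 7 in dow_vals)

# "30 21 * * *" fires at 21:30 every evening:
print(cron_matches('30 21 * * *', datetime(2020, 8, 1, 21, 30)))  # True
```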
Task Integration Deployment
Airflow Core Concepts
DAGs
Directed Acyclic Graph - organizes all tasks that need to run based on their dependencies, describing the execution order of all tasks.
Operators
Airflow has many built-in Operators:
- BashOperator: Execute a Bash command
- PythonOperator: Call arbitrary Python functions
- EmailOperator: Send emails
- SimpleHttpOperator: Send HTTP requests
- SQL operators (e.g. MySqlOperator, PostgresOperator): Execute SQL commands
- Custom Operators
Task
A Task is an instance of an Operator.
Task Instance
Since Tasks are scheduled repeatedly, each execution of a Task is a separate Task Instance. A Task Instance has its own state, such as: success, running, failed, skipped, up_for_reschedule, up_for_retry, queued, no_status, etc.
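A rough sketch of how these states chain together for one task instance, assuming a retry budget like the retries=1 used in the example later in this section (the function and its inputs are hypothetical, not Airflow API):

```python
def run_task_instance(attempt_results, retries):
    """Walk a task instance through its states.

    attempt_results: list of bools, one per try (True = that attempt succeeds).
    retries: how many extra attempts are allowed after the first failure.
    """
    states = ['queued']                     # instance waits to be picked up
    for tries_used, ok in enumerate(attempt_results):
        states.append('running')
        if ok:
            states.append('success')        # terminal state
            return states
        if tries_used < retries:
            states.append('up_for_retry')   # failed, but budget remains
        else:
            states.append('failed')         # terminal state, budget exhausted
            return states
    return states

print(run_task_instance([False, True], retries=1))
# ['queued', 'running', 'up_for_retry', 'running', 'success']
```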
Task Relationships
Different Tasks in a DAG can have dependencies on each other.
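The execution order those dependencies imply is exactly a topological sort of the DAG. A minimal sketch using Kahn's algorithm (the task names mirror the getting-started example later in this section; the function itself is illustrative, not Airflow code):

```python
from collections import deque

def topological_order(deps):
    """deps maps each task to the set of upstream tasks it depends on."""
    indegree = {t: len(up) for t, up in deps.items()}
    downstream = {t: [] for t in deps}
    for t, ups in deps.items():
        for u in ups:
            downstream[u].append(t)
    # start with tasks that have no upstream dependencies
    ready = deque(sorted(t for t, d in indegree.items() if d == 0))
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for d in downstream[t]:             # releasing t unblocks its downstream
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)
    if len(order) != len(deps):
        raise ValueError('cycle detected: not a DAG')
    return order

# MyTask1 -> MyTask2 -> MyTask3, as in the helloworld DAG:
print(topological_order({'MyTask1': set(),
                         'MyTask2': {'MyTask1'},
                         'MyTask3': {'MyTask2'}}))
# ['MyTask1', 'MyTask2', 'MyTask3']
```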
Executor
Airflow supports several executors; the four most common are:
- SequentialExecutor: Single-process sequential execution, default executor, usually only for testing
- LocalExecutor: Multi-process local execution
- CeleryExecutor: Distributed scheduling, commonly used in production. Celery is a distributed scheduling framework requiring third-party components like RabbitMQ
- DaskExecutor: Dynamic task scheduling, mainly used for data analysis
Getting Started Example
Writing the Script
Create the DAG directory and write the script:
mkdir $AIRFLOW_HOME/dags
vim $AIRFLOW_HOME/dags/helloworld.py
Write the content:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


# Define default arguments
def default_options():
    default_args = {
        'owner': 'airflow',                   # Owner name
        'start_date': dates.days_ago(1),      # First execution time
        'retries': 1,                         # Retry count on failure
        'retry_delay': timedelta(seconds=5)   # Retry interval on failure
    }
    return default_args


# Define Bash task
def task1(dag):
    t = "pwd"
    task = BashOperator(
        task_id='MyTask1',   # task_id
        bash_command=t,      # Command to execute
        dag=dag              # DAG the task belongs to
    )
    return task


# Python task function
def hello_world():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))


# Define Python task
def task2(dag):
    task = PythonOperator(
        task_id='MyTask2',
        python_callable=hello_world,   # Function to execute
        dag=dag
    )
    return task


# Define another Bash task
def task3(dag):
    t = "date"
    task = BashOperator(
        task_id='MyTask3',
        bash_command=t,
        dag=dag
    )
    return task


# Define DAG
with DAG(
    'HelloWorldDag',                   # dag_id
    default_args=default_options(),    # Default arguments
    schedule_interval="*/2 * * * *"    # Execution interval: every 2 minutes
) as d:
    task1 = task1(d)
    task2 = task2(d)
    task3 = task3(d)
    chain(task1, task2, task3)         # Define execution order: MyTask1 -> MyTask2 -> MyTask3
Testing Execution
# Execute command to check if script has errors
python $AIRFLOW_HOME/dags/helloworld.py
# View active DAGs
airflow dags list --subdir $AIRFLOW_HOME/dags
# View tasks in a specific DAG
airflow tasks list HelloWorldDag
# Test a task in the DAG
airflow tasks test HelloWorldDag MyTask2 2020-08-01