Airflow DAGs, operators and tasks
DAG
An Airflow Python script is really just a configuration file specifying the DAG's structure as code.
The actual tasks defined in it run in a different context from the script itself. Different tasks run on different workers at different points in time, which means this script cannot be used to communicate between tasks. To communicate between tasks, use XComs.
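As a minimal sketch of XCom-based communication (the dag_id, task ids, and callables below are illustrative, assuming Airflow 2.x): one task's return value is pushed to XCom automatically, and a downstream task pulls it by task id.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def push_value():
    # The return value is stored in XCom under the key "return_value"
    return "hello from push_task"

def pull_value(ti):
    # Pull what the upstream task pushed; ti is the TaskInstance from the context
    msg = ti.xcom_pull(task_ids="push_task")
    print(msg)

with DAG(
    "xcom_example",  # hypothetical dag_id
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    push_task = PythonOperator(task_id="push_task", python_callable=push_value)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value)
    push_task >> pull_task
```

Note that XComs are meant for small pieces of metadata (they are stored in the Airflow metadata database), not for passing large datasets between tasks.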
The DAG definition file should not do any actual data processing. The script's purpose is only to define a DAG object, and it needs to evaluate quickly (in seconds), because the scheduler parses it repeatedly.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",  # dag_id is required; this DAG is named "tutorial"
    default_args={
        "depends_on_past": False,
        "email": ["xyz@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="xyz",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:  # initialize the DAG
    # instantiate operators as tasks
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = """some documentation in markdown format for the task"""
    dag.doc_md = """some documentation for the dag"""
    t3 = BashOperator(
        task_id="echo",
        depends_on_past=False,
        bash_command="echo helloworld",
    )
    # run t1 first, then t2 and t3
    t1 >> [t2, t3]
Operator
An operator defines a unit of work for Airflow to complete. Using operators is the classic approach to defining work in Airflow.
All operators inherit from BaseOperator, which includes all of the required arguments for running work in Airflow. Besides BashOperator, there are many others, such as PythonOperator and KubernetesPodOperator.
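For instance, a PythonOperator runs a Python callable instead of a shell command. A minimal sketch (the function and task_id here are made up, and the task is assumed to be created inside a `with DAG(...)` block like the one above):

```python
from airflow.operators.python import PythonOperator

def greet():
    # Runs on the worker when the task instance executes
    print("hello from a PythonOperator")

t4 = PythonOperator(
    task_id="greet",        # hypothetical task_id
    python_callable=greet,  # the function the worker will call
)
```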
Task
An operator must be instantiated as a task to run in a DAG.
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
Dependency between tasks
Dependencies can be declared in a few equivalent ways:
t1.set_downstream(t2) #t1 first, then t2
t2.set_upstream(t1) #t1 first, then t2
t1 >> t2 #t1 first, then t2
t2 << t1 #t1 first, then t2
t1 >> t2 >> t3 #t1 first, then t2, then t3
t1.set_downstream([t2, t3]) #t1 first, then t2 and t3
t1 >> [t2, t3] #t1 first, then t2 and t3
[t2, t3] << t1 #t1 first, then t2 and t3
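For longer sequences, Airflow also provides a chain() helper (a sketch, assuming the tasks t1, t2, t3 from the example above already exist):

```python
from airflow.models.baseoperator import chain

# Equivalent to t1 >> t2 >> t3
chain(t1, t2, t3)
```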
Test run
Save the .py file into the DAGs folder (referenced in airflow.cfg). The default location is ~/airflow/dags.
Then run it with Python:
python ~/airflow/dags/tutorial.py
Test run a task only
From the command line, you can test a single task on its own (the arguments are dag_id, task_id, and a logical date):
airflow tasks test tutorial print_date 2015-06-01
More on dependency
If a task sets depends_on_past=True, each of its task instances will only run if the same task's instance in the previous DAG run succeeded.
If depends_on_past=True, setting wait_for_downstream=True as well makes a task instance wait not only for the previous task instance to succeed, but also for the tasks immediately downstream of that previous instance to finish.
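A sketch of how these flags are set on a task (the task_id and bash_command are illustrative, assuming the imports and DAG context from the tutorial above):

```python
t_load = BashOperator(
    task_id="load",
    bash_command="echo loading",
    depends_on_past=True,      # wait for this task's previous run to succeed
    wait_for_downstream=True,  # also wait for that run's immediate downstream tasks
)
```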