Airflow DAG, Operator and Task

DAG

An Airflow Python script is really just a configuration file specifying the DAG's structure as code.

The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used for tasks to cross-communicate. To communicate between tasks, use XComs.
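As a rough sketch of XCom usage (assuming Airflow 2, which passes context variables such as ti to the callable by parameter name; the task ids, key and value below are made up for illustration, and the operators would need to be instantiated inside a DAG):

from airflow.operators.python import PythonOperator

def push_value(ti):
    # push a small value into XCom under an explicit key
    ti.xcom_push(key="row_count", value=42)

def pull_value(ti):
    # pull the value that the upstream task pushed
    row_count = ti.xcom_pull(task_ids="push_task", key="row_count")
    print(row_count)

# both tasks must be defined inside a `with DAG(...)` block
push_task = PythonOperator(task_id="push_task", python_callable=push_value)
pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value)

push_task >> pull_task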


The DAG definition file should not do any actual data processing. The script's purpose is only to define a DAG object, and it needs to evaluate quickly (within seconds), because the scheduler parses it repeatedly.
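A sketch of the difference (the file path is purely illustrative): anything at module level runs every time the scheduler parses the file, so heavy work belongs inside a task's callable:

import csv

# anti-pattern: executes at every DAG-file parse
# with open("/data/huge.csv") as f:
#     rows = list(csv.reader(f))

# better: the heavy work only runs when the task instance actually executes
def count_rows():
    with open("/data/huge.csv") as f:
        return sum(1 for _ in csv.reader(f))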



from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",  # dag_id
    default_args={
        "depends_on_past": False,
        "email": ["xyz@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="xyz",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:  # initialize the DAG

    # instantiate operators as tasks
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )

    t1.doc_md = """some documentation in markdown format for the task"""
    dag.doc_md = """some documentation for the dag"""

    t3 = BashOperator(
        task_id="echo",
        depends_on_past=False,
        bash_command="echo helloworld",
    )

    # run task 1 first, then task 2 and task 3 in parallel
    t1 >> [t2, t3]


Operator

An operator defines a unit of work for Airflow to complete. Using operators is the classic approach to defining work in Airflow.

All operators inherit from the BaseOperator, which includes all of the required arguments for running work in Airflow. Besides the BashOperator used above, there are many others, such as the PythonOperator and the KubernetesPodOperator.
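As a sketch of that inheritance (the class name and message parameter are made up for illustration), a minimal custom operator only needs to subclass BaseOperator and implement execute():

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, message, **kwargs):
        super().__init__(**kwargs)  # BaseOperator handles task_id, retries and the other common arguments
        self.message = message

    def execute(self, context):
        # execute() is what the worker runs when the task instance executes
        print(self.message)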


Task

An operator must be instantiated as a task to run in a DAG.

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

A task must include or inherit the arguments task_id and owner; otherwise Airflow will raise an exception.
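The owner is typically supplied once through default_args so that every task inherits it (a minimal sketch; the dag_id and owner name are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "owner_example",                      # illustrative dag_id
    default_args={"owner": "data-team"},  # every task inherits this owner
    start_date=datetime(2021, 1, 1),
    schedule=None,
) as dag:
    t = BashOperator(task_id="print_date", bash_command="date")
    # t.owner is "data-team", inherited from default_args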


Dependency between tasks

There are a few ways to declare dependencies between tasks:

t1.set_downstream(t2)        # t1 first, then t2
t2.set_upstream(t1)          # t1 first, then t2
t1 >> t2                     # t1 first, then t2
t2 << t1                     # t1 first, then t2
t1 >> t2 >> t3               # t1 first, then t2, then t3
t1.set_downstream([t2, t3])  # t1 first, then t2 and t3
t1 >> [t2, t3]               # t1 first, then t2 and t3
[t2, t3] << t1               # t1 first, then t2 and t3
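For longer or programmatically built sequences there is also the chain helper (a sketch; t1 through t4 are assumed to be tasks already defined in the same DAG):

from airflow.models.baseoperator import chain

# equivalent to t1 >> t2 >> t3 >> t4
chain(t1, t2, t3, t4)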


Test run

Save the code as a .py file in the DAGs folder (referenced in airflow.cfg). The default location is ~/airflow/dags.

Then run the file with Python; if it does not raise an exception, the DAG definition is at least free of syntax errors:

    python ~/airflow/dags/tutorial.py

Test run a task only

From the command line, a single task can be test-run for a given logical date:

    airflow tasks test tutorial print_date 2015-06-01


More on dependency

If the parameter depends_on_past=True is set, an individual task instance will depend on the success of its previous task instance (the one from the preceding schedule).

If depends_on_past=True, additionally setting wait_for_downstream=True will make a task instance wait not only for the previous task instance to succeed, but also for the tasks immediately downstream of that previous task instance to succeed.
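A sketch of both flags on a single task (the task_id and command are illustrative; the operator would live inside a DAG):

load = BashOperator(
    task_id="load",
    bash_command="echo loading",
    depends_on_past=True,      # wait for this task's previous run to succeed
    wait_for_downstream=True,  # also wait for the previous run's immediate downstream tasks
)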