這邊記錄了該如何建立一個簡易的 Dag,如果有錯誤或更好的寫法,歡迎留言討論
就筆者從官方文件的理解,每個 Dag 可以代表是一個要執行的任務,而每個 Dag 裡面會有許多個 Operator 可以進行各個流程該做的事情,簡單來講可以理解成 Task 底下有多個 Sub Task,而這些 Sub Task 串連起來就變成一個 Dag
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
在 airflow 當中提供許多種 Operator 給我們進行各種功能操作,詳細種類的 Operator 可以參考 這個網址 本文主要使用 EmptyOperator 以及 BashOperator 來進行範例撰寫
start_task = EmptyOperator(task_id="start_task")
end_task = EmptyOperator(task_id="end_task")
first_task = BashOperator(task_id="first_task",
bash_command=f"echo execute time: {datetime.now()}")
按照如下圖的範例,我們可以建立一個名為 first_dag 的任務,並且指派 tags 屬性,方便我們在 UI 上進行查詢
dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())
with dag:
start_task = EmptyOperator(task_id="start_task")
end_task = EmptyOperator(task_id="end_task")
first_task = BashOperator(task_id="first_task",
bash_command=f"echo execute time: {datetime.now()}")
下圖為透過上述程式碼建立出的 Dag,如果是使用官方的 docker-compose.yaml 建立環境的話,會出現許多範例 Dag可以善用 tags 來進行過濾
注意,此時還沒有將任務流程進行串聯
我們可以透過 >>
符號來將各個 Operator 進行串聯,下方為完整程式碼,串聯完成後,即可回到 UI 介面進行重新整理,後面的步驟會教學如何運行任務以及查看 Log
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())
with dag:
start_task = EmptyOperator(task_id="start_task")
end_task = EmptyOperator(task_id="end_task")
first_task = BashOperator(task_id="first_task",
bash_command=f"echo execute time: {datetime.now()}")
start_task >> first_task >> end_task
Trigger DAG
選項start_task > first_task > end_task
execute time: {datetime.now()}
在 airflow 2.x 版後,提供了利用 decorator 的方式來建立 Dag,下方為簡易範例
特別注意:
使用此方法建立 Dag 後,需要於外部將該 Dag 的函式丟給一個變數,airflow 才可以抓到該 Dag
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
@dag(tags=['user'], start_date=datetime.today())
def simple_dag():
start_task = EmptyOperator(task_id="start_task")
end_task = EmptyOperator(task_id="end_task")
first_task = BashOperator(task_id="first_task",
bash_command=f"echo execute time: {datetime.now()}")
start_task >> first_task >> end_task
my_dag = simple_dag()