本文主要在教學如何在 Airflow 當中傳遞參數,Airflow 當中提供了 XCom 來進行參數傳遞,而根據官方文檔提示,在撰寫任務流程時,要盡量保持任務的獨立,但難免會有必須要傳遞參數的時候,因此若真的需要在不同任務間傳遞參數的話,要盡量以最簡化、只傳遞必要參數為原則使用 XCom,下面為簡單的 XCom 使用方式,如果有問題歡迎留言討論
XCom 主要用於 同一個 DAG 內的 Operator 之間的參數傳遞,不可用於跨 DAG 的情況
使用方式非常簡單,只需要在 operator 內將要回傳出的參數直接使用 return 回傳出來即可,直接附上範例進行解釋
from airflow.decorators import dag
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
**kwargs
接收參數,並且在函式內有使用 kwargs.get("task_instance")
,這行的作用在於接收 airflow 內的 task_instance 物件,此物件內會儲存該 DAG 內實作的 Operator (Task) 的相關資訊,接到之後我們就可以利用 xcom_pull
來取得指定 task_id 回傳的資訊def get_datetime():
return datetime.today().isoformat()
def use_datetime(**kwargs):
task_instance = kwargs.get("task_instance")
str_today = task_instance.xcom_pull(task_ids="get_datetime_task")
print(str_today)
**kwargs
來進行接收@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")
def xcom_demo_dag():
start_task = EmptyOperator(task_id="start")
get_datetime_task = PythonOperator(task_id="get_datetime_task",
python_callable=get_datetime)
use_datetime_task = PythonOperator(task_id="use_datetime_task",
python_callable=use_datetime,
provide_context=True)
end_task = EmptyOperator(task_id="end")
start_task >> get_datetime_task >> use_datetime_task >> end_task
create_xcom_demo_dag = xcom_demo_dag()
附上 use_datetime_task 的執行結果