昨天談到帶入參數執行 DAG,那如果我們那如果我們今天想要從Task 之間彼此傳遞參數呢?那就必須使用到 Airflow 中的 XCom的功能了 。今天我們就來講講這塊。
首先來談談 Airflow 原本的設計,Airflow 作為 Data Orchestration (資料協作)的工具,本身任務是調配不同的執行工作,並將其統一管理成為一個工作流程,比如說,使用 Python 進行資料的處理之後,再用 Bash 下一個指令進行資料搬運。彼此的Task 之間盡量保持獨立的運作。然而實際使用狀況任然會需要有一些參數的傳遞,這時可以將參數存放在Xcom當中,供其他 Task 取得數值。
XCom 全名為 cross-communications,由於原本預設 Task 之間是彼此隔離的。因此 XCom 成為了 Task 之間溝通的管道。XCom 使用 key-value 的方式進行參數的溝通。建議不要使用 XCom 傳遞大容量的資料。
接下來我們來介紹一下 xccom 的兩個使用方法,首先是透過 xcom_pull, xcom_push 實作。我們在順序較前面的 Task 將特定數值透過 xcom_push 存入 xcom,並在想要提取資料的 Task 中透過 xcom_pull 取得參數並使用。
放入到 first_dag 來實作,我們在 extract 的Task 中存入參數,並在 transform 中取得參數進行轉換。
def extract(**kwargs):
print("extract from data source")
ti = kwargs['ti']
ti.xcom_push(key="xcom_push", value="xcom_push_data")
上面的程式碼中,我們一樣使用 **kwargs 接收參數。並使用 "ti" 這個物件對其輸入 xcom 值。ti 是 Airflow 管理 task instance 相關資訊的一個物件。我們使用裡面的 xcom_push 方法將參數儲存至 XCom。
接下來我們在 transform Task 的程式中使用xcom_pull 取得資訊,其中 key 值則帶入先前存入 XCom 的 key 值:xcom_push
def transform(**kwargs):
print("transform data")
ti = kwargs['ti']
xcom_data = ti.xcom_pull(key='xcom_push')
完成改動之後儲存並執行 DAG 我們可以看到其運作的結果。首先我們先頂進去 Task 的執行流程圖頁面,並點擊 extract Task ,並在pop-up 視窗上選擇 "Xcom",我們看一下是否有成功儲存 Xcom 值。
進去之後發現,確實我們所儲存的 "xcom_push", "xcom_push_data" 這對 key - value 有顯示在上面。代表我們有將參數成功的存入 XCom 中。
最後再來看Transform Task 的 log,看他是否有成功取得 "xcom_push" "xcom_push_data" 並顯示。由下圖我們也可以看到,"xcom_push" "xcom_push_data" 照我們所預期的被取得以及印出。
而另一種方式我們可以使用 function return 的方式使用 XCom。當 PythonOperator 使用的 function 具有回傳值時,Airflow 會將回傳值記錄於 XCom 當中。
我們將上述的程式改成使用 return value 的方式實作,extract Task 的 function改寫為:
def extract(**kwargs):
print("extract from data source")
ti = kwargs['ti']
return "extracted_data"
transform Task 一樣使用xcom_pull 只不過所給的參數為 task_ids,其數值則是填入想要取得回傳值的Task id。
def transform(**kwargs):
print("transform data")
ti = kwargs['ti']
extracted_data = ti.xcom_pull(task_ids='extract')
print(extracted_data)
修改完之後一樣儲存之後執行 DAG。
我們一樣先點擊 extract Task 的 Xcom 看看 Airflow 如何儲存 return value。下圖中我們可以看到 Airflow 是將return value 存在 xcom 中,而key 則為 return_value。
當使用 xcom_pull 並帶入 task_id 時 Airflow 就會取得指定 task_id 的 return_value 值回傳。而在 Transform Task 的 log 當中,我們也可以看得到我們 return 的 "extracted_data" 這個字串被成功地顯示。
看完了今天的文章,大家應該都能游刃有餘的使用 Xcom 進行 Task 之間的參數交通了,那我們明天見囉~