iT邦幫忙

2023 iThome 鐵人賽

DAY 19
0

昨天談到帶入參數執行 DAG,那如果我們那如果我們今天想要從Task 之間彼此傳遞參數呢?那就必須使用到 Airflow 中的 XCom的功能了 。今天我們就來講講這塊。

XCom

首先來談談 Airflow 原本的設計,Airflow 作為 Data Orchestration (資料協作)的工具,本身任務是調配不同的執行工作,並將其統一管理成為一個工作流程,比如說,使用 Python 進行資料的處理之後,再用 Bash 下一個指令進行資料搬運。彼此的Task 之間盡量保持獨立的運作。然而實際使用狀況任然會需要有一些參數的傳遞,這時可以將參數存放在Xcom當中,供其他 Task 取得數值。

什麼是 XCom

XCom 全名為 cross-communications,由於原本預設 Task 之間是彼此隔離的。因此 XCom 成為了 Task 之間溝通的管道。XCom 使用 key-value 的方式進行參數的溝通。建議不要使用 XCom 傳遞大容量的資料。

以 xcom_pull, xcom_push 實作

接下來我們來介紹一下 xccom 的兩個使用方法,首先是透過 xcom_pull, xcom_push 實作。我們在順序較前面的 Task 將特定數值透過 xcom_push 存入 xcom,並在想要提取資料的 Task 中透過 xcom_pull 取得參數並使用。
放入到 first_dag 來實作,我們在 extract 的Task 中存入參數,並在 transform 中取得參數進行轉換。

def extract(**kwargs):
    print("extract from data source")
    ti = kwargs['ti']
    ti.xcom_push(key="xcom_push", value="xcom_push_data")

上面的程式碼中,我們一樣使用 **kwargs 接收參數。並使用 "ti" 這個物件對其輸入 xcom 值。ti 是 Airflow 管理 task instance 相關資訊的一個物件。我們使用裡面的 xcom_push 方法將參數儲存至 XCom。

接下來我們在 transform Task 的程式中使用xcom_pull 取得資訊,其中 key 值則帶入先前存入 XCom 的 key 值:xcom_push

def transform(**kwargs):
    print("transform data")
    ti = kwargs['ti']
    xcom_data = ti.xcom_pull(key='xcom_push')

完成改動之後儲存並執行 DAG 我們可以看到其運作的結果。首先我們先頂進去 Task 的執行流程圖頁面,並點擊 extract Task ,並在pop-up 視窗上選擇 "Xcom",我們看一下是否有成功儲存 Xcom 值。
https://ithelp.ithome.com.tw/upload/images/20231004/20140477g8Ulj0D9VM.png

進去之後發現,確實我們所儲存的 "xcom_push", "xcom_push_data" 這對 key - value 有顯示在上面。代表我們有將參數成功的存入 XCom 中。
https://ithelp.ithome.com.tw/upload/images/20231004/20140477lmyVhiq0Th.png

最後再來看Transform Task 的 log,看他是否有成功取得 "xcom_push" "xcom_push_data" 並顯示。由下圖我們也可以看到,"xcom_push" "xcom_push_data" 照我們所預期的被取得以及印出。

以 return value 實作

而另一種方式我們可以使用 function return 的方式使用 XCom。當 PythonOperator 使用的 function 具有回傳值時,Airflow 會將回傳值記錄於 XCom 當中。

我們將上述的程式改成使用 return value 的方式實作,extract Task 的 function改寫為:

def extract(**kwargs):
    print("extract from data source")
    ti = kwargs['ti']
    return "extracted_data"

transform Task 一樣使用xcom_pull 只不過所給的參數為 task_ids,其數值則是填入想要取得回傳值的Task id。

def transform(**kwargs):
    print("transform data")
    ti = kwargs['ti']
    extracted_data = ti.xcom_pull(task_ids='extract')
    print(extracted_data)

修改完之後一樣儲存之後執行 DAG。
我們一樣先點擊 extract Task 的 Xcom 看看 Airflow 如何儲存 return value。下圖中我們可以看到 Airflow 是將return value 存在 xcom 中,而key 則為 return_value。
https://ithelp.ithome.com.tw/upload/images/20231004/20140477NRXNNISaJh.png

當使用 xcom_pull 並帶入 task_id 時 Airflow 就會取得指定 task_id 的 return_value 值回傳。而在 Transform Task 的 log 當中,我們也可以看得到我們 return 的 "extracted_data" 這個字串被成功地顯示。

https://ithelp.ithome.com.tw/upload/images/20231004/20140477yqJmd79DqZ.png

看完了今天的文章,大家應該都能游刃有餘的使用 Xcom 進行 Task 之間的參數交通了,那我們明天見囉~


上一篇
『Day18』帶入參數執行 DAG
下一篇
『Day20』Variable 與 Template
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言