iT邦幫忙

0

Python & Airflow 學習筆記_Operator 間的參數傳遞 XCom

  • 分享至 

  • xImage
  •  

本文主要在教學如何在 Airflow 當中傳遞參數,Airflow 當中提供了 XCom 來進行參數傳遞,而根據官方文檔提示,在撰寫任務流程時,要盡量保持任務的獨立,但難免會有必須要傳遞參數的時候,因此若真的需要在不同任務間傳遞參數的話,要盡量以最簡化、只傳遞必要參數為原則使用 XCom,下面為簡單的 XCom 使用方式,如果有問題歡迎留言討論

一、簡介

XCom 主要用於 同一個 DAG 內的 Operator 之間的參數傳遞,不可用於跨 DAG 的情況

二、使用方式

使用方式非常簡單,只需要在 operator 內將要回傳出的參數直接使用 return 回傳出來即可,直接附上範例進行解釋

  1. import 套件
from airflow.decorators import dag
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
  1. 建立 python 函式,分別作用為傳出時間字串、使用時間字串
    可以看到 use_datetime 有利用 **kwargs 接收參數,並且在函式內有使用 kwargs.get("task_instance"),這行的作用在於接收 airflow 內的 task_instance 物件,此物件內會儲存該 DAG 內實作的 Operator (Task) 的相關資訊,接到之後我們就可以利用 xcom_pull 來取得指定 task_id 回傳的資訊
def get_datetime():
    return datetime.today().isoformat()


def use_datetime(**kwargs):
    task_instance = kwargs.get("task_instance")
    str_today = task_instance.xcom_pull(task_ids="get_datetime_task")
    print(str_today)
  1. 實際建立 DAG
    可以看到在 use_datetime_task 的部分有利用 provide_context=True 來將 airflow 後台的相關資訊傳入 operator,若為 True 則需要在對應的 python 函式內使用 **kwargs 來進行接收
@dag(start_date=datetime.today(), tags=['user'], schedule_interval="@daily")
def xcom_demo_dag():
    start_task = EmptyOperator(task_id="start")

    get_datetime_task = PythonOperator(task_id="get_datetime_task",
                                       python_callable=get_datetime)

    use_datetime_task = PythonOperator(task_id="use_datetime_task",
                                       python_callable=use_datetime,
                                       provide_context=True)

    end_task = EmptyOperator(task_id="end")

    start_task >> get_datetime_task >> use_datetime_task >> end_task


create_xcom_demo_dag = xcom_demo_dag()

三、結果呈現

附上 use_datetime_task 的執行結果
https://ithelp.ithome.com.tw/upload/images/20220827/20144024aNEUL16HWg.jpg


圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言