iT邦幫忙

2023 iThome 鐵人賽

DAY 9
2
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 9

[Day9] Airflow Tasks 之間的資料間諜 XComs

  • 分享至 

  • xImage
  •  

今天目標

最簡單的 BashOperator 已經使用過了,今天就要使用 PythonOperator 製作多個 Tasks,學習資料間諜 Xcoms 是怎麼在當中傳遞資訊的 ~

XComs

XComs 其實是 cross-communications 的縮寫,可以讓任務之間傳遞資料,但是針對少量資料設計的,大型資料還是盡量不要透過 XComs 傳遞。

實作時間

目前程式架構

from datetime import datetime
from airflow import DAG

with DAG(
    dag_id = 'python_dag',
    start_date = datetime(2023, 9, 24),
    schedule_interval=None
) as dag:
  • datetime 用來設定 start_date
  • DAG 用來創建 DAG,誒~應該是廢話/images/emoticon/emoticon01.gif
  • 接著我們可以開一個新的 python_dag.py,然後貼上上面的架構
  • 切記!檔案名稱和 dag_id 要一樣,都叫做 python_dag 吧~

加上新的 PythonOperator

from airflow.operators.python import PythonOperator

def say_hello():
    print("hello")

task1 = PythonOperator(
    task_id=say_hello,
    python_callable=say_hello
)

應該都知道這些要放在哪裡吧~

  1. PythonOperator 模組 import 就和 datetimeDAG import 放一起
  2. say_hello funtion 都可以,是獨立出來的,可以放在 importwith DAG( 的中間
  3. task1=PythonOperator(... 這部分最重要,要放在 with DAG( 的裡面

加起來就會變這樣

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    print("hello")

with DAG(
    dag_id = 'first_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval=None
) as dag:
    task1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello
    )

    task1

對了不要忘記最下方要加上 task1,airflow 才知道你要使用喔!
提醒+1: task_id 和 function 的名稱也盡量一樣,比較不會亂~

確認 python_dag 運作

  1. 搜尋 python_dag ,開啟開關
  2. 點擊 Trigger DAG 執行
  3. 確認 logs 的資料
  4. 有看到 hello 代表執行成功了
    https://ithelp.ithome.com.tw/upload/images/20230924/20135427s1XU54nugG.png

使用 XComs 在 tasks 之間傳資料

Before XComs

  1. ti 參數:代表 task instance 任務實例,可以從中存取 Airflow XComs 的值,可以把它想成是一個暫存區,或是你家一進門的鞋櫃,一回家會先把外套包包放在櫃子上,脫完鞋,再拿下來走進客廳。
  2. xcoms_pushxcoms_pull : 有在用 git 的人,肯定對 pull push 不陌生,在task 當中,使用 push 可以將值傳遞給 xcoms,pull 可以將值從 xcoms 拿回來。

It's Time to get hands dirty

def say_hello(ti):
    name = ti.xcom_pull(task_ids='send_name', key='name')
    print("hello", name) 

def send_name(ti):
    ti.xcom_push(key='name', value='Shawn')

我們接著建立兩個 function 來說明,send_namekeynamevalueShawn 的資料給 xcoms,到了 say_hello 我們再透過 task_ids=send_namekey=name 把資料拿回來,沒錯,就很像是 Dictionary 的 key-value 型態,所以一定要給 task_id 才能拿到值嗎?沒錯,因為整個 DAG 裡面的 function 都是用 Xcoms 在溝通的,一定要很明確才能拿到 value,就像是全家人一起把東西放到鞋櫃上,這時候突然沒電,你確定你拿到的外套是你的嗎?/images/emoticon/emoticon06.gif

with DAG(
    dag_id = 'python_dag',
    start_date = datetime(2023, 9, 22),
    schedule_interval=None
) as dag:
    task1 = PythonOperator(
    task_id='say_hello',
    python_callable=say_hello
    )

    task2 = PythonOperator(
    task_id='send_name',
    python_callable=send_name
    )

    task1 >> task2

上面都沒問題吧~都是一樣的內容以此類推~/images/emoticon/emoticon31.gif

check logs

記得每一次修改完程式,執行前都要重開 webserver 喔!(之後用 docker 就不用這麼麻煩了~)

  • 接著打開 Logs ,可以從 Grid View 或是 Graph 去點擊 say_hello 任務,就能看到 Logs 的頁籤了

https://ithelp.ithome.com.tw/upload/images/20230924/20135427ICe8PNlnTf.png

None~失敗~(登~登~登~登~/images/emoticon/emoticon04.gif
有人知道為什麼嗎?
有發現問題就在最後一行 task1 >> task2 嗎?
因為在執行 task1 的時候 XComs 根本還沒有 name 的資料。
所以只要交換順序就沒問題了。

task2 >> task1

https://ithelp.ithome.com.tw/upload/images/20230924/2013542749RxtOy6Cg.png

Do~Re~Mi~So~成功了~/images/emoticon/emoticon34.gif

小🪢

今天先到這邊吧~明天再繼續講 Airflow 的 Varaible 有哪些~


上一篇
[Day8] 關於 Airflow Web UI 的大小事
下一篇
[Day10] Airflow Variable 變數的神秘魔法
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言