iT邦幫忙

2024 iThome 鐵人賽

DAY 7
0
DevOps

我獨自升級:從水管工走向 DataOps系列 第 7

【Day 7】Airflow 用 Slack 傳送通知 - Webhook 設定與建議

  • 分享至 

  • xImage
  •  

前言

昨天是用基本的 Slack API 來設定,但很多時候我們可能會需要透過 Webhook URL 來滿足更彈性的寄送需求,例如在 EC2 上執行完備份的通知,需求很單純也不想再另外開一個環境或專案達成,就會直接用 shell script 寫 .sh 檔案,或是用 curl 來達成 post requests,送通知到 slack,接下來接一步步開通 Webhooks,airflow 也有 function 可以直接用喔!/images/emoticon/emoticon12.gif

透過 Incoming Webhooks 傳 Slack 通知

1. 開啟 Activate Incoming Webhooks,點擊下方的 Add new webhook to workspace

https://ithelp.ithome.com.tw/upload/images/20240920/20135427IK7d1ZdWxx.png

2. 接著選擇要用在哪個 channel,我都用在 #alerts

https://ithelp.ithome.com.tw/upload/images/20240921/20135427paQwRRPl6q.png

3. 點擊允許後 channel 當中就會出現已將應用整合的通知

https://ithelp.ithome.com.tw/upload/images/20240921/20135427wqWgWVJdZ1.png

4. 接著能拿到一串 curl requests 和 Webhook URL

https://ithelp.ithome.com.tw/upload/images/20240921/20135427xGSqvqcELM.png

  • 基本上只要透過這個 url 送出 post requests 就能達成 傳通知到 slack 這件事,所以直接從 terminal 執行 curl 這段語法就能看到 Hello, World!

https://ithelp.ithome.com.tw/upload/images/20240921/20135427C9kMjFpimX.png

5. 老方法:新增到 Airflow Connection

https://ithelp.ithome.com.tw/upload/images/20240921/2013542718M4gFdWtb.png

  • 注意只需要填入 Webhook Token 的部分,就是 Webhook URL 的後半段,T0XXXXX/B0XXXXXXXXXX/XXXXXXXXXXX 這個部分
https://hooks.slack.com/services/T0XXXXX/B0XXXXXXXXXX/XXXXXXXXXXX

6. Slack webhook 程式碼範例

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification
from pendulum import datetime

with DAG(
    dag_id='slack_notification_dag',
    start_date=datetime(2024, 9, 19),
    schedule=None,
    catchup=False
    ) as dag:

    start_task = EmptyOperator(
        task_id='start_task',
        on_success_callback=send_slack_webhook_notification(
            slack_webhook_conn_id="slack_conn_webhook",
            text="DAG run successful! "),
        dag=dag
    )

    start_task
  • 和昨天 slack api 差異有下列幾項:
    1. import 的函式不同,要改用send_slack_webhook_notification
    2. conn_id 的參數名稱也改成是 slack_webhook_conn_id
    3. 用 Slack webhook 不需給 channel 的參數,因為在上面的步驟已經設定過了

Airflow 通知其他建議

1. 錯誤 callback: on_fail_callback

  • 錯誤時的 callback 更重要,大多會將 slack 通知放在當中
  • 通常是直接設定在 dag 層級,有任何錯誤就能直接通知
  • 以上方範例做修改,會像這樣:
....
with DAG(
    dag_id='slack_notification_dag',
    start_date=datetime(2024, 9, 19),
    schedule=None,
    catchup=False,
    on_fail_callback=send_slack_webhook_notification(...)
    ) as dag:
    .....

2. 錯誤時,除了通知還要執行其他

  • 在 airflow 更新後(印象是一年內),callback 都可以同時執行多個函數,使用方式如下:
on_fail_callback=[send_slack_webhook_notification, other_func(), ...]
  • 只需要用 list 放入需要執行的內容即可
  • 通常錯誤時除了通知,會做一些修改或資料的檢查,避免過程中的修改會因為 error retry 而被重複執行

3. 透過 Airflow 變數取得更多資訊

可以依照需求將 Airflow 變數加到錯誤訊息中,例如如果是放在 dag 層級,就可以帶入 ti.task_id 來取得錯誤的任務,有哪些變數可以用可以參考我去年的這篇文章Airflow Variable 變數的神秘魔法,或是到 airflow 官網查看更多變數

from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification

send_slack_webhook_notification(
    slack_conn_id="slack_conn_webhook",
    text="Dag ID : {{ ti.dag_id }}\nTask ID : {{ ti.task_id }}"
)

後記

如果在通知上,想要連執行錯誤訊息都取得,可以試著再將 slack 通知在包一層做成 plugins,後續呼叫上就能自訂更多功能,後半部的專案可能有機會再做詳細的說明。


上一篇
【Day 6】Airflow 用 Slack 傳送通知 - Slack API 設定
下一篇
【Day 8】Airflow 卍解 -自訂 XCom 後端解除限制
系列文
我獨自升級:從水管工走向 DataOps30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言