昨天是用基本的 Slack API 來設定,但很多時候我們可能會需要透過 Webhook URL 來滿足更彈性的寄送需求,例如在 EC2 上執行完備份的通知,需求很單純也不想再另外開一個環境或專案達成,就會直接用 shell script 寫 .sh
檔案,或是用 curl
來達成 post requests,送通知到 slack,接下來接一步步開通 Webhooks,airflow 也有 function 可以直接用喔!
Activate Incoming Webhooks
,點擊下方的 Add new webhook to workspace
#alerts
傳通知到 slack
這件事,所以直接從 terminal 執行 curl
這段語法就能看到 Hello, World!
T0XXXXX/B0XXXXXXXXXX/XXXXXXXXXXX
這個部分https://hooks.slack.com/services/T0XXXXX/B0XXXXXXXXXX/XXXXXXXXXXX
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification
from pendulum import datetime
with DAG(
dag_id='slack_notification_dag',
start_date=datetime(2024, 9, 19),
schedule=None,
catchup=False
) as dag:
start_task = EmptyOperator(
task_id='start_task',
on_success_callback=send_slack_webhook_notification(
slack_webhook_conn_id="slack_conn_webhook",
text="DAG run successful! "),
dag=dag
)
start_task
send_slack_webhook_notification
slack_webhook_conn_id
....
with DAG(
dag_id='slack_notification_dag',
start_date=datetime(2024, 9, 19),
schedule=None,
catchup=False,
on_fail_callback=send_slack_webhook_notification(...)
) as dag:
.....
on_fail_callback=[send_slack_webhook_notification, other_func(), ...]
可以依照需求將 Airflow 變數加到錯誤訊息中,例如如果是放在 dag 層級,就可以帶入 ti.task_id
來取得錯誤的任務,有哪些變數可以用可以參考我去年的這篇文章Airflow Variable 變數的神秘魔法,或是到 airflow 官網查看更多變數
from airflow.providers.slack.notifications.slack_webhook import send_slack_webhook_notification
send_slack_webhook_notification(
slack_conn_id="slack_conn_webhook",
text="Dag ID : {{ ti.dag_id }}\nTask ID : {{ ti.task_id }}"
)
如果在通知上,想要連執行錯誤訊息都取得,可以試著再將 slack 通知在包一層做成 plugins,後續呼叫上就能自訂更多功能,後半部的專案可能有機會再做詳細的說明。