上一篇跟大家分享了 Airflow 功能強大的各種 View。而這篇要來跟大家分享警報的實作方法。不同於 View 是開發者以及維運人員需要主動進行查看,警報則是由 Airflow 主通通知,在執行 DAG 的過程中有任何的問題,可以讓 Airflow 主動通知相關人員進行處理。
使用 Email 應該算是最實用也最通用的一種警報方法了吧。因為現在幾乎每個人都一定會有 Email,並且在開發人員也時常是使用 Email 作為一種認證。另外 Email 也算是一個通用的協定,各項相關的程式的支持都非常完善,也不用有另外的費用,因此 Email 是一個非常好用來做警報通知的一個媒介。
要實作 Email 其實非常簡單,在 DAG 預設的參數中在有欄位可以輸入想通知的人員的Email 位置,也可以設定基本的通知時機。
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'email': ['your_email@example.com'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG('weather_dag', default_args=default_args, schedule_interval=None)
上面的程式碼透過 'email' 參數指定要寄送的Email地址,'email_on_failure'、'email_on_retry' 則指定了傳送 Email的條件,在這邊是如果 DAG 執行失敗的話就傳送 Email。我們也可以針對特定的 Task 進行 email 的警報設定,這邊我以 weather_dag 裡的 extract_task 作為範例。
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
email=["your_email@example.com", "your_email2@example.com"],
email_on_failure=True,
email_on_retry=True
)
我們也可以極大化的客製化 Email 警報的相關訊息,透過 Airflow 內建的 email 模組,我們可以很快速的實作客製化的 Email 訊息。
from airflow.utils.email import send_email
def success_email_alert(context):
dag_run = context.get("dag_run")
content = "DAG executed successfully !"
subject = f"DAG {dag_run} successfully done"
send_email(to=your_emails, subject=subject, html_content=content)
dag = DAG(
start_date=datetime(2023, 10, 11),
schedule="@daily",
catchup=False,
on_success_callback=success_email_alert
)
我們可以看到上面透過 on_success_callback 參數,讓我們在DAG 成功時可以執行我們所給定的 function -- success_email_alert。透過 call_back 的設計可以讓我們能夠在各種執行結果之下選擇運行相關的 function。
常見的 call_back 有:
善用 call_back 可以讓我們實作警示系統時可以有更多的彈性。
而如果想要設定其他郵件的細部設定我們可以參考官方網站進行設定,其中包含 SMTP 的 host以及 Email 的 template等等。
參考官方網站:
https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
隨著科技不斷的推陳出新,除了Email 之外我們也多了許多平台可以使用,或許不像是 Email 一樣幾乎全世界都在使用,但是有時這些平台更貼近我們的生活,像是 Line 、Slack 等等。如果要傳遞警報導相關平台,只要平台有開設相關介面,我們可以客製化 Notifier 進行警報訊息的推送。
像是 Operator, Hook 一樣,Airflow 也提供 BaseNotifier,讓我們繼承使用。在這邊我們實作 WeatherNotifier 作為我們 weather_dag 使用的 Notifier。
from airflow.notifications.basenotifier import BaseNotifier
from XXX_platform import send_message
class WeatherNotifier(BaseNotifier):
# 傳遞警報給指定平台
template_fields = ("msg",)
def __init__(self, msg):
self.msg = msg
def notify(self, context):
task_id = context["ti"].task_id
task_state = context["ti"].state
send_message(
f"Task -- {t_id} finished! result: {t_state}, result message: {self.msg}"
)
有了 WeatherNotifier 後,我們可以將 Notifier 功能加入 load_task 的運作中,如果 load_task 失敗的話,我們就請他發 send_message request 給我們設定的平台。在這邊的 send_message 只是一個舉例,開發者可以把想串接的平台使用這個架構進行串接。
load_task = PythonOperator(
task_id="load_to_mysql",
python_callable=load_to_mysql,
on_failure_callback=WeatherNotifier(msg="failed!"),
)
這樣我們的 Notifier 功能就實作完成了!
今天分享了兩種警報訊息通知的方法。在 Airflow 框架當中,善用當中給的一些程式介面,可以幫助我們很優雅的處理 Data pipeline 流程中的通知功能。希望大家也能多多熟悉跟使用囉!