在了解了 Airflow 的基本概念後,讓我們來看一個實際的例子,學習如何使用 Airflow 構建一個簡單的工作流程。在本篇範例中,我們將會創建一個 DAG,用於抓取網站數據、將數據存儲到資料庫中,並發送電子郵件通知。

default_args = {
'owner': 'abc',
'start_date': datetime(2023, 1, 1),
'retries': 0,
'email': ['abc@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
}
owner:指定 DAG 的擁有者。start_date:指定 DAG 的開始執行日期。retries:指定任務失敗後的重試次數。這裡設置為 0,表示不重試。email:指定通知用的電子郵件地址。email_on_failure 和 email_on_retry:分別指定是否在任務失敗或重試時發送電子郵件通知。這裡都設置為 False。dag = DAG(
'dag_template',
default_args=default_args,
description='dag_template',
schedule_interval=timedelta(days=1), # 每天執行一次
catchup=False,
)
dag_template:DAG 的唯一標識符號,名稱為 'dag_template'。default_args:前面定義的參數作為 DAG 的默認參數。description:描述這個 DAG 的功能。schedule_interval:設置 DAG 的排程間隔時間,這裡設定為每天執行一次。catchup:設置為 False,表示如果有過去未執行的任務不會被補執行。def crawl_and_store_to_csv():
pass
crawl_and_store_csv_task = PythonOperator(
task_id='crawl_and_store_csv',
python_callable=crawl_and_store_to_csv,
dag=dag,
)
crawl_and_store_to_csv:這是一個 Python 函數,模擬爬蟲操作並將結果存儲到 CSV 檔案中。實際業務邏輯應該在這個函數內實現。PythonOperator:Airflow 的 Python 操作器,用於執行 Python 代碼。task_id:該任務的唯一標識符號。python_callable:指定要執行的 Python 函數。create_data_table_task = PostgresOperator(
task_id='create_data_table',
postgres_conn_id="postgres_conn",
sql='''
CREATE TABLE IF NOT EXISTS a_table (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
subtitle VARCHAR(255),
);
''',
dag=dag,
)
PostgresOperator:Airflow 的 PostgreSQL 操作器,用於執行 SQL 指令。postgres_conn_id:指定 Airflow 中已配置的 PostgreSQL 連接 ID。sql:要執行的 SQL 語句,這裡用來創建一個名為 a_table 的資料表。send_email_task = EmailOperator(
task_id='send_email',
to=['efg@gmail.com'],
subject='Airflow - 匯出報告',
html_content='<p>Your Airflow job has finished.</p><p>Date: {{ execution_date }}</p>',
files=['/opt/airflow/Downloads/report_20230619010550.pdf'],
dag=dag
)
EmailOperator:Airflow 的電子郵件操作器,用於發送電子郵件。task_id:該任務的唯一標識符號。to:收件人的電子郵件地址。subject:電子郵件的主題。html_content:電子郵件的 HTML 內容,支持 Airflow 的模板變數,如 {{ execution_date }}。files:指定要附加到電子郵件中的檔案。crawl_and_store_csv_task >> create_data_table_task >> send_email_task
crawl_and_store_csv_task(爬蟲任務),完成後執行 create_data_table_task(創建資料表),最後執行 send_email_task(發送電子郵件)。這個 DAG 實現了一個簡單的工作流程:
明天會附上完整的程式碼,大家可以期待一下!