在了解了 Airflow 的基本概念後,讓我們來看一個實際的例子,學習如何使用 Airflow 構建一個簡單的工作流程。在本篇範例中,我們將會創建一個 DAG,用於抓取網站數據、將數據存儲到資料庫中,並發送電子郵件通知。
default_args = {
'owner': 'abc',
'start_date': datetime(2023, 1, 1),
'retries': 0,
'email': ['abc@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
}
owner
:指定 DAG 的擁有者。start_date
:指定 DAG 的開始執行日期。retries
:指定任務失敗後的重試次數。這裡設置為 0
,表示不重試。email
:指定通知用的電子郵件地址。email_on_failure
和 email_on_retry
:分別指定是否在任務失敗或重試時發送電子郵件通知。這裡都設置為 False
。dag = DAG(
'dag_template',
default_args=default_args,
description='dag_template',
schedule_interval=timedelta(days=1), # 每天執行一次
catchup=False,
)
dag_template
:DAG 的唯一標識符號,名稱為 'dag_template'
。default_args
:前面定義的參數作為 DAG 的默認參數。description
:描述這個 DAG 的功能。schedule_interval
:設置 DAG 的排程間隔時間,這裡設定為每天執行一次。catchup
:設置為 False
,表示如果有過去未執行的任務不會被補執行。def crawl_and_store_to_csv():
pass
crawl_and_store_csv_task = PythonOperator(
task_id='crawl_and_store_csv',
python_callable=crawl_and_store_to_csv,
dag=dag,
)
crawl_and_store_to_csv
:這是一個 Python 函數,模擬爬蟲操作並將結果存儲到 CSV 檔案中。實際業務邏輯應該在這個函數內實現。PythonOperator
:Airflow 的 Python 操作器,用於執行 Python 代碼。task_id
:該任務的唯一標識符號。python_callable
:指定要執行的 Python 函數。create_data_table_task = PostgresOperator(
task_id='create_data_table',
postgres_conn_id="postgres_conn",
sql='''
CREATE TABLE IF NOT EXISTS a_table (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
subtitle VARCHAR(255),
);
''',
dag=dag,
)
PostgresOperator
:Airflow 的 PostgreSQL 操作器,用於執行 SQL 指令。postgres_conn_id
:指定 Airflow 中已配置的 PostgreSQL 連接 ID。sql
:要執行的 SQL 語句,這裡用來創建一個名為 a_table
的資料表。send_email_task = EmailOperator(
task_id='send_email',
to=['efg@gmail.com'],
subject='Airflow - 匯出報告',
html_content='<p>Your Airflow job has finished.</p><p>Date: {{ execution_date }}</p>',
files=['/opt/airflow/Downloads/report_20230619010550.pdf'],
dag=dag
)
EmailOperator
:Airflow 的電子郵件操作器,用於發送電子郵件。task_id
:該任務的唯一標識符號。to
:收件人的電子郵件地址。subject
:電子郵件的主題。html_content
:電子郵件的 HTML 內容,支持 Airflow 的模板變數,如 {{ execution_date }}
。files
:指定要附加到電子郵件中的檔案。crawl_and_store_csv_task >> create_data_table_task >> send_email_task
crawl_and_store_csv_task
(爬蟲任務),完成後執行 create_data_table_task
(創建資料表),最後執行 send_email_task
(發送電子郵件)。這個 DAG 實現了一個簡單的工作流程:
明天會附上完整的程式碼,大家可以期待一下!