現在,我們來看一個簡單但真實的Airflow DAG 例子,並介紹其中一些常見的運算符(task)。
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
def filter_top_50_stocks(*args, **kwargs):
# 在這個函數中,您可以從 task1 回傳的股票數據中過濾出前50大的股票
# 並將結果存儲在 task instance 的上下文中,以便後續的 Postgres 操作使用
stocks = kwargs['ti'].xcom_pull(key='fetch_stock_data' # 在這裡獲取前50大股票的邏輯
top_50_stocks = filter_top_50() #省略實作
kwargs['ti'].xcom_push(key='top_50_stocks', value=top_50_stocks)
default_args = {
'owner': 'data_engineer',
'depends_on_past': False,
'start_date': days_ago(1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'stock_data_etl',
default_args=default_args,
description='DAG to fetch and store stock data',
schedule_interval=timedelta(days=1), # Daily execution
catchup=False,
)
# Task 1: Fetch stock data
fetch_stock_data = SimpleHttpOperator(
task_id='fetch_stock_data',
method='GET',
http_conn_id='stock_api', # You need to set up a connection for the API
endpoint='/fetch_stock_data',
dag=dag,
)
# Task 2: PythonOperator to filter top 50 stocks
filter_top_50_stocks_task = PythonOperator(
task_id='filter_top_50_stocks',
python_callable=filter_top_50_stocks,
provide_context=True,
dag=dag,
)
# Task 3: Store data in Postgres
store_in_postgres = PostgresOperator(
task_id='store_in_postgres',
postgres_conn_id='postgres_connection', # You need to set up a connection for PostgreSQL
sql='INSERT INTO stock_data (date, symbol, closing_price) SELECT date, symbol, closing_price FROM external_stock_data WHERE symbol IN ({{ ti.xcom_pull(task_ids="filter_top_50_stocks", key="top_50_stocks") }});',
dag=dag,
)
# Task 4: Notify success
notify_success = SimpleHttpOperator(
task_id='notify_success',
method='POST',
http_conn_id='notification',
endpoint='/notify_success',
data={"message": "Stock data ETL job completed successfully."},
headers={"Content-Type": "application/json"},
dag=dag,
)
# Define task dependencies
fetch_stock_data >> filter_top_50_stocks_task >> store_in_postgres >> notify_success
在這個例子中,我們使用了幾種常見的運算符(task operator):
由於各式各樣的資料庫太多了,相關的 operator 基本上大同小異,就是提供 SQL 及 connection id 去讀取資料庫,就不一一介紹。
connection id 會跟 connection 元件有關係,這個我們後面會再提供。而我們抓下來的資料會透過 XCom 跟 SQL Template 傳遞,如同程式裡的這段:
sql='INSERT INTO stock_data (date, symbol, closing_price) SELECT date, symbol, closing_price FROM external_stock_data WHERE symbol IN ({{ ti.xcom_pull(task_ids="filter_top_50_stocks", key="top_50_stocks") }});'
兩個大括號 {{ }} 在 Airflow 內會自動被 parse,藉此取得前一個 task 傳來的值。
XCom 的用法之後也會再說明。
這個可能是 Airflow 內最常用到的 operator 了,由於 python 的易用跟流行性,即使你不熟任何 Airflow 內建或是第三方寫好的 plugin (相信我,它們很好用),你幾乎都可以在 PythonOperator 內自己寫出來。
例如 SimpleHttpOperator,你當然可以在 python operator 內寫一段 python code,使用 request 呼叫 API,效果大致上是一樣的,如果你認為這兩者是綁定的工作的話。
我們下一篇再來說明 PythonOperator 的一些用法跟注意事項。
通知可以用 on_sucess_callback 就可以了
是的,你說的沒錯。我這裡是為了展示常用的邏輯所以獨立拆成一個 task,但 Airflow 有提供這個接口,可以簡化我們的 DAG 邏輯。