延續昨天的話題,我想要每天午夜完成一次從 Google Maps API 取得資料 ⮕ 計算各地點的評分 ⮕ 將結果寫入 CSV 這樣的動作,可以透過 Cronjob, Crontab 使用簡單的時間格式表達式來設定任務執行頻率,但透過終端機來處理複雜任務實在太辛苦了。
今天我們來聊聊 Airflow。它是一款開源平台,用來開發、排程與監控**批量導向 (batch-oriented) **工作流程的平台。透過 Airflow 的使用者介面,我們可以方便地管理工作流程的狀態,不用再透過黑畫面了!
*批量 (batch) 意指多筆資料一次性地進行處理,而不是即時處理單筆資料。
Airflow 的特色是工作流程即程式碼 (Workflows as Code) 的概念,所有的工作流程都是透過 Python 程式碼開發。
空口無憑,來看段定義工作流程的程式碼吧!
dag_example.py
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
# 執行時間與頻率設定
with DAG(
dag_id='fetch_dag',
start_date=datetime(2024, 9, 25),
schedule_interval="0 16 * * *",
) as dag:
# 任務的順序編排
start_task = DummyOperator(task_id='start')
end_task = DummyOperator(task_id='end')
start_task >> end_task
透過執行時間與頻率設定,以及任務的順序編排,Airflow 就幫你產出工作流程,而這個流程我們叫做 DAG (Directed Acyclic Graph,有向無環圖),表示任意相連的兩個任務必有上下游關係,不會循環相依。
在 Airflow 中,operator 是用來定義任務類型的核心概念。 例如 PythonOperator
就是用來執行 Python script 的任務, SQLExecuteQueryOperator
就是用來執行 SQL script 的任務。我們可以透過 operator 定義工作流程中每個任務的具體執行方式。
在 Airflow 中使用 operator 的方式是將實體化 (instantiate) 成具體的任務。tasks 負責決定如何在 DAG 中執行 operator 的工作。例如,我們可以將 PythonOperator 來創建多個獨立任務,來執行不同的 Python scripts。
以 Day 10 提到的 Google Maps API 資料收集、轉換及匯入為例,我們透過以下的程式碼編譯,就能夠在每天台灣時間午夜 0 點 (UTC 16 點) 定時進行任務流程!
dag_google_map_api.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from etl.extract import fetch_places_count
from etl.transform import calculate_score
from etl.load import save_to_csv
default_args = {
'owner': 'Shu-Ting',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='google_maps_api',
default_args=default_args,
description='ETL pipeline to fetch Google Maps API'
schedule_interval="0 16 * * *",
start_date=datetime(2024, 9, 25),
catchup=False,
) as dag:
# Task 1: Extract
extract_task = PythonOperator(
task_id='extract_data_task',
python_callable=extract_data,
provide_context=True
)
# Task 2: Transform
transform_task = PythonOperator(
task_id='transform_data_task',
python_callable=transform_data,
provide_context=True
)
# Task 3: Load
load_task = PythonOperator(
task_id='load_data_task',
python_callable=load_data,
provide_context=True
)
# 定義任務順序
extract_task >> transform_task >> load_task
Airflow 支援 Jinja 模板引擎,讓我們能在工作流程中使用模板語法,並且提供內建參數,例如常用的 {{ ds }}(工作流程執行的理論日期)。
例如,我們每天透過 SQL 語句把寫入 Data Lake 的 CSV 匯入 Date Warehouse (此處選用 BigQuery) 的表中,並且希望寫入時標示任務執行日在 _processed_at 欄位裡,就非常適合運用 Jinja 來達成這件事。
insert_data.sql
CREATE OR REPLACE EXTERNAL TABLE `datalake.google_maps_results_{{ ds }}`
location STRING,
score FLOAT64,
_processed_at DATE
)
OPTIONS (
format = 'CSV',
uris = ['gs://bucket/path/location_scores.csv'],
skip_leading_rows = 1
);
INSERT INTO `datawarehouse.location_score_snapshot` (
location,
score,
_processed_at
)
SELECT
location,
score
'{{ ds }}' AS _processed_at
FROM
`datalake.google_maps_results_{{ ds }}`
;
同時我們能夠透過程式碼拆分,讓 SQL 程式碼單獨放在 .sql,運行任務的順序就擺在 dag_google_maps_api.py
裡:
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator
)
...
# Task 4: Insert
insert_task = BigQueryInsertJobOperator(
task_id="create_external_table_task",
configuration={
"query": {
"query": Path("/path/templates/create_external_table.sql")
"useLegacySql": False
}
},
)
# 定義任務順序
extract_task >> transform_task >> load_task >> insert_task
如此一來,我們又落實了程式碼模組化的理念!
經過上述的介紹,若我們想要如 Day 03 所述開發一條 data pipeline 讓資料流轉,在 Airflow 體系下選用適合的 operator 並根據需求定義任務先後順序,理論上可以靈活地達成各式各樣工作流程編排。
『如果工作流程很複雜,任務的相依性很高,Airflow 還是能實現嗎?』
這個問題留待明天我們再來談!