iT邦幫忙

2024 iThome 鐵人賽

DAY 11
1

https://ithelp.ithome.com.tw/upload/images/20240925/20168816ZsicWOQj56.png

延續昨天的話題,我想要每天午夜完成一次從 Google Maps API 取得資料 ⮕ 計算各地點的評分 ⮕ 將結果寫入 CSV 這樣的動作,可以透過 Cronjob, Crontab 使用簡單的時間格式表達式來設定任務執行頻率,但透過終端機來處理複雜任務實在太辛苦了。

今天我們來聊聊 Airflow。它是一款開源平台,用來開發、排程與監控**批量導向 (batch-oriented) **工作流程的平台。透過 Airflow 的使用者介面,我們可以方便地管理工作流程的狀態,不用再透過黑畫面了!
*批量 (batch) 意指多筆資料一次性地進行處理,而不是即時處理單筆資料。

工作流程即程式碼|Workflows as Code


Airflow 的特色是工作流程即程式碼 (Workflows as Code) 的概念,所有的工作流程都是透過 Python 程式碼開發。

  • 動態性:透過 Python 程式碼的編譯 ,我們能夠在 Airflow 框架下動態地生成工作流程。
  • 擴充性:Airflow 提供多種 operator,讓我們能夠透過 Python 整合不同的技術,例如 SQL、bash 等。且所有元件都具備擴充性,可隨著環境的需要進行調整。
  • 靈活性:Airflow 整合了 Jinja 模板 (jinja template engine) 功能,讓我們可以輕鬆地把工作流程細節參數化。

空口無憑,來看段定義工作流程的程式碼吧!

dag_example.py

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

# 執行時間與頻率設定
with DAG(
    dag_id='fetch_dag',
    start_date=datetime(2024, 9, 25),
    schedule_interval="0 16 * * *",
) as dag:
    # 任務的順序編排
    start_task = DummyOperator(task_id='start')
    end_task = DummyOperator(task_id='end')

    start_task >> end_task

透過執行時間與頻率設定,以及任務的順序編排,Airflow 就幫你產出工作流程,而這個流程我們叫做 DAG (Directed Acyclic Graph,有向無環圖),表示任意相連的兩個任務必有上下游關係,不會循環相依。

執行單位|Operator


在 Airflow 中,operator 是用來定義任務類型的核心概念。 例如 PythonOperator 就是用來執行 Python script 的任務, SQLExecuteQueryOperator 就是用來執行 SQL script 的任務。我們可以透過 operator 定義工作流程中每個任務的具體執行方式。

任務|Task


在 Airflow 中使用 operator 的方式是將實體化 (instantiate) 成具體的任務。tasks 負責決定如何在 DAG 中執行 operator 的工作。例如,我們可以將 PythonOperator 來創建多個獨立任務,來執行不同的 Python scripts。
Day 10 提到的 Google Maps API 資料收集、轉換及匯入為例,我們透過以下的程式碼編譯,就能夠在每天台灣時間午夜 0 點 (UTC 16 點) 定時進行任務流程!

dag_google_map_api.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from etl.extract import fetch_places_count
from etl.transform import calculate_score
from etl.load import save_to_csv

default_args = {
    'owner': 'Shu-Ting',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='google_maps_api',
    default_args=default_args,
    description='ETL pipeline to fetch Google Maps API'
    schedule_interval="0 16 * * *",
    start_date=datetime(2024, 9, 25),
    catchup=False,
) as dag:

    # Task 1: Extract
    extract_task = PythonOperator(
        task_id='extract_data_task',
        python_callable=extract_data,
        provide_context=True
    )

    # Task 2: Transform
    transform_task = PythonOperator(
        task_id='transform_data_task',
        python_callable=transform_data,
        provide_context=True
    )

    # Task 3: Load
    load_task = PythonOperator(
        task_id='load_data_task',
        python_callable=load_data,
        provide_context=True
    )

    # 定義任務順序
    extract_task >> transform_task >> load_task

參數化|Templating with Jinja


Airflow 支援 Jinja 模板引擎,讓我們能在工作流程中使用模板語法,並且提供內建參數,例如常用的 {{ ds }}(工作流程執行的理論日期)。
例如,我們每天透過 SQL 語句把寫入 Data Lake 的 CSV 匯入 Date Warehouse (此處選用 BigQuery) 的表中,並且希望寫入時標示任務執行日在 _processed_at 欄位裡,就非常適合運用 Jinja 來達成這件事。

insert_data.sql

CREATE OR REPLACE EXTERNAL TABLE `datalake.google_maps_results_{{ ds }}`
    location STRING,
    score FLOAT64,
    _processed_at DATE
)
OPTIONS (
    format = 'CSV',
    uris = ['gs://bucket/path/location_scores.csv'],
    skip_leading_rows = 1
);

INSERT INTO `datawarehouse.location_score_snapshot` (
    location,
    score,
    _processed_at
)
SELECT
     location,
     score
     '{{ ds }}' AS _processed_at
FROM
    `datalake.google_maps_results_{{ ds }}`
;

同時我們能夠透過程式碼拆分,讓 SQL 程式碼單獨放在 .sql,運行任務的順序就擺在 dag_google_maps_api.py 裡:

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator
)

...

# Task 4: Insert
insert_task = BigQueryInsertJobOperator(
    task_id="create_external_table_task",
    configuration={
        "query": {
            "query": Path("/path/templates/create_external_table.sql")
            "useLegacySql": False
        }
    },
)

# 定義任務順序
extract_task >> transform_task >> load_task >> insert_task

如此一來,我們又落實了程式碼模組化的理念!

小結論:善用 Airflow


經過上述的介紹,若我們想要如 Day 03 所述開發一條 data pipeline 讓資料流轉,在 Airflow 體系下選用適合的 operator 並根據需求定義任務先後順序,理論上可以靈活地達成各式各樣工作流程編排。

『如果工作流程很複雜,任務的相依性很高,Airflow 還是能實現嗎?』

這個問題留待明天我們再來談!


上一篇
《資料與程式碼的交鋒》Day 10-程式碼模組化
下一篇
《資料與程式碼的交鋒》Day 12-任務相依性設計
系列文
資料與程式碼的交鋒 - Data Engineer 與合作夥伴的協奏曲 30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言