iT邦幫忙

2023 iThome 鐵人賽

DAY 29
0

不知不覺已經來到鐵人賽最後兩天了,加油!在讀本篇文章的讀者們,非常感謝你們一路的支持!
在本系列的初期,我們有介紹到 Airflow 的設計概念,其中一個很重要的理念就是 "Workflow as Code",所有的 Data pipeline 都使用 Code 表示。這對程式開發者是一大福音,因為我們可以程式的機制管理 pipeline,也可以導入如同上一篇所介紹的許多自動化的流程。而在其中,如何管理程式的品質就會是很重要的課題了。今天我們就來看看有哪些程式碼的設計及優化方向可以思考。

Coding Style, Conventions

PEP8 Style

在管理程式碼品質的第一步我們可以先確定程式碼的 Coding style 以及 Conventions,由於Airflow 都是以Python 進行開發,這對開發團隊是一件優點,可以以單一一種語言進行開發。Python 由於開發者種多,在社群中有提供多種風格指南可以使用,其中也包括廣泛被使用的 PEP8 風格指南。如果團隊還沒有決定相關風格指南的話,也非常建議可以使用 PEP8 開始,因為其相關資源算是非常完善的。

我們簡單來看看兩個範例,一個是有 PEP 8 風格,另一個則沒有。我們先來看看不遵循 PEP 8 的範例:

# 不符合 PEP 8 的樣式
def addNumbers(a,b):
    """這是一個不符合 PEP 8 的函數,缺少適當的註釋和空行。"""
    result = a+b
    return result

x=10
y=20

# 函數調用和變數命名不符合 PEP 8
sumResult=addNumbers(x,y)

# 這是一個合法的多行字串註釋,但縮排不符合 PEP 8
longString="""這是一個長字串,通常用於文件註釋或文檔字符串。
不符合 PEP 8 的縮排風格。
"""

print(sumResult)

接下來看看符合 PEP8 風格的寫法。

# 符合 PEP 8 風格
def add_numbers(a, b):
    """這是一個符合 PEP 8 的函數,有適當的註釋和空行。"""
    result = a + b
    return result

x = 10
y = 20

# 函數調用和變數命名也符合 PEP 8
sum_result = add_numbers(x, y)

# 這是一個合法的多行字串註釋
long_string = """這是一個長字串,通常用於文件註釋或文檔字符串。
符合 PEP 8 的縮排風格。
"""

print(sum_result)

透過以上的例子應該可以很直覺地感受到有沒有遵照 Coding Style進行程式編寫的重要性了吧,PEP 8 提供了一組關於Python代碼的最佳實踐,以確保代碼的可讀性和一致性。遵循PEP 8可以讓我們的代碼易於維護並更好的與其他Python代碼協同運作。

更多 PEP8 詳細介紹
https://www.python.org/dev/peps/pep-0008/

使用工具確認程式品質

我們可以透過 Python 的工具確認程式碼是否符合 PEP8的規範。像是我們在昨天 CI/CD pipeline 所加入的 Flake8 Pylint,都是可以確認程式品質的工具,透過這些工具可以得到我們的程式是否符合 PEP8 風格的分析結果。可以幫助我們優化我們的程式品質。
我們也可以直接使用 Formatter 工具,讓程式工具自動幫我們調整成符合 PEP8 風格的程式寫法。像是 Autopep8 或是 Black都是非常好使用的工具。

Airflow DAG 編寫風格的一致性

除了 Python 本身的 Coding style之外,Airflow 也有自己建立的一些編寫方式,像是 Relations 的編寫方法。團隊針對 Airflow 也可以對其編寫的規則,讓團隊能夠更好的持續維護程式碼品質。

動態生成

遵照 DRY(Don't Repeat Yourself) 原則,在程式當中我們盡可能能將重複出現的功能模組化,或是將其寫成Function 重複的調用。在 Airflow DAG 當中我們也可以把同樣性質 Task 或是 DAG 透過動態生成的方式建立。

使用 Loop 動態生成 Task

這邊我們透過 for loop 動態生成 Task:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def generate_task(task_id):
    def my_python_function():
        print(f"Executing task {task_id}")

    return PythonOperator(
        task_id=task_id,
        python_callable=my_python_function,
        dag=dag,
    )

dag = DAG('dynamic_task_generation', start_date=datetime(2023, 1, 1))

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# 使用循環生成多個任務
for i in range(5):
    task = generate_task(f'task_{i}')
    start >> task >> end

根據配置生成 DAG

再來我們以 DAG 為範例,使用配置的文件動態生成 DAG。

import json
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# 從配置文件中讀取DAG定義
with open('dag_config.json', 'r') as config_file:
    dag_config = json.load(config_file)

for dag_id, tasks in dag_config.items():
    dag = DAG(dag_id, start_date=datetime(2023, 1, 1))

    start = DummyOperator(task_id='start', dag=dag)
    end = DummyOperator(task_id='end', dag=dag)

    # 動態生成任務
    for task_id in tasks:
        task = DummyOperator(task_id=task_id, dag=dag)
        start >> task >> end

透過上面的例子,我們可以發現如果 Task 或 DAG 設計的良好的話,是能被重複使用藉此節省許多開發時間的。因此我們在設計上可以去思考怎麼樣設計 Task 或是 DAG 是可以方便重複被調用的。

嘗試使用 Airflow TaskFlow API

Airflow 一開始設計的設定是希望 Task 之間盡量的獨立,因為是身為 Data Orchestration的角色,主要整合不同服務之間的運作,所以沒有對參數的傳遞做特別的設計,需要用到 XComs 模組進行實作。然而隨著使用情景越來越多元,Airflow 2 中加入了 Airflow TaskFlow API,讓我們夠更優雅地進行參數的傳遞。Airflow TaskFlow API 是一個基於裝飾器的API,用於定義Python任務及其依賴關係。如果開發者主要使用 PythonOperator 並將數據作為XCom在它們之間傳遞,使用 Taskflow API 可以顯著的簡化代碼。

我們來使用Airflow TaskFlow編寫簡單ETL(提取、轉換和加載)工作範例。這個ETL工作流主要的工作為擷取數據,將其轉換為大寫,然後 load 到目標位置。

from airflow.decorators import task
from airflow.utils.dates import days_ago

@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def etl_dag():
    @task
    def extract_data(file_path: str):
        # 模擬提取數據的操作
        extracted_data = f"取得來自{file_path}的數據"
        return extracted_data

    @task
    def transform_data(data: str):
        # 將數據轉換為大寫
        transformed_data = data.upper()
        return transformed_data

    @task
    def load_data(data: str, target_path: str):
        # 模擬將數據 load 到目標位置的操作
        print(f"將轉換後的數據 {data} load 到 {target_path}")

    # 定義變數
    source_file = "source_data.csv"
    target_location = "destination/"

    # 依賴關係
    extracted = extract_data(source_file)
    transformed = transform_data(extracted)
    load_data(transformed, target_location)

etl_dag = etl_dag()

透過上面案例我們可以感受到,使用 TaskFlow API 讓整個DAG 程式變得非常簡潔易讀。在上面的範例中,和過去我們寫的DAG有幾個不同點:

  • 使用 @dag 取代過去宣告 DAG 的方式
  • 使用 @task 取代過去宣告的方式,這邊的task 預設為 PythonOperator
  • 傳遞參數時使用 Python 變數的方式進行傳送

TaskFlow API 傳遞參數的方式,也讓我們的參數能變得更明顯易讀。過去使用XCom模組時,由於參數是另外使用模組傳遞,透過程式碼不是那麼直觀可以看到參數傳遞的路徑,必須要細部的去看每一個 task 中存入的 XComs key名稱來找到對應的值。然而透過 TaskFlow API 我們可以直接在 Function的層次就可以得到解答。
雖然說 TaskFlow API 仍然有些缺點,如果不是使用 PythonOperator,或是仍有許多並非 TaskFlow API 寫法的程式,整合在一起時會讓開發者倍感困惑。但是如果善用 TaskFlow API 可以讓 DAG 程式碼的可讀性提升不少。

參數管理

在 Data pipeline 當中我們會連結許多不同的服務,也會需要不同的設定。如果要管理的 Data pipeline越來越多,情境越來越複雜,如果我們的參數是儲存在各個Data pipeline的話,那對於開發者以及後續維運者都會是一場災難。因此使用 Airflow 的管理工具對於程式碼的維運會非常有幫助。以下有一些建議可以使用的功能以及方式。

  • 使用 Connection 管理連線
  • 使用 Variable 管理參數
  • 使用外部 Config 檔案管理參數

透過適合的方式有效的管理各種參數,在維運和改動時不需要再對程式碼進行版本的更新。也讓環境設定和 DAG 程式碼可以更加的彼此獨立,可以減少許多開發中的混亂。

今天分享了一些程式碼優化的方向,希望有機會能夠和開發者多多交流分享想法,讓彼此的開發都能更加的提升。而很多規範的採用也還是回歸到團隊的討論,或許沒有一定的完美的做法,但會有最適合團隊的做法!


上一篇
『Day28』部署 DAG
下一篇
『Day30』Data pipeline 優化以及結語
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言