iT邦幫忙

2023 iThome 鐵人賽

DAY 27
0

在我們開發 weather_dag 之後,我們介紹了如何透過 View 以及警報來監控 DAG 執行的狀況。在開發之後其實還有一件非常重要的事,那就是測試,對於開發人員來說,如果總是使用手動進行測試,不僅費時費工,而且甚至會有遺漏的部分。因此編寫測試是很重要的一件事。

雖然說重要,但是對於 Airflow 是有點難度的事,我們都知道 Airflow 主要使用在 Data Orchestration,主要在進行各個工作流程的排程以及管理相互關係,這表示 Airflow 本身會串接不同的系統,所以涉及的服務範圍非常廣。我們今天就來大家探討Airflow 撰寫測試的方法,這邊算是起一個頭,在實務上會碰到的選擇以及狀況可以說是五花八門,要怎麼判斷以及決定,也歡迎大家能夠一起討論!

單元測試

首先我們先來看看 Airflow 的單元測試可以如何實作。在這邊我們使用 pytest 進行開發。

測試 function

我們可以透過撰寫 function 測試進行單元測試。針對我們weather_dag 的轉換是否成功進行測試。

from weather_dag import transform_data
def test_transform_data():
    mock_data = ["27.1", "0.79", "多雲時晴", "2021-10-02 11:00:00"]
    result = transform_data(mock_data)
    assert result == ["27.1", "79", "多雲時晴", "2021-10-02 11:00:00"]

使用 pytest.fixture

我們可以首先對於 DAG 進行 loading 是否成功的測試。這邊使用 DagBag 模組,DagBag 用於管理和載入所有DAG 資訊。DagBag 可以實現動態載入和管理DAG。下面的程式碼使用 DagBag 查看 DAG 運行的資訊。

import pytest
from airflow.models import DagBag

@pytest.fixture()
def dagbag():
    return DagBag()

def test_mydag_loaded(dagbag):
    mydag = dagbag.get_dag(dag_id="weather_dag")
    assert mydag is not None
    assert dagbag.import_errors == {}
    assert len(mydag.tasks) == 3

在這邊的 fixture 可以讓 pytest 進行取代,在這邊我們在 dagbag 的函式上加上@pytest.fixture()在 test_mydag_loaded執行的時候則會執行上面的 dagbag,可以透過這進行環境的獨立。

使用 mocker 模擬資料

針對客製化的 Operator 我們也可以進行相關的測試,而如果想要獨立相關環境,我們可以使用 pytest 的 mocker 進行環境資料模擬,我們就不需要真的接到外部的資源,實現環境獨立的測試。以下我們舉 CustomOperator 作為範例,CustomerOperator 是一個客製化的 Operator,會進行 MySQL 的操作。在下面的範例中,我們透過 mocker 模擬一個 MySql 的連線進行測試。

import datetime
import pytest
from airflow.models import DAG, Connection
from airflow.providers.mysql.hooks.mysql import MySqlHook
from CustomModule import CustomOperator

def test_custom_operator(mocker):
    mocker.patch.object(
        MySqlHook,
        "get_connection",
        return_value=Connection(
            conn_id="mysql",
            conn_type="mysql",
            host="localhost",
            login="admin",
            password="admin",
            port="3306",
        ),
        )

    task = CustomOperator(
        task_id="test",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        mysql_conn_id="mysql_id",
        insert_query=(
            "INSERT INTO my_table (test1, test2) "
            "VALUES (1,2)"
        ),
        dag=test_dag,
    )

    mysql_hook = MySqlHook()

    row_count = mysql_hook.get_first("SELECT COUNT(*) FROM my_table")[0]
    assert row_count > 0

執行 pytest

而如果fixture 想要集中管理的話,可以放置至 conftest.py 中,pytest 預設會引入其中的 fixture 作為使用。完成我們的建置之後我們可以使用 pytest 指令將所寫的測試執行。

pytest -v

也可以加入 cov 用以插入覆蓋率的報告

pytest --cov=your_module 

透過單元測試,我們可以確保我們程式基本的邏輯正確性。測試可以應用在不同的層面上,小型的單個工作單元我們可以通過單元測試進行測試。而儘管這些單元測試可以驗證正確的行為,但它們不驗證由多個這樣的單元組合而成的系統的行為。因此我們也需要撰寫了整合測試 (Integration Testing) 以及 端對端測試 (End-To-End Testing),藉此驗證多個組件一起運作的行為。

整合測試中我們必須盡可能的將環境與 production 環境盡量對齊,可以使用 AWS 或是 Azure、GCP 相關的環境設置,也可以使用類似 Whirl 的環境模擬服務。由於 Airflow 所連結的服務非常的多,因此可以找尋團隊適合的方法進行最適合的整合測試。

希望今天測試的分享能夠讓大家透過測試讓 Data pipeline 更加的堅固。


上一篇
『Day26』加入警報讓 Airflow 主動通知
下一篇
『Day28』部署 DAG
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言