iT邦幫忙

2023 iThome 鐵人賽

DAY 28
0

我們前面講了 Airflow 跟 Flink 的一些基礎觀念跟開發,現在讓我們整理一下兩邊的差異。

首先,Airflow 是一個優秀的排程管理工具,它有自己的 DB 能記錄每個 job 的執行狀況跟參數,當某個 job 失敗之後,能在修正之後快速指定重啟。

但是如果兩個 task 之間有大量資料 (>2GB) 要傳輸,可能會遇到問題。如果想解決的話,又要將資料先暫存在他處,到下一個 task 再載入回來。對於流式資料的處理,也不是非常適合,畢竟它是「排程」

當遇到停機異常時,由於 Airflow 的執行狀況跟 XCom 是記錄在 metadata database 內,所以即使主機掛掉,重啟之後依然能讀取最後的執行階段,並將資料還原。

Flink 則相反,雖然它也有一個 WebUI 可以觀察目前狀況,也有提供 API 確認現在 job 的狀態,但它的排程僅能依賴外部,例如 Linux crontab。不過他是 Java 開發,所以在效能跟 threads 使用上會優於 Python。

當遇到停機異常時,Flink 如果連主要的 master 都掛掉,那執行中的 job 會消失。即使你有設定定期存 Checkpoint,你還是需要找出對應的 checkpoint 存檔點,要從一堆無意義的 hash 資料夾中尋回,存在一定難度。

https://ithelp.ithome.com.tw/upload/images/20230928/20161625yzOnA0p7yt.png
圖裡就是我某個主機長期執行之後,存下來的 checkpoint 目錄。

所以我建議,如果真的是 streaming 的 job,就讓它在 Flink 上長期掛著執行。再透過 Airflow 呼叫 Flink API 去監控狀態,記錄 jobID, parameters 等參數存在 XCom 內,必要時可以重啟。

而 batch 的工作,如果簡單的話,可以寫在 AIrflow 內,複雜的就在 Flink 開發後,透過 Airflow 排程呼叫。

我們可以寫一個 Airflow 的 operator 來簡化這件事

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import subprocess

class RunFlinkJobOperator(BaseOperator):
    """
    Custom Airflow Operator to run a Flink job using a JAR file.
    """

    @apply_defaults
    def __init__(
        self,
        jar_path,
        flink_home,
        flink_command="run",
        *args, **kwargs
    ):
        """
        Initialize the operator.

        :param jar_path: Path to the Flink JAR file.
        :param flink_home: Path to the Flink installation directory.
        :param flink_command: Flink command to run (default is 'run').
        """
        super().__init__(*args, **kwargs)
        self.jar_path = jar_path
        self.flink_home = flink_home
        self.flink_command = flink_command

    def execute(self, context):
        """
        Execute the Flink job using the specified JAR file.

        :param context: Airflow execution context.
        """
        # Construct the Flink command
        flink_cmd = f"{self.flink_home}/bin/flink {self.flink_command} {self.jar_path}"

        # Run the Flink job
        try:
            result = subprocess.run(flink_cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            self.log.info(result.stdout)
            self.log.info(result.stderr)
            self.log.info("Flink job completed successfully.")
        except subprocess.CalledProcessError as e:
            self.log.error(f"Flink job failed with error: {e}")
            raise e

甚至連 jar 檔都可以定期的透過 airflow 從 maven 下載更新,但這部份先略過不提。

希望這樣的組合能讓你的 ETL 更加流暢,讓適合的工具來協助我們。


上一篇
Airflow 的 XCom 與限制 - Day27
下一篇
來用 TDD 開發 Airflow DAG 吧 - Day29
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言