iT邦幫忙

2023 iThome 鐵人賽

DAY 9
1

儘管Airflow提供了許多內置的運算符(Operators),但有時候我們需要根據特定需求開發自己的自定義運算符。這篇文章將介紹如何在Airflow中開發自定義運算符。

首先,我們需要導入所需的庫:

from airflow.models import BaseOperator

然後,我們可以創建我們的自定義運算符類別:

class MyCustomOperator(BaseOperator):
    
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        # 在這裡執行您的自定義操作
        pass

在上面的示例中,我們創建了一個名為MyCustomOperator的自定義運算符,並在__init__方法中接受一個自定義參數my_param。您可以在execute方法中實現您的自定義操作邏輯。

接下來,我們需要在Airflow中註冊這個自定義運算符。您可以在DAG文件中使用以下代碼註冊它:

from airflow import DAG
from my_custom_operator import MyCustomOperator

# 創建一個DAG
dag = DAG('my_dag', schedule_interval=None)

# 使用自定義運算符
my_task = MyCustomOperator(
    task_id='my_task',
    my_param='my_value',
    dag=dag
)

最後,確保您的自定義運算符文件位於Airflow DAG文件夾中,以便Airflow能夠正確找到它。

但是有幾點事要注意:

  1. **init** 內不要進行昂貴的操作。例如像是產生一個 DB connection,或是載入某個大檔案的 I/O
  2. 反過來說,如果要跟 DB 連線,建議寫成 Hook 的形式,這樣一來可以方便重覆使用,並在 execute() 內呼叫它
  3. 如果你不想放在 dags/, plugins/, config/ 這些資料夾內,那務必記得將那個資料夾加到 PYTHONPATH

Template Field

如果有看前面的例子,應該還記得我們有用過 {{var.value.XXX}} 這種方式來取得參數。在自定義 Operator 時,這種寫法並不是預設能用在所有參數上的,必須事先在 class 內定義好。

class HelloOperator(BaseOperator):

    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

如下面這個例子,其實寫法並不能,只要在 class 層多一行 template_fields: Sequence[str] = ("param_name",)

這樣我們就可以在 DAG 呼叫時帶入 string 變數:

with dag:
    hello_task = HelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="Earth",
    )

這招會有助於我們在開發自己的 operator 時,有效的簡化使用方法。由於每個 task 越獨立越好,除非有特殊目的,不然減少對上下文的接觸會比較好。

如果不用這種寫法,要取得 task_id 就要從 execute 的 context 參數拿出來了。


上一篇
Airflow Connection 設定跟使用 - Day8
下一篇
Airflow TaskFlow 改寫 DAG - Day10
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言