儘管Airflow提供了許多內置的運算符(Operators),但有時候我們需要根據特定需求開發自己的自定義運算符。這篇文章將介紹如何在Airflow中開發自定義運算符。
首先,我們需要導入所需的庫:
from airflow.models import BaseOperator
然後,我們可以創建我們的自定義運算符類別:
class MyCustomOperator(BaseOperator):
def __init__(self, my_param, **kwargs):
super().__init__(**kwargs)
self.my_param = my_param
def execute(self, context):
# 在這裡執行您的自定義操作
pass
在上面的示例中,我們創建了一個名為MyCustomOperator
的自定義運算符,並在__init__
方法中接受一個自定義參數my_param
。您可以在execute
方法中實現您的自定義操作邏輯。
接下來,我們需要在Airflow中註冊這個自定義運算符。您可以在DAG文件中使用以下代碼註冊它:
from airflow import DAG
from my_custom_operator import MyCustomOperator
# 創建一個DAG
dag = DAG('my_dag', schedule_interval=None)
# 使用自定義運算符
my_task = MyCustomOperator(
task_id='my_task',
my_param='my_value',
dag=dag
)
最後,確保您的自定義運算符文件位於Airflow DAG文件夾中,以便Airflow能夠正確找到它。
但是有幾點事要注意:
Hook
的形式,這樣一來可以方便重覆使用,並在 execute()
內呼叫它PYTHONPATH
內如果有看前面的例子,應該還記得我們有用過 {{var.value.XXX}}
這種方式來取得參數。在自定義 Operator 時,這種寫法並不是預設能用在所有參數上的,必須事先在 class 內定義好。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
如下面這個例子,其實寫法並不能,只要在 class 層多一行 template_fields: Sequence[str] = ("param_name",)
這樣我們就可以在 DAG 呼叫時帶入 string 變數:
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
這招會有助於我們在開發自己的 operator 時,有效的簡化使用方法。由於每個 task 越獨立越好,除非有特殊目的,不然減少對上下文的接觸會比較好。
如果不用這種寫法,要取得 task_id 就要從 execute 的 context
參數拿出來了。