這篇文章主要是在教學如何在 ariflow 中建立一個可以執行 python function 的 operator,同時也會介紹如何使用 BranchPythonOperator 來進行任務分支的串接,如果有問題歡迎留言討論
上一篇有提到 BashOperator 這個東西,Bash 主要是在 linux 下命令的一個功能,而 PythonOperator 顧名思義就是一個可以執行 python function 的一個 operator 了,在這個 operator 當中提供了 python_callable
參數,用來連結要執行的 python function,下面為建立範例的步驟
import requests
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
這邊是這篇文章比較特殊的部分,由於後面會介紹到 BranchPythonOperator,所以筆者這邊利用 class 來儲存本次會用到的所有 task id,這樣在撰寫程式上面比較快速也可以防止手殘打錯字
class TaskID:
start_task_id = "start_task"
end_task_id = "end_task"
crawler_task_id = "crawler_task"
success_task_id = "crawl_success"
failed_task_id = "crawl_failed"
這邊撰寫了一個簡易的爬蟲,爬取目標為 批踢踢實業坊 的熱門看板,不做任何解析,只單純判斷 status_code 是否為 200,並印出 html code
def crawl_ptt():
url = "https://www.ptt.cc/bbs/index.html"
res = requests.get(url=url)
if res.status_code == 200:
print(res.text)
可以看到在 PythonOperator 當中將第三步寫的 crawl_ptt 函式丟進去 python_callable 這個參數裡面,這樣當 airflow 執行到這個 operator 的時候就會自動地去執行 crawl_ptt 裡面的內容
@dag(start_date=datetime.today(), tags=['user'])
def python_operator_demo_dag():
start_task = EmptyOperator(task_id=TaskID.start_task_id)
crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,
python_callable=crawl_ptt)
end_task = EmptyOperator(task_id=TaskID.end_task_id)
start_task >> crawl_task >> end_task
create_python_dag = python_operator_demo_dag()
這邊直接進入到 crawl_task 這個 operator 的執行 Log 當中察看結果,如果不清楚怎麼查看 Log 的人,可以參考 這篇文章 第三點
有時候根據程式執行的結果不同,會需要處理不同的事情,例如爬蟲成功時應該做資料寫入,失敗時應該做 log 輸出 or 通知使用整等程序,airflow 提供了 BranchPythonOperator 來替我們完成這項工作,下面為簡易的使用方式
P.S. import 套件以及 task id class 的部分會沿用上面範例做使用,下面不再贅述
可以看到在下面的函式當中,根據爬蟲執行的情況正常與否,會回傳出不同的 task id 讓 airflow 知道該執行哪個 operator
def crawl_ptt_branch():
url = "https://www.ptt.cc/bbs/index.html"
res = requests.get(url=url)
if res.status_code == 200:
return TaskID.success_task_id
else:
return TaskID.failed_task_id
下面為簡單的範例,主要用在 demo 當中,沒有寫很複雜的程式,單純印出字串提示使用者,在 dag 當中會利用 PythonOperator 進行串接使用
def success_crawl():
print("爬取成功")
def failed_crawl():
print("爬取失敗")
在 BranchPythonOperator 之後有可能會被執行的 PythonOperator,可以利用 [] 包起來表示,airflow 會自動根據回傳出來的 task_id 進行判斷要執行哪個 operator,在下面的成果展示執行紀錄當中,可以看到 failed operator 為粉紅色的 skipped 狀態,只有執行到 success operator 內的函式
P.S. end_task 內有一個參數 trigger_rule 為觸發條件,本次設定為沒有失敗就會觸發,後續會發一篇文章介紹使用方式
@dag(start_date=datetime.today(), tags=['user'])
def branch_operator_demo_dag():
start_task = EmptyOperator(task_id=TaskID.start_task_id)
crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,
python_callable=crawl_ptt_branch)
success_task = PythonOperator(task_id=TaskID.success_task_id,
python_callable=success_crawl)
failed_task = PythonOperator(task_id=TaskID.failed_task_id,
python_callable=failed_crawl)
end_task = EmptyOperator(task_id=TaskID.end_task_id,
trigger_rule=TriggerRule.NONE_FAILED)
start_task >> crawl_task >> [success_task, failed_task] >> end_task
create_branch_python_dag = branch_operator_demo_dag()