iT邦幫忙

0

Python & Airflow 學習筆記_PythonOperator 和 BranchPythonOperator

  • 分享至 

  • xImage
  •  

這篇文章主要是在教學如何在 ariflow 中建立一個可以執行 python function 的 operator,同時也會介紹如何使用 BranchPythonOperator 來進行任務分支的串接,如果有問題歡迎留言討論

一、PythonOperator

上一篇有提到 BashOperator 這個東西,Bash 主要是在 linux 下命令的一個功能,而 PythonOperator 顧名思義就是一個可以執行 python function 的一個 operator 了,在這個 operator 當中提供了 python_callable 參數,用來連結要執行的 python function,下面為建立範例的步驟

(一)、import 套件

import requests
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

(二)、建立 task id class

這邊是這篇文章比較特殊的部分,由於後面會介紹到 BranchPythonOperator,所以筆者這邊利用 class 來儲存本次會用到的所有 task id,這樣在撰寫程式上面比較快速也可以防止手殘打錯字

class TaskID:
    start_task_id = "start_task"
    end_task_id = "end_task"
    crawler_task_id = "crawler_task"
    success_task_id = "crawl_success"
    failed_task_id = "crawl_failed"

(三)、建立簡易 python 爬蟲

這邊撰寫了一個簡易的爬蟲,爬取目標為 批踢踢實業坊 的熱門看板,不做任何解析,只單純判斷 status_code 是否為 200,並印出 html code

def crawl_ptt():
    url = "https://www.ptt.cc/bbs/index.html"
    res = requests.get(url=url)
    if res.status_code == 200:
        print(res.text)

(四)、建立 DAG

可以看到在 PythonOperator 當中將第三步寫的 crawl_ptt 函式丟進去 python_callable 這個參數裡面,這樣當 airflow 執行到這個 operator 的時候就會自動地去執行 crawl_ptt 裡面的內容

@dag(start_date=datetime.today(), tags=['user'])
def python_operator_demo_dag():
    start_task = EmptyOperator(task_id=TaskID.start_task_id)

    crawl_task = PythonOperator(task_id=TaskID.crawler_task_id,
                                python_callable=crawl_ptt)

    end_task = EmptyOperator(task_id=TaskID.end_task_id)

    start_task >> crawl_task >> end_task
    
create_python_dag = python_operator_demo_dag()

(五)、成果展示

這邊直接進入到 crawl_task 這個 operator 的執行 Log 當中察看結果,如果不清楚怎麼查看 Log 的人,可以參考 這篇文章 第三點
https://ithelp.ithome.com.tw/upload/images/20220821/201440249TSwjRYGHk.jpg

二、BranchPythonOperator

有時候根據程式執行的結果不同,會需要處理不同的事情,例如爬蟲成功時應該做資料寫入,失敗時應該做 log 輸出 or 通知使用整等程序,airflow 提供了 BranchPythonOperator 來替我們完成這項工作,下面為簡易的使用方式

P.S. import 套件以及 task id class 的部分會沿用上面範例做使用,下面不再贅述

(一)、建立爬蟲函式

可以看到在下面的函式當中,根據爬蟲執行的情況正常與否,會回傳出不同的 task id 讓 airflow 知道該執行哪個 operator

def crawl_ptt_branch():
    url = "https://www.ptt.cc/bbs/index.html"
    res = requests.get(url=url)
    if res.status_code == 200:
        return TaskID.success_task_id
    else:
        return TaskID.failed_task_id

(二)、建立成功與失敗函式

下面為簡單的範例,主要用在 demo 當中,沒有寫很複雜的程式,單純印出字串提示使用者,在 dag 當中會利用 PythonOperator 進行串接使用

def success_crawl():
    print("爬取成功")


def failed_crawl():
    print("爬取失敗")

(三)、建立 DAG

在 BranchPythonOperator 之後有可能會被執行的 PythonOperator,可以利用 [] 包起來表示,airflow 會自動根據回傳出來的 task_id 進行判斷要執行哪個 operator,在下面的成果展示執行紀錄當中,可以看到 failed operator 為粉紅色的 skipped 狀態,只有執行到 success operator 內的函式

P.S. end_task 內有一個參數 trigger_rule 為觸發條件,本次設定為沒有失敗就會觸發,後續會發一篇文章介紹使用方式

@dag(start_date=datetime.today(), tags=['user'])
def branch_operator_demo_dag():
    start_task = EmptyOperator(task_id=TaskID.start_task_id)

    crawl_task = BranchPythonOperator(task_id=TaskID.crawler_task_id,
                                      python_callable=crawl_ptt_branch)

    success_task = PythonOperator(task_id=TaskID.success_task_id,
                                  python_callable=success_crawl)

    failed_task = PythonOperator(task_id=TaskID.failed_task_id,
                                 python_callable=failed_crawl)

    end_task = EmptyOperator(task_id=TaskID.end_task_id,
                             trigger_rule=TriggerRule.NONE_FAILED)

    start_task >> crawl_task >> [success_task, failed_task] >> end_task
    
create_branch_python_dag = branch_operator_demo_dag()

(四)、成果展示

  1. 流程圖https://ithelp.ithome.com.tw/upload/images/20220821/20144024QxQTSDLjQM.jpg
  2. 執行結果https://ithelp.ithome.com.tw/upload/images/20220821/20144024OnTTLQNpL0.jpg
  3. 執行紀錄 (請忽略前面測試時的紀錄,成功紀錄會使用紅色框框標註)https://ithelp.ithome.com.tw/upload/images/20220821/20144024LJgCid5wIN.jpg

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言