iT邦幫忙

2023 iThome 鐵人賽

DAY 12
0
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 12

[Day12] 零地點突破‧改-Airflow Taskflow API(上)

  • 分享至 

  • xImage
  •  

前言

2020 年 12 月發布的 Apache Airflow 2.0,其中很重要的新功能就是 Taskflow API,可以讓我們更簡潔的撰寫 Airflow DAG,降低程式碼複雜度、減少行數。(相信是 Apache 聽到使用者的心聲了/images/emoticon/emoticon02.gif

Taskflow API 的優缺點

優點一:ti ❌

在任務之間傳遞資料,不需要使用 ti.xcom_pullti.xcom_push 了,可直接用 Python 常用的 return 傳遞資料。

優點二:with DAG(): ❌

不需要用with DAG() 把 Operator 包起來定義 task, task 定義可以和 python function直接寫一起,每次都要重複兩次是真的煩。

缺點:只有部分能用

只有提供的 Decorators 可以用,如果要使用的 Operator 沒提供,其他還是要用原來方式撰寫,或是只能把 Taskflow 和舊方法 Mix 再一起寫/images/emoticon/emoticon03.gif

目前可以用的 Decorators

  • @dag : DAG 裝飾器 => 取代 with DAG()
  • @task : python task 裝飾器 => 取代 PythonOperator
  • @task_group : 昨天學的 task group 也有更方便的寫法
  • @task.virtualenv : python 虛擬環境
  • @task.docker : docker 環境

實作飯粒🍚

Before🍚

今天要做的範例是假設拿到一筆 json 格式的訂單資料,要利用訂單金額訂單數量計算出平均的訂單金額,有故意把簡單的邏輯分成三個 tasks,再次實作昨天的 TaskGroup,接著運用 Taskflow APItasktask_groupdag,看能夠縮短多少行。

傳統做法 code

import json
from datetime import datetime
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator

def extract(ti): #用json模組讀取資料
    json_string = """
                    [
                        {
                            "order_id": "1001",
                            "order_item": "薯餅蛋餅",
                            "order_price": 45
                        },
                        {
                            "order_id": "1002",
                            "order_item": "大冰奶",
                            "order_price": 35
                        }
                    ]
                  """
    order_data = json.loads(json_string)
    ti.xcom_push(key='order_data', value=order_data)
   
def transform_sum(ti): #計算薯餅蛋餅和大冰奶的總金額
    order_data_list = ti.xcom_pull(task_ids='extract', key='order_data')
    order_total = 0
    for order_dict in order_data_list:
        order_total += order_dict['order_price']
    ti.xcom_push(key='order_total', value=order_total)

def transform_count(ti): #json裡面有多少數量
    order_data_list = ti.xcom_pull(task_ids='extract', key='order_data')
    order_count = len(order_data_list)
    ti.xcom_push(key='order_count', value=order_count)

def transform_averge(ti): #總金額/數量=平均金額
    order_total = ti.xcom_pull(
        task_ids='transform.transform_sum', key='order_total')
    order_count = ti.xcom_pull(
        task_ids='transform.transform_count', key='order_count')
    order_average = order_total/order_count
    ti.xcom_push(key='order_averge', value=order_average)

def load(ti): #將平均金額print出來
    order_averge = ti.xcom_pull(
        task_ids='transform.transform_averge', key='order_averge')
    print(f"Average Order Price: {order_averge}")

with DAG(
    dag_id='traditional_etl_dag',
    schedule_interval=None,
    start_date=datetime(2023, 9, 27)
) as dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract
    )
    with TaskGroup(group_id='transform') as transform:
        transform_sum = PythonOperator(
            task_id='transform_sum',
            python_callable=transform_sum
        )
        transform_count = PythonOperator(
            task_id='transform_count',
            python_callable=transform_count
        )
        transform_averge = PythonOperator(
            task_id='transform_averge',
            python_callable=transform_averge
        )
        [transform_sum, transform_count] >> transform_averge
    load = PythonOperator(
        task_id='load',
        python_callable=load
    )
    extract >> transform >> load

Hint

注意在使用 TaskGroup 的時候,傳進裡面 Xcomstask id 會變成 group_id.task_id,因為 TaskGroup 能夠在不同的 Group 也可以用相同的 task_id,所以前面要加上group_id 來辨識,如果覺得這樣很麻煩,可以在 TaskGroup 的設定加上 prefix_group_id=False,就能維持原來的 task_id,記得這時候的 task_id 就不能有重複了。
以上面舉例要改成:

with TaskGroup(group_id='transform',prefix_group_id=False) as transform_data:

這樣在 ti.xcom_pull 的時候 前面就不用加 transform

哇~真的是落落長,整個 DAG 的流程如下:
https://ithelp.ithome.com.tw/upload/images/20230927/20135427BVM7cek65B.png
https://ithelp.ithome.com.tw/upload/images/20230927/20135427zYxGi41cpm.png

Taskflow 做法 code

import json
from airflow.decorators import dag, task, task_group
from datetime import datetime
@dag(schedule_interval=None, start_date=datetime(2023, 9, 27))
def taskflow_etl_dag():
    @task()
    def extract():
        json_string = """
                        [
                            {
                                "order_id": "1001",
                                "order_item": "薯餅蛋餅",
                                "order_price": 45
                            },
                            {
                                "order_id": "1002",
                                "order_item": "大冰奶",
                                "order_price": 35
                            }
                        ]
                        """
        order_data = json.loads(json_string)
        return order_data
    @task_group
    def transform(order_data):
        @task()
        def transform_sum(order_data_json):
            order_total = 0
            for order_dict in order_data_json:
                order_total += order_dict['order_price']
            return order_total
        @task()
        def transform_count(order_data_json):
            order_count = len(order_data_json)
            return order_count
        @task()
        def transform_averge(order_total, order_count):
            order_average = order_total/order_count
            return order_average
        order_average_result = transform_averge(
            transform_sum(order_data), transform_count(order_data))
        return order_average_result
    @task()
    def load(order_average):
        print(f"Average Order Price: {order_average}")
    load(transform(extract()))
taskflow_etl_dag()
  • 減少了大約 25 行
  • 但是我認為用>>的方式設定相依性,會比用function return來說更方便

結語

今天就到這邊吧~明天就會去開始用 docker 了,應該會先從 docker 基礎開始介紹,接著說明 docker-compose ,最後才會安裝 airflow 的 docker,這個部分也是會 step by step 每個步驟都會截圖。


更新:docker要到後天了,突然發現 Taskflow API 好像沒有講 code 細節,明天還是再細講一下。


上一篇
[Day11] 當我們同在一起 - Airflow Task Group
下一篇
[Day13] 零地點突破‧改-Airflow Taskflow API(下)
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言