iT邦幫忙

2023 iThome 鐵人賽

DAY 14
0

安裝完環境之後,想必在讀本系列文章的你,對於開發 Data pipeline 必定是躍躍欲試!如果對 Airflow 有點熟悉的朋友,這幾天可以簡單瀏覽過或是當成複習,未來的章節我們會再更深入討論 Airflow 開發的各種情行。
由於每位讀者的電腦環境都不盡相同,因此我在說明的時候都是採取以 Docker 的方式的操作方式,因為大部分的作業系統都可以使用Docker 進行 Airflow 的開發以及安裝。

DAG 存放位置

在 Airflow 當中有管理 DAG 存放的位置的資料夾。我們要將我們所編寫的 DAG 程式,放置到所指定的資料夾當中,Airflow 會掃描資料夾中的 DAG 程式碼。以 Docker 的方式為例。Docker compose 會將與其同一個資料夾層級的名為 dags 的資料夾映射進去 docker 容器當中。因此我們需要將 DAG 程式碼放置處該位置。

動手寫起來

那我們就要來開發我們的第一個 DAG 了,我們先建立一個 ETL pipeline 的雛形。其中包含 Extract, Transform, Load 三個任務。為了讓大家能夠先專注在 Airflow 的開發概念,我們在每個任務的動作會先以 print 的方式代替。

建立 DAG 程式碼

首先我們先建立一個DAG 文件名為 first_dag.py並放置在 dags 資料夾當中

編寫 DAG 程式

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# 定義各個任務的動作
def extract:
    print("extract from data source")

def transform:
    print("transform data")
    
def extract:
    print("load data to the target")

# 定義DAG
with DAG(
    "first_dag",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="my first DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    
    # 定義 Task
    extract_task = PythonOperator(
    task_id="extract", python_callable=extract, dag=dag,
    )

    transform_task = PythonOperator(
    task_id="transform", python_callable=transform, dag=dag,
    )
    
    load_task = PythonOperator(
    task_id="load", python_callable=load, dag=dag,
    )
    
    # 定義 Task 關聯關係
    extract_task >> transform_task >> load_task
  • 定義 function
    在第一部分,我們定義 python function -- extract, transform load 所做的事。在這邊我們把所做的事使用 print 方式代替。定義 function 是為了讓 airflow 的 task 進行調用。

  • 建立 DAG
    在中間的部分可以看到 with DAG( ... ) as dag 的寫法,這是讓我們可以定義這個文件的 dag 的基本資訊,以及其相關執行的任務。

  • 描述 DAG 中的 Task
    在 Airflow 當中有一個重要的元件為 Task,他是工作任務的基本單位,我們會在後面的篇章進一步的介紹,在這邊我們先了解整個 Airflow 的開發方法和邏輯。在這支DAG 我們定義了三個 task :extract_task、transform_task 以及 load_task。在各個 task 當中皆使用 PythonOperator 進行運作,而各個 task 分別定義其 task_id 以及 呼叫的 python function 等資訊

  • 描述 task 的執行關係
    在程式碼的最後,我們把所定義的 task 變數,使用 ">>" 符號進行連結,這是用來定義 DAG 中各個任務的順序關係以及執行關係。

執行

定義好了 Airflow 的程式碼之後,接下來就是我們見證奇蹟的時刻了~
我們執行 docker compose 的指令

docker-compose up

docker 成功將 Airflow 建立完成之後,我們進入 Airflow 的頁面,Airflow 的網頁位置為自己的本機加上 port 號碼,預設為8080,所以通常為 127.0.0.1:8080。進入頁面之後我們可以看到我們所編寫的 DAG 出現在首頁當中。

https://ithelp.ithome.com.tw/upload/images/20230929/201404778cAL58QYqA.png

接著我們點擊執行 DAG ,按鈕是位於右邊的 Action 中的三角形。點擊執行 DAG 之後,我們的 DAG 便會按照我們所定義的執行。執行完之後,成功的話會出現綠色的圈圈。我們點擊進去之後就會看到整個 DAG 的執行流程。

https://ithelp.ithome.com.tw/upload/images/20230929/20140477tYzNd6IlC6.png

上圖中我們可以看到我們定義的三個 Task 順利的執行,並且也按照我們定義的順序排列且執行。
點擊進去 Task 後會出現許多細部選項,我們以 extract 這個 task 為例,點擊進去之後可以看到下圖。
https://ithelp.ithome.com.tw/upload/images/20230929/20140477hupLGcBqhI.png

在頁面的中間點選 Log 選項,系統就會顯示執行的過程紀錄,我們可以看看我們所定義的 print("extract from data source") 是否如我們所預期的執行。

https://ithelp.ithome.com.tw/upload/images/20230929/20140477plHUVmTud7.png

有的!在中間的 log 我們有看到 "extract from data source" 的訊息,代表我們所定義的 Task 工作有成功的被執行!各位也可以進入每個 task 中查看所執行的內容。

恭喜各位,我們成功的開發文我們第一個 DAG 了!
雖然功能非常陽春,但是我們學習到了基本的 Airflow 的概念,以及如何操作和使用這個威力強大的工具。再下來就讓我們一步步探索 Airflow 的各個元件以及其開發方式吧!


上一篇
『Day13』安裝 Airflow
下一篇
『Day15』如何使用 DAG , Task
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言