還記得我們第一次寫的 first_dag 嗎?當時只是基本的寫了一個 DAG 的雛形,在經過這幾天更了解 DAG 各個元件之後,大家是不是覺得躍躍欲試呢,相信大家一定很想要把這些功能強的大元件使用在我們的 Data pipeline 中吧!今天我們就來實作一個實際應用的 Data pipeline!
這次我們練習建立一個天氣資訊的Data pipeline,Data source 主要使用氣象局提供的 API ,預期每天早上透過氣象局的 API 取得相關資訊並儲存至資料庫中。
還記得在 如何設計 Data pipeline 中,我們有提到在設計 Data pipeline 需要考量的幾個面向:
在這個 Data pipeline 當中我們主要要處理氣象資料的搜集。我們期望在每天的早上搜集當時的並記錄。預期未來可以拿來做氣象警報,或是機器學習模組的訓練資料。但目前只先把資料儲存做資料的查詢。
在氣象局的API 中有提供 JSON 以及 XML 兩種資料格式可以選擇,目前已處理方便程度我們選擇 JSON 作為我們處理資料的型態。
目前經過幾次的測試,回傳的資料量會在 10KB 以下,對於 RAM 不會佔用太大的空間。如果有些資料的數量過大時,會建議擷取下來之後息安儲存到暫存的硬體空間。下一步驟時再透過暫存的空間持續處理。
目前對於速度沒有很即時的要求,只希望於程式在我起床之前成功跑完,讓我起床之後可以針對氣候資訊規劃相關事宜。因此我們可以以一般的單線執行方法處理。
進入到實踐部分,在建立 Data pipeline 之前首先我們會需要有氣象局API 的授權碼,用以呼叫氣象局 API,我們可以氣象局網站進行申請。
申請帳號參考網址:
https://pweb.cwa.gov.tw/emember/register/authorization
申請完帳號之後可以在API 授權碼頁面中取得授權碼,此授權碼可以用來呼叫氣象局的 API。
取得授權碼參考網址:
https://opendata.cwa.gov.tw/user/authkey
接下來我們可以查看氣象局所開放的 API 種類,選擇想要使用的 API 。
參考網址:
https://opendata.cwb.gov.tw/dist/opendata-swagger.html#/
在瞭解了我們想要取得資料的 Data Source 以及方式之後,我們可以把相關會用到的參數存入Variable 中管理。
而資料會存入的資料庫位置也可以存入 Connection 管理。
接著我們首先針對 DAG 整個大架構的流程打個底,這個 weather DAG 一樣會是 Extract - Transform - Load (ETL)架構的 Data pipeline,所以我們先把 Task 以及關聯關係定義出來。
from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
import requests
import json
# 定義 DAG
dag = DAG(
'weather_dag',
start_date=datetime(2023, 1, 1),
schedule_interval="@daily",
catchup=False,
)
# 抓取資料
def extract_data():
pass
# 轉換資料
def transform_data(**kwargs):
pass
# 定義函數,用於將資料插入到 MySQL 資料庫
def load_to_mysql(**kwargs):
pass
# 執行資料抓取
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_data,
dag=dag,
)
# 執行資料轉換
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
provide_context=True, # 启用上下文传递
dag=dag,
)
# 執行插入資料到 MySQL
load_task = PythonOperator(
task_id='load_to_mysql',
python_callable=load_to_mysql,
provide_context=True, # 启用上下文传递
dag=dag,
)
# 定義任務之間的依賴關係
extract_task >> transform_task >> load_task
在上面的程式碼中,我們定義了 DAG 基本的雛形,接下來我們要針對 Task 細部的功能進行開發。
為了讓大家有個好的連假過,我們把這個 DAG的建置分成兩天進行說明,祝大家雙十連假玩得開心,今天先講到這邊囉,我們明天繼續!
airflow.operators.python_operator 已經棄用了,可以直接改用 airflow.operators.python 喔。
感謝提醒!