iT邦幫忙

2023 iThome 鐵人賽

DAY 23
0

還記得我們第一次寫的 first_dag 嗎?當時只是基本的寫了一個 DAG 的雛形,在經過這幾天更了解 DAG 各個元件之後,大家是不是覺得躍躍欲試呢,相信大家一定很想要把這些功能強的大元件使用在我們的 Data pipeline 中吧!今天我們就來實作一個實際應用的 Data pipeline!

建置天氣資料 Data pipeline

這次我們練習建立一個天氣資訊的Data pipeline,Data source 主要使用氣象局提供的 API ,預期每天早上透過氣象局的 API 取得相關資訊並儲存至資料庫中。

設計 Data pipeline

還記得在 如何設計 Data pipeline 中,我們有提到在設計 Data pipeline 需要考量的幾個面向:

  • 了解目的
  • 資料量分析
  • 資料的型態
  • 資料處理速度要求
    回憶起這些要素之後,讓我們稍微來對這次要做的 DAG 進行分析。

了解目的

在這個 Data pipeline 當中我們主要要處理氣象資料的搜集。我們期望在每天的早上搜集當時的並記錄。預期未來可以拿來做氣象警報,或是機器學習模組的訓練資料。但目前只先把資料儲存做資料的查詢。

資料的型態

在氣象局的API 中有提供 JSON 以及 XML 兩種資料格式可以選擇,目前已處理方便程度我們選擇 JSON 作為我們處理資料的型態。

資料量的分析

目前經過幾次的測試,回傳的資料量會在 10KB 以下,對於 RAM 不會佔用太大的空間。如果有些資料的數量過大時,會建議擷取下來之後息安儲存到暫存的硬體空間。下一步驟時再透過暫存的空間持續處理。

資料處理速度要求

目前對於速度沒有很即時的要求,只希望於程式在我起床之前成功跑完,讓我起床之後可以針對氣候資訊規劃相關事宜。因此我們可以以一般的單線執行方法處理。

實作

進入到實踐部分,在建立 Data pipeline 之前首先我們會需要有氣象局API 的授權碼,用以呼叫氣象局 API,我們可以氣象局網站進行申請。
申請帳號參考網址:
https://pweb.cwa.gov.tw/emember/register/authorization

申請完帳號之後可以在API 授權碼頁面中取得授權碼,此授權碼可以用來呼叫氣象局的 API。
取得授權碼參考網址:
https://opendata.cwa.gov.tw/user/authkey

https://ithelp.ithome.com.tw/upload/images/20231008/20140477MjpSilCFBo.png

接下來我們可以查看氣象局所開放的 API 種類,選擇想要使用的 API 。
參考網址:
https://opendata.cwb.gov.tw/dist/opendata-swagger.html#/

設定環境變數

在瞭解了我們想要取得資料的 Data Source 以及方式之後,我們可以把相關會用到的參數存入Variable 中管理。
而資料會存入的資料庫位置也可以存入 Connection 管理。

定義 DAG 流程

接著我們首先針對 DAG 整個大架構的流程打個底,這個 weather DAG 一樣會是 Extract - Transform - Load (ETL)架構的 Data pipeline,所以我們先把 Task 以及關聯關係定義出來。

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
import requests
import json

# 定義 DAG
dag = DAG(
    'weather_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",  
    catchup=False,
)

# 抓取資料
def extract_data():
    pass

# 轉換資料
def transform_data(**kwargs):
    pass

# 定義函數,用於將資料插入到 MySQL 資料庫
def load_to_mysql(**kwargs):
    pass

# 執行資料抓取
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

# 執行資料轉換
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True,  # 启用上下文传递
    dag=dag,
)

# 執行插入資料到 MySQL
load_task = PythonOperator(
    task_id='load_to_mysql',
    python_callable=load_to_mysql,
    provide_context=True,  # 启用上下文传递
    dag=dag,
)

# 定義任務之間的依賴關係
extract_task >> transform_task >> load_task

在上面的程式碼中,我們定義了 DAG 基本的雛形,接下來我們要針對 Task 細部的功能進行開發。
為了讓大家有個好的連假過,我們把這個 DAG的建置分成兩天進行說明,祝大家雙十連假玩得開心,今天先講到這邊囉,我們明天繼續!


上一篇
『Day22』Operator 介紹
下一篇
『Day24』來做個天氣資訊 DAG 吧 (下)
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
hlb
iT邦新手 5 級 ‧ 2023-11-05 16:38:38

airflow.operators.python_operator 已經棄用了,可以直接改用 airflow.operators.python 喔。

https://airflow.apache.org/docs/apache-airflow/2.0.0/_api/airflow/operators/python_operator/index.html

感謝提醒!

我要留言

立即登入留言