iT邦幫忙

2023 iThome 鐵人賽

DAY 24
0

昨天我們建立了天氣資訊 DAG 的基本架構和流程。今天我們來繼續實作細部的 Task 工作。
首先我們從 Extract Function 進行實作。

實作 Extract Function

在擷取資料的 Task 中,我們希望從氣象 API 中取得當下天氣資訊,在 API 列表中我們選擇"v1/rest/datastore/O-A0003-001 現在天氣觀測報告-現在天氣觀測報告"這隻 API,點擊之後我們可以看到API 資訊。
https://ithelp.ithome.com.tw/upload/images/20231009/20140477o6mbHIl0QQ.png

在這邊的 StationId 我們選擇臺北的觀測站作為範例,觀察下圖所給的 id 為 466920
https://ithelp.ithome.com.tw/upload/images/20231009/20140477gGfyjQFIkY.png

我們可以測試 API 觀看他的呼叫方法以及回傳格式
https://ithelp.ithome.com.tw/upload/images/20231009/201404777cDoT0wiFl.png

我們可以看到這個 GET API 的呼叫範例,接著將API 位置,及相關參數儲存至 Variable。這邊儲存 station_id、weather_api以及weather_auth_token。
https://ithelp.ithome.com.tw/upload/images/20231009/20140477JoocTZntPd.png

接著我們可以實作一版 Extract Task。將回傳的資訊只截取我們需要的 "TEMP", "HUMD", "Weather", "obsTime" 值。

# 定義所需之資料
weather_data_col = ["TEMP", "HUMD", "Weather", "obsTime"]
# 抓取資料
def extract_data():
    station_id = Variable.get("station_id")
    weather_api = Variable.get("weather_api")
    auth_token = Variable.get("weather_auth_token")
    url = f"{weather_api}?Authorization={auth_token}&stationId={station_id}"
    response = requests.get(url).json()
    weather_data = ["" for i in range(len(weather_data_col))]
    for data in response["records"]["location"][0]["weatherElement"]:
        if data["elementName"] in weather_data_col:
            weather_data[weather_data_col.index(data["elementName"])] = data["elementValue"]
    # 取得時間
    weather_data[weather_data_col.index("obsTime")] = response["records"]["location"][0]["time"]["obsTime"]     
    print(weather_data)  
    return weather_data

執行看看之後看看 Extract Task 的 log ,程式正確被執行,成功截取所需之資訊。
https://ithelp.ithome.com.tw/upload/images/20231009/20140477jbSSP7uaRU.png

擷取完之後我們需要將特定資料進行轉換,接著我們來開發 Transform Task。

實作 Transform Task

由於我們取得到濕度是0~1 表示的浮點數,我們想要把它變成百分比的百分比數。因此會針對"HUMD"欄位作轉換。

# 轉換資料
def transform_data(**kwargs):
    ti = kwargs['ti']
    weather_data = list(ti.xcom_pull(task_ids='extract_data'))
    weather_data[weather_data_col.index("HUMD")] = str(float(weather_data[weather_data_col.index("HUMD")]) * 100)
    return weather_data

透過上面的程式我們首先透過 XCom 取得 extract_data 回傳的值,並將 HUMD 進行轉換。轉換資料完成之後,就是最後把資料儲存進資料庫的步驟了。

實作 Load Task

在實作 Load Task 之前,我們要先設定 MySQL ,創建相關 Table。並在 Variable 中加入相關 Table。您可以參考以下的 SQL 來 創建 Table

CREATE TABLE `weather` (
  `TEMP` float DEFAULT NULL,
  `HUMD` float DEFAULT NULL,
  `Weather` varchar(255) DEFAULT NULL,
  `obsTime` datetime DEFAULT NULL
);

創建之後將 Schema 以及 Table 名稱儲存至 Variable,在這邊我命名為 weather_schema、weather_table
https://ithelp.ithome.com.tw/upload/images/20231009/201404776ZWp1DqYF0.png

完成之後我們就可以在 Task 中使用相關設定了!我們透過之前學過的 Hook 實作 MySQL 的 INSERT。在這邊要先import MySqlHook 再進行開發。

from airflow.providers.mysql.hooks.mysql import MySqlHook

# 將資料插入到 MySQL 資料庫
def load_to_mysql(**kwargs):
    ti = kwargs['ti']
    weather_data = list(ti.xcom_pull(task_ids='transform_data'))
    mysql_schema = Variable.get("weather_schema")
    weather_table = Variable.get('weather_table')  
    col_string = ",".join(weather_data_col)

    sql = f"INSERT INTO {mysql_schema}.{weather_table} ({col_string}) VALUES (%s, %s, %s, %s)"
    mysql_hook = MySqlHook(mysql_conn_id='mysql_conn')  
    mysql_hook.run(sql, parameters=weather_data)

編寫完成儲存之後,我們就來執行我們的 DAG 看看了!

https://ithelp.ithome.com.tw/upload/images/20231009/20140477cr8tlhvbCC.png

DAG 順利執行成功,資料也成功進入到 MySQL中!
學習了 Airflow 許多功能強大的元件後,可以讓我們很有效率地進行DAG 的開發。這兩天實作完 Data pipeline,相信大家有更抓的 Airflow 的開發吧!


上一篇
『Day23』來做個天氣資訊 DAG 吧 (上)
下一篇
『Day25』使用 View 來監控執行狀況
系列文
Data pipeline 建起來!用 Airflow 開發你的 Data pipeline30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言