iT邦幫忙

2023 iThome 鐵人賽

DAY 17
0
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 17

[Day17] Airflow 連接到 Database 的三種方法

  • 分享至 

  • xImage
  •  

前言

連接到 Database ,應該是 Airflow 當中一定會用到的功能,畢竟我們的資料進出都需要操作資料庫,在 dag 當中是用到之前提過的MySqlOperator或是PostgresOperator,依照不同資料庫選用不同的 Operator,但是!!!之前提過了 Taskflow API,不想要舊方法和新方法混著寫,有沒有不需要 Operator 就能連接 Database 的方法 ?? 有的/images/emoticon/emoticon30.gif就是使用 Hook (不是早安午安晚安那個 Youtuber 哦) ! 等等會先學三種連接 Database 方法,接著就會在 dag 當中用 Hook 實作,讓你的 DAG 程式碼依舊很簡潔/images/emoticon/emoticon34.gif

Before Connections

我根本就沒有資料庫,要怎麼練習和實作,錯了,你有的,還記得昨天建立環境有初始化資料庫嗎(docker compose up airflow-init),雖然我之前有說不要用這個 metadata 資料庫,但如果你只是為了「練習連接資料庫」,還是可以偷偷用一下的/images/emoticon/emoticon01.gif,不然帶著大家 step by step 建立資料庫,可能又要多一天了😂

修改 Postgres 設定

  • 設定連接 port,兩個 5432,代表 docker 容器外和容器內的 port
    https://ithelp.ithome.com.tw/upload/images/20231002/20135427TDeEEi3YzW.png

  • 重新 Build Postgres 的服務

docker-compose up -d --no-deps --build postgres

https://ithelp.ithome.com.tw/upload/images/20231002/20135427eRmsu3wEhE.png

方法一:Web UI 新增 Connections

在 Airflow 網頁介面,選擇 Admin > Connections,來進入 Connection 管理介面,點擊 + 新增一個新的連接,我們會設置一個 Conn Id 來表示這個連結。
https://ithelp.ithome.com.tw/upload/images/20231002/20135427HY4sQcIhj1.png

設定

  • Connection Id(要在airflow中使用的id): localhost-db (可以自己設定)
  • HOST(連接的主機): host.docker.internal
  • Connection Type(連接的資料庫類型): Postgres
  • Schema(連接的db名稱): postgres
  • Login 和 Password 都是 airflow
    https://ithelp.ithome.com.tw/upload/images/20231002/201354273InlHO6oFg.png
    設定完按save即可,想要測試可以直接跳到實作時間~

方法二:使用 Airflow CLI 設定

進入 docker 容器設定 connection

docker ps

https://ithelp.ithome.com.tw/upload/images/20231002/20135427LPNcMHge3v.png

docker exec -it <container_id> bash
  • -t : 是啟動終端
  • -i : 是允許進行互動
  • 我這邊的範例就是上方紅框的612591b32ad4

進入容器之後,就像是平常的終端機 terminal 一樣,如果要離開,打 exit 就可以了

airflow connections add 'local-db-cli' \
--conn-uri 'postgres://airflow:airflow@host.docker.internal:5432/postgres'

有發現其實跟方法一樣嗎?其實就只是進入docker容器設定,沒有 Web UI 介面可以用,我們打開 Web UI 就會發現其實完全是同一種方法/images/emoticon/emoticon01.gif
https://ithelp.ithome.com.tw/upload/images/20231002/2013542742RkFk6Us4.png

方法三:Docker Compose 設定

x-airflow-common:
    xxx
    environment:
        xxx
        AIRFLOW__API__AUTH_BACKENDS:
        AIRFLOW_CONN_LOCAL_DB=
                'postgres://airflow:airflow@host.docker.internal:5432/postgres'

把要連接的db資訊放在 docker-compose YAML 檔案,放在environment區塊下方就可以了

  • 修改完設定,重新創建並啟動 Container
docker-compose up -d

法三是比較推薦的方式,不會因為清空容器就要重新設定連接,但還是盡量不要直接像上面一樣把帳號密碼放在設定中
成功連接 db 之後就趕緊實作一個 DAG 來看看能不能下 sql 來取得資料囉~

實作時間

Taskflow 的 @dag@task 應該都很熟悉了,可以看一下怎麼用 PostgresHook 來連接,比用 Operator 簡單多了~

import json
from airflow.decorators import dag, task
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook

@dag(schedule_interval=None, start_date=datetime(2023, 10, 2))
def conn_db_run():
    @task
    def conn_local_db():
        hook = PostgresHook(postgres_conn_id="localhost-db") 
        # postgres_conn_id => 法一:localhost-db、法二:local-db-cli、法三:local_db
        connection = hook.get_conn()
        cursor = connection.cursor()
        sql_query = """
            SELECT * FROM pg_catalog.pg_tables;
            """
        cursor.execute(sql_query)
        result = cursor.fetchall()
        cursor.close()
        connection.close()
        return(json.dumps(result))

    conn_local_db()

conn_db_run()
  • SELECT * FROM pg_catalog.pg_tables; 代表列出所有的 tables

https://ithelp.ithome.com.tw/upload/images/20231002/20135427eq1yFFevPp.png

看到 return 有資料代表成功連接 db 囉~


上一篇
[Day16] 用 Docker Compose 建立 Airflow 環境
下一篇
[day18] 急!在線等!求解20 點!Airflow 安裝 Python 模組
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言