連接到 Database ,應該是 Airflow 當中一定會用到的功能,畢竟我們的資料進出都需要操作資料庫,在 dag 當中是用到之前提過的MySqlOperator
或是PostgresOperator
,依照不同資料庫選用不同的 Operator,但是!!!之前提過了 Taskflow API,不想要舊方法和新方法混著寫,有沒有不需要 Operator 就能連接 Database 的方法 ?? 有的就是使用 Hook (不是早安午安晚安那個 Youtuber 哦) ! 等等會先學三種連接 Database 方法,接著就會在 dag 當中用 Hook 實作,讓你的 DAG 程式碼依舊很簡潔
我根本就沒有資料庫,要怎麼練習和實作,錯了,你有的,還記得昨天建立環境有初始化資料庫嗎(docker compose up airflow-init
),雖然我之前有說不要用這個 metadata 資料庫,但如果你只是為了「練習連接資料庫」,還是可以偷偷用一下的,不然帶著大家 step by step 建立資料庫,可能又要多一天了😂
設定連接 port,兩個 5432,代表 docker 容器外和容器內的 port
重新 Build Postgres 的服務
docker-compose up -d --no-deps --build postgres
在 Airflow 網頁介面,選擇 Admin > Connections,來進入 Connection 管理介面,點擊 + 新增一個新的連接,我們會設置一個 Conn Id 來表示這個連結。
docker ps
docker exec -it <container_id> bash
612591b32ad4
進入容器之後,就像是平常的終端機 terminal 一樣,如果要離開,打 exit
就可以了
airflow connections add 'local-db-cli' \
--conn-uri 'postgres://airflow:airflow@host.docker.internal:5432/postgres'
有發現其實跟方法一樣嗎?其實就只是進入docker容器設定,沒有 Web UI 介面可以用,我們打開 Web UI 就會發現其實完全是同一種方法
x-airflow-common:
xxx
environment:
xxx
AIRFLOW__API__AUTH_BACKENDS:
AIRFLOW_CONN_LOCAL_DB=
'postgres://airflow:airflow@host.docker.internal:5432/postgres'
把要連接的db資訊放在 docker-compose YAML 檔案,放在environment區塊下方就可以了
docker-compose up -d
法三是比較推薦的方式,不會因為清空容器就要重新設定連接,但還是盡量不要直接像上面一樣把帳號密碼放在設定中
成功連接 db 之後就趕緊實作一個 DAG 來看看能不能下 sql 來取得資料囉~
Taskflow 的 @dag
和 @task
應該都很熟悉了,可以看一下怎麼用 PostgresHook
來連接,比用 Operator 簡單多了~
import json
from airflow.decorators import dag, task
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook
@dag(schedule_interval=None, start_date=datetime(2023, 10, 2))
def conn_db_run():
@task
def conn_local_db():
hook = PostgresHook(postgres_conn_id="localhost-db")
# postgres_conn_id => 法一:localhost-db、法二:local-db-cli、法三:local_db
connection = hook.get_conn()
cursor = connection.cursor()
sql_query = """
SELECT * FROM pg_catalog.pg_tables;
"""
cursor.execute(sql_query)
result = cursor.fetchall()
cursor.close()
connection.close()
return(json.dumps(result))
conn_local_db()
conn_db_run()
SELECT * FROM pg_catalog.pg_tables;
代表列出所有的 tables看到 return 有資料代表成功連接 db 囉~