ETL 大概有八成的時間,都是在跟各種儲存系統互動。不管是常見的資料庫、BigQuery,或是 Redis, Hadoop,這些本質上都是儲存系統,差別只是連接方法跟存取接口。
這些通常都會需要安裝 driver,給定 host、帳密等等資料才能連接。在Airflow中,Connection是一個關鍵的概念,它允許您配置與外部系統的連接信息,這樣,您可以輕鬆地在DAG中引用這些連接,而不必在程式中明文儲存敏感信息。
你當然不見得要用它,但為了避免密碼外露或是被存在 git 內,你可能會需要另外將設定檔加密,或是在部署到正式環境時設定正式 DB 的帳密,增加了部署難度。
下面是如何在Airflow中配置並使用PostgreSQL連接的示例:
現在,您已經配置了一個名為"my_postgres_conn"的PostgreSQL連接。接下來,您可以在您的DAG中使用這個連接來執行PostgreSQL操作。以下是一個簡單的示例:
pythonCopy code
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
# 定義您的DAG
dag = DAG(
'example_postgres_dag',
start_date=datetime(2023, 9, 7),
schedule_interval=None, # 這個DAG不會自動運行,需要手動觸發
)
# 定義一個PostgreSQL操作
create_table_task = PostgresOperator(
task_id='create_table',
sql='CREATE TABLE IF NOT EXISTS my_table (id serial PRIMARY KEY, data text);',
postgres_conn_id='my_postgres_conn', # 使用之前配置的連接
dag=dag,
)
# 添加更多的操作(根據您的需求)
# 指定DAG的執行順序
create_table_task
在這個示例中,我們首先定義了一個PostgreSQL連接("my_postgres_conn"),然後使用PostgresOperator執行了一個SQL語句來創建一個名為"my_table"的數據表。
上面的範例如果 SQL 是 select 相關的話,資料會被送到 XCom 去,這部份之後會再聊,先讓我們繼續聊 connection。這個例子是包裝成 Operator,但如果我們要在同一個 task 內使用 connection 的話要怎麼辦?
通常我們會使用 Hook
來產生連線:
def extract_data():
hook = PostgresHook(postgres_conn_id=postgres_conn_id)
connection = hook.get_conn()
cursor = connection.cursor()
# 執行SQL查詢
sql_query = "SELECT * FROM my_table;"
cursor.execute(sql_query)
# 提取結果
result = cursor.fetchall()
# 關閉連接
cursor.close()
connection.close()
return result
在需要 insert 或是有一些特殊操作想保持 task 的原子性時,如此就可解決。