iT邦幫忙

2023 iThome 鐵人賽

DAY 8
1

ETL 大概有八成的時間,都是在跟各種儲存系統互動。不管是常見的資料庫、BigQuery,或是 Redis, Hadoop,這些本質上都是儲存系統,差別只是連接方法跟存取接口。

這些通常都會需要安裝 driver,給定 host、帳密等等資料才能連接。在Airflow中,Connection是一個關鍵的概念,它允許您配置與外部系統的連接信息,這樣,您可以輕鬆地在DAG中引用這些連接,而不必在程式中明文儲存敏感信息。

你當然不見得要用它,但為了避免密碼外露或是被存在 git 內,你可能會需要另外將設定檔加密,或是在部署到正式環境時設定正式 DB 的帳密,增加了部署難度。

下面是如何在Airflow中配置並使用PostgreSQL連接的示例:

  1. 首先,打開Airflow的Web界面,然後轉到Admin -> Connections。
  2. 在Connections頁面,您可以看到現有的連接或添加新連接。點擊"Create"按鈕以添加新連接。
  3. 在添加連接的表單中,輸入以下信息:
    • Conn Id:這是連接的唯一ID,您可以自己定義,例如,"my_postgres_conn"。
    • Conn Type:選擇"PostgreSQL"。
    • Host:PostgreSQL數據庫的主機名或IP地址。
    • Schema:數據庫模式(如果需要)。
    • Login:您的PostgreSQL用戶名。
    • Password:您的PostgreSQL密碼。
    • Port:PostgreSQL服務的端口(通常是5432)。
  4. 點擊"Save"以保存連接。

Untitled

現在,您已經配置了一個名為"my_postgres_conn"的PostgreSQL連接。接下來,您可以在您的DAG中使用這個連接來執行PostgreSQL操作。以下是一個簡單的示例:

pythonCopy code
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# 定義您的DAG
dag = DAG(
    'example_postgres_dag',
    start_date=datetime(2023, 9, 7),
    schedule_interval=None,  # 這個DAG不會自動運行,需要手動觸發
)

# 定義一個PostgreSQL操作
create_table_task = PostgresOperator(
    task_id='create_table',
    sql='CREATE TABLE IF NOT EXISTS my_table (id serial PRIMARY KEY, data text);',
    postgres_conn_id='my_postgres_conn',  # 使用之前配置的連接
    dag=dag,
)

# 添加更多的操作(根據您的需求)

# 指定DAG的執行順序
create_table_task

在這個示例中,我們首先定義了一個PostgreSQL連接("my_postgres_conn"),然後使用PostgresOperator執行了一個SQL語句來創建一個名為"my_table"的數據表。

上面的範例如果 SQL 是 select 相關的話,資料會被送到 XCom 去,這部份之後會再聊,先讓我們繼續聊 connection。這個例子是包裝成 Operator,但如果我們要在同一個 task 內使用 connection 的話要怎麼辦?

通常我們會使用 Hook 來產生連線:

def extract_data():
    hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    connection = hook.get_conn()
    cursor = connection.cursor()
    
    # 執行SQL查詢
    sql_query = "SELECT * FROM my_table;"
    cursor.execute(sql_query)
    
    # 提取結果
    result = cursor.fetchall()
    
    # 關閉連接
    cursor.close()
    connection.close()
    
    return result

在需要 insert 或是有一些特殊操作想保持 task 的原子性時,如此就可解決。


上一篇
Airflow Variable 設定 - Day7
下一篇
Airflow 自訂 Operator - Day9
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言