到目前為止我們了解,DAG 如何建立,如何設定關聯關係,如何傳遞變數。而在Data pipeline 當中還有很重要的一個部分是與外部系統溝通。 Airflow Data pipeline的使用情境經常會是控制並整合不同系統的工作及流程。讓我們一起來看看 Airflow 如何處理外部的連接。
首先我們來看看 Connection 模組,Connection 顧名思義就是處理對於外部連線資訊的一個模組。 每個 Connection 紀錄了一個對於外部的連線資訊。我們可以透過 Airflow 的管理介面進行設定。
像是 Variable的設定方法一樣,我們點擊 Airflow最上方的 Admin ,之後點擊 Connection 進入到 Connection 頁面。在 Connnection 頁面中我們點擊增加的按鈕可以增加 Connection。
在設定頁面中我們把連接資訊填入各個欄位,比如說我要設定 MySQL 連線 ,首先填寫 Connection id 之後,在Connection Type 中選擇 MySQL,之後填入相關 Host 位置及帳號密碼。
在這邊也可以先點擊 Test 做連線測試,如果連線成功 Airflow 會於上方顯示成功的提示。設定完成之後點選 Save 按鈕即可儲存。
回到 Connection 列表頁面就可以看到剛剛所輸入的 Connection 資訊了!
設定完成之後我們便可以從 DAG 程式當中取得 Connection 使用。
Connection 中儲存了外部的連線資訊,而透過 Hook 則可使用 Connection 的資訊時記得與外部交互,達到與外部溝通的目的。Hook 是一個 Python 類別,Airflow 內部提供多種類型的 Hook給予開發者使用。像是 HttpHook,MySQLHook、S3Hook 等等,方便開發者進行開發。
首先要 import MySqlHook,並在建立 MySqlHook 帶入想要聯繫的 MySql connection id, 在這邊我們輸入我們在上半部輸入的 Connection -- mysql_conn'。之後便可以以 get_records 方法執行 SQL 並取得返回值。
from airflow.providers.mysql.hooks.mysql import MySqlHook
# 建立 MySQLHook
mysql_hook = MySqlHook(mysql_conn_id='mysql_conn')
# 定義查詢 SQL
sql_query = "SELECT * FROM my_table"
# 執行查詢指令
results = mysql_hook.get_records(sql_query)
# 輸出結果
for row in results:
print(row)
除了 MySql Hook 之外,Airflow 提供許多服務的 Hook,這邊列出一些常用的服務作為參考。
如果有服務需要自己進行改動,對 Hook 進行客製化的話,可以參考以下官方文件。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/hooks/index.html
了解了 Connection 以及 Hook 我們可以很方便地進行對外部系統的溝通!