我們延續 Day 14 提到的例子:從 Google Maps API 獲取資料,清理轉換後寫入 BigQuery 裡。資料處理的過程中需要運用以下套件:
pandas
requests
google-auth-oauthlid
google-api-python-client
google-cloud-storage
我們利用 poetry 進行套件管理,步驟也很簡單:
# 初始化專案
poetry new airflow-project
# 進入專案資料夾
cd airflow-project
# 安裝相依性套件
poetry add pandas
poetry add requests
poetry add google-auth-oauthlib
poetry add google-api-python-client
poetry add google-cloud-storage
因為套件相對單純,我們先不指定版本號,讓 Poetry 會根據每個套件的相依性需求,解析出適當的套件版本,並記錄在 poetry.lock 檔案中。我們的專案依然會鎖定使用的套件版本,以避免因為不同的環境而發生版本衝突。
此外,雖然我們在 pyproject.toml 中不會明確指定版本號,但 Poetry 會在生成的 poetry.lock 文件中鎖定所有具體版本,確保每次在其他環境中安裝時,使用的是相同的版本組合。
映像檔是分層構建,每一層可以基於前一層進行功能疊加。我從 Airflow 的 Docker Hub 上找到了 apache/airflow:2.7.2-python3.10 這個 image,表是以它建立出來的環境中,Airflow 的版本是 2.7.2,Python 則是 3.10,就從他為基底,逐步建構我們的應用程式環境吧!
編寫 Dockerfile
如下:
# 使用 Airflow 的 Docker Hub 上的基底 image
FROM apache/airflow:2.7.2-python3.10
# 安裝 Poetry
RUN pip install poetry
# 設置工作目錄
WORKDIR /opt/airflow
# 複製 Poetry 檔案到容器內
COPY pyproject.toml poetry.lock /opt/airflow/
# 安裝相依性 (注意環境設置)
RUN poetry config virtualenvs.create false \
&& poetry install --no-dev --no-interaction --no-ansi
# 複製 dags、plugins 目錄
COPY dags/ /opt/airflow/dags/
COPY plugins/ /opt/airflow/plugins/
# 初始化環境變數
ENV AIRFLOW_HOME=/opt/airflow
# 啟動 Airflow
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["bash", "-c", "airflow db init && airflow webserver & airflow scheduler"]
這個 Dockerfile
從基底 image 出發,把我們先前創建出來的 pyproject.toml 和 poetry.lock 以及相關的工作流程 DAGs 和可能用到的 plugins (例如:客製化的 Operator) 掛載 (mount) 進容器內,最後初始化並啟動。
根據 開發者筆記 | 避免 Airflow DAG 無法順利啟動的幾個原則 這篇,Airflow 架構中有幾個常用的元件,包含 Worker, Scheduler, Webserver, Metadata Database 等。因此我們使用 Docker Compose 來管理多個 Airflow 元件服務。
編寫 docker-compose.yaml
文件如下:
version: '3'
services:
airflow:
build: .
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
ports:
- "8080:8080"
command: bash -c "airflow db init && airflow webserver & airflow scheduler"
depends_on:
- postgres
worker: # Airflow Worker
...
scheduler: # Airflow Scheduler
...
webserver: # Airflow Webserver
...
postgres: # Postgres for Airflow Metadata Database
image: postgres:13
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432:5432"
上面的設定檔簡言之,就是定義每個元件容器的建構方式。完成後,我們利用以下指令就可以完成 Airflow 環境的啟動了!
# 建立映像檔
docker-compose build .
# 在背景啟動 docker-compose
docker-compose up -d
可以通過 docker ps
看到 Airflow 的 Scheduler、Worker 和 Webserver,以及 Metadata database 是否正常運行。如果有任何容器未啟動或出現問題, STATUS
欄位會顯示 Exited
或者 Restarting
,就可以進一步確認錯誤。
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a1b2c3d4e5f6 custom_airflow_poetry:latest "airflow webserver" 2 minutes ago Up 2 minutes 0.0.0.0:8080->8080/tcp airflow_webserver
f6e5d4c3b2a1 custom_airflow_poetry:latest "airflow scheduler" 2 minutes ago Up 2 minutes airflow_scheduler
1a2b3c4d5e6f custom_airflow_poetry:latest "celery worker" 2 minutes ago Up 2 minutes airflow_worker
6e5f4d3c2b1a postgres:13 "docker-entrypoint..." 2 minutes ago Up 2 minutes 5432/tcp postgres
當服務啟動完成後,你可以透過用瀏覽器造訪 http://localhost:8080 看見 Airflow 的網頁介面。
圖/Airflow UI 的登入畫面,用預設帳密即可登入。
/airflow-project
│
├── dags/ # Airflow DAGs (工作流程)
│ └── example_dag.py
│
├── plugins/ # Airflow Plugins (插件)
│ └── custom_plugin.py
│
├── logs/ # Airflow 日誌 (自動生成)
│
├── Dockerfile # Dockerfile 用於自定義 Airflow 映像檔
│
├── docker-compose.yaml # Docker Compose 文件,用於管理服務
│
├── pyproject.toml # Poetry 管理的相依性檔案
│
├── poetry.lock # Poetry 鎖定檔案,確保相依性版本一致
│
└── .env # 環境變數設定
以上就是 Airflow 專案開發初步建置作業。為了讓排程任務順利運行,把排程應用程式所需要的套件 (poetry)、變數 (.env) 及 Airflow 元件 (docker-compose.yaml) 等,透過這些檔案的編寫建構而成。