今天要用最簡單的架構完成 Jaffle Shop 的 PoC(Proof of Concept,概念驗證),主要目的是讓大家能在自己的本機成功運行 Jaffle Shop 的 Cosmos 專案,進而熟悉 dbt 和 Airflow 的整合方式~
題外話:PoC、MVP、Prototype 的差異是什麼?
- PoC:主要是測試想法的可行性,可能只需要數天
- prototype:主要是展示UI/UX 或特定功能,可能需要數週
- MVP,minimum viable product 最小可行性產品,可能需要幾個月,基本上已經是半成品,主要是要測試市場的接受度
流程:PoC => Prototype => MVP
git clone https://github.com/snhou/astronomer-cosmos-poc.git
cd astronomer-cosmos-poc
還是會希望開啟虛擬環境是因為在開發專案過程中,會用到一些 vscode 的插件或是測試,而這部分不一定都會進到正在運行中的 docker 去執行,其中一個範例就是明天會提到的 dbt 必備 vscode 插件 dbt Power User,基本上插件很難運用到 docker 環境,然後避免本機有多個環境,還是用虛擬環境最方便。
如果有人知道 dbt Power User 可以用 docker 環境的,拜託跟我說
python3 -m venv venv
source venv/bin/activate
python3 -m pip install -r requirements.txt
執行之前記得打開 Docker Desktop!!
docker compose build
如果過程中有修改 Dockerfile,建議加上
--no-cache
參數來重新建立
docker compose up -d
-d
代表要運行在背景
docker ps
確認是否正常運行,都有看到 (healthy)
代表都沒問題!理論上,我們可以直接進到下一步開始執行 airflow 的 DAG,但既然前幾天有談到 dbt 的運行流程,進到 Docker 容器當中,除了可以複習一下,也能確定環境都沒問題
dbt debug
確認連線資訊dbt debug
這部分為了讓 git 的部分不會有 error,可以在 Dockerfile 中看到有特別切換成 ROOT 來安裝 git
dbt seed
載入靜態檔案dbt seed
dbt run
執行 models/
當中的 sql 轉換dbt run
4. dbt test
執行 models/
當中 schema.yml 的資料測試
dbt test
開啟 localhost:8080
直接執行 simple_task_group
DAG
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig
from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config
@dag(
schedule_interval=None,
start_date=datetime(2023, 1, 1),
catchup=False
)
def simple_task_group():
pre_dbt = EmptyOperator(task_id="pre_dbt")
jaffle_shop = DbtTaskGroup(
group_id="my_jaffle_shop_project",
project_config=ProjectConfig(jaffle_shop_path),
profile_config=airflow_db,
execution_config=venv_execution_config
)
post_dbt = EmptyOperator(task_id="post_dbt")
pre_dbt >> jaffle_shop >> post_dbt
simple_task_group()
group_id
: 這個 TaskGroup 的 ID。project_config
: 指定 dbt 專案的路徑。profile_config
: 使用 airflow_db 的設定來連接資料庫。execution_config
: 指定執行 dbt 時的路徑。include
放在 plugins
當中,而 plugins
在 docker 環境建置時就會載入,所以就可以直接用 from include.profiles import ...
和 from include.constants import ...
載入所需的變數和函式,如果檔案不是在 plugins
當中的話,會需要修改 docker 環境設定{Jaffle Shop}
├── dags
│ └── simple_task_group.py
├── dbt
│ └── jaffle_shop
│ ├── dbt_project.yml
│ ├── models
│ │ ├── marts
│ │ │ ├── orders.sql
│ │ │ └── schema.yml
│ │ └── staging
│ │ ├── schema.yml
│ │ ├── stg_orders.sql
│ │ └── stg_payments.sql
│ ├── profiles.yml
│ └── seeds
│ ├── raw_orders.csv
│ └── raw_payments.csv
├── Dockerfile
├── docker-compose.yaml
├── plugins
│ └── include
│ ├── constants.py
│ └── profiles.py
└── requirements.txt
從整個專案結構可以發現在 dbt/
資料夾中,可以建立多個 dbt 專案,再透過 dag 執行不同專案目錄,就能夠達成在同一專案中進行不同資料庫的 dbt 資料轉換,如果只是單純想要讓 dag 執行特定的 model,也可以透過 tag 來分類執行
在 dbt 專案當中的 profiles.yml
其實以 airflow 來說是不必要的,因為已經從 profiles.py
設定好了,但是如果要像上方進入到 docker 容器中做測試,profiles.yml
還是不可少的,如果不想讓連接設定在多個不同地方,也可以透過環境變數統一控制
在 profiles.py
當中使用的 conn_id="airflow_metadata_db"
,正常是會到 airflow Web UI 上設定,但這次用的方式是直接寫在 docker-compose.yml
來設定,這樣如果有多個 db 設定的話,可以確保 docker 環境開啟後就能順利連接,設定方式如下,只需要在 conn_id 名稱前面加上 AIRFLOW_CONN_
就可以了:
x-airflow-common:
&airflow-common
...
#image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}
build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
...
AIRFLOW_CONN_AIRFLOW_METADATA_DB: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
明天就會介紹 dbt Power User,這個用 dbt 的朋友都愛不釋手的工具~~