今天要來介紹在 MongoDB 當中該如何進行一對一以及一對多的關聯,雖然說 Mongo 是一個無關聯的資料庫,但有時為了方便,還是會稍微將資料做一些正規劃,而在 Mongo 當中一對一以及一對多,同樣是透過直接寫入 id 的方式來進行,另外今天我們也會介紹 lookup 這個操作符號,他可以幫我們拉出與指定條件相關連的資料
下方為本次的資料模型,可以看到在 HighRiskIntersection 當中我們將 statistic 欄位換成 statistic_id,並且不給予預設值,表示這個欄位在建立的時候一定要進行輸入,否則會引發錯誤,因此稍後我們在進行資料寫入的時候會先進行 statistics 的寫入,取得寫入的 id 後再進行 HighRiskIntersection 的寫入
from pydantic import BaseModel
from datetime import datetime
from bson.objectid import ObjectId
class HighRiskIntersection(BaseModel):
year: int
month: int
rank: int # 排名
jurisdiction: str # 轄區
intersection: str # 路口
statistics_id: ObjectId
accident_time_interval: str # 發生時間區間 (24 小時制)
cause: str # 主要肇因
city: str
created_time: datetime = datetime.now()
class Config:
arbitrary_types_allowed = True
class Statistics(BaseModel):
A1_amount: int
A2_amount: int
A2_injury: int # A2 受傷
A3_amount: int
total_amount: int
下方附上寫入資料的程式,可以看到我們這次建立了兩個 collection 物件,並且先進行 statistics 的寫入同時取得寫入後的 ID,接著才進行 Intersection 的寫入
import re
import os
import json
from pathlib import Path
from dotenv import load_dotenv
from pymongo.database import Database
from pymongo.collection import Collection
from pymongo.mongo_client import MongoClient
from schema import Statistics, HighRiskIntersection
# 讀取 .env 取得連線資訊
BASE_DIR = Path(__file__).parent.parent
load_dotenv(str(BASE_DIR / ".env"))
# 取得所有 datas 檔案路徑下的所有檔案
file_names = []
for root, dirs, files in os.walk(BASE_DIR / "datas"):
for file in files:
file_path = os.path.join(root, file)
file_names.append(file_path)
# 建立 client 並與 db、collection 進行連線
client = MongoClient(host=os.getenv("MONGODB_ATLAS_URL"))
database = Database(client=client, name="HighRiskIntersection")
intersection_collection = Collection(database=database, name="Intersection")
statistics_collection = Collection(database=database, name="Statistics")
# 依照檔案路徑將所有的資料打開並寫入資料庫
for file in file_names:
with open(file=file, encoding="utf-8") as f:
datas = json.load(fp=f)
for data in datas:
# 建立 Statistics 物件
statistics = Statistics(
A1_amount=data.get("A1", 0),
A2_amount=data.get("A2件數", 0),
A2_injury=data.get("A2受傷", 0),
A3_amount=data.get("A3", 0),
total_amount=data.get("總件數", 0)
)
statistics_inserted_id = statistics_collection.insert_one(statistics.dict()).inserted_id
# 建立 high_risk_intersection 物件
high_risk_intersection = HighRiskIntersection(
year=2022,
month=int(re.findall(r'(\d+)月', file)[0]),
rank=data.get("編號"),
jurisdiction=data.get("轄區分局"),
intersection=data.get("路口名稱"),
accident_time_interval=data.get("發生時間"),
cause=data.get("主要肇因"),
city="台中市",
statistics_id=statistics_inserted_id
)
intersection_collection.insert_one(high_risk_intersection.dict())
client.close()
下面兩張圖片當中我們可以看到在資料庫當中成功建立的兩個 collection,並且在 Intersection 當中的資料是有成功寫入 statistic_id 欄位的
在 mongo 當中提供了 lookup 指令來進行關聯的查詢,下方為一個簡易的語法格式
{
"$lookup": {
"from": "<要搜尋的表>",
"localField": "<當前表中的哪個欄位作為聯外鍵>",
"foreignField": "<要搜尋的表的欄位>",
"as": "<在查詢結果當中要呈現的名稱>"
}
}
要使用的時候要記得透過 aggregate 進行使用,否則會無法正確回傳資料甚至會引發錯誤,至於 aggregate 則會留到後面的部分會進行說明,下方先附上這次 lookup 查詢範例程式
data = intersection_collection.aggregate([
{
"$lookup": {
"from": "Statistics",
"localField": "statistics_id",
"foreignField": "_id",
"as": "statistic"
}
}
])
pprint(list(data))
下圖中我們可以看到已經成功地透過指定 statistics_id 欄位進行 lookup 動作,並且成功的取得資料