今天我們要介紹 pydantic 在寫入資料時該如何與 MongoDB 互相搭配,下圖中是我們本次資料集的截圖
可以看到截圖當中我們的格式是固定的,紅色方框內可以理解成一個 document,也就是一行 raw data 的概念,下方為我們今天會介紹的內容
通常我們會把存放 model 的 module 命名為 schema,下方為今日專案的截圖,可以看到 schema 為存放 model 的 module,而 demo 則是用來存放寫入資料的程式
通常在定義 MongoDB 資料模型時,筆者這邊習慣至少會分成兩類,分別為 Base 物件以及、ModelInDB,至於要特別建立一個 ModelInDB 的原因,主要是因為 MongoDB 的自帶的 id 欄位在內部的名稱為 _id
,因此我們不希望在寫入的時候多寫入一個 id 為空的欄位,而將 InDB 的class 獨立出來後,我們就可以透過設定別名的方式將 _id
欄位對應名稱到 id 了,下方直接附上範例
由於後續要解說查詢 MongoDB 資料時,會解釋道如何查詢 json 格式的欄位,因此觀察資料集後我們將統計資訊的部分獨立出來
from pydantic import BaseModel, Field
from datetime import datetime
from bson.objectid import ObjectId
class Statistics(BaseModel):
A1_amount: int
A2_amount: int
A2_injury: int # A2 受傷
A3_amount: int
total_amount: int
class HighRiskIntersection(BaseModel):
year: int
month: int
rank: int # 排名
jurisdiction: str # 轄區
intersection: str # 路口
statistics: Statistics
accident_time_interval: str # 發生時間區間 (24 小時制)
cause: str # 主要肇因
created_time: datetime = datetime.now()
class HighRiskIntersectionInDB(HighRiskIntersection):
id: ObjectId = Field(alias="_id")
class Config:
arbitrary_types_allowed = True
而根據需求不同筆者這邊建議可以將 model 做更細項的拆分,比如下方範例中可以看到筆者將 create、update 兩個模型拆開,目的是用來控制在新增或修改時,不會讓其他人改動到原本的資料集,只開放該開放的欄位即可,剩下的 InDB、InAPI 則是根據需求個別去訂該如何轉譯
from pydantic import BaseModel
class CarBase(BaseModel):
pass
class CarInDB(CarBase):
pass
class CarInAPI(CarBase):
pass
class CarCreate(BaseModel):
pass
class CarUpdate(BaseModel):
pass
下方為寫入資料的程式碼範例,可以看到我們把 datas 目錄下的檔案全部讀進程式當中並逐個建立 HighRiskIntersection 物件,最後使用 insert_many 一次性進行寫入,可以避免重複對資料庫進行連線、寫入的操作
盡量減少對資料庫的連線請求次數是對資料庫資進行操作的最大原則,能在資料庫內解決的事情就在資料庫內一次進行解決
記得在寫入前要將物件轉換為 dictionary 型態才可以正確寫入
import re
import os
import json
from pathlib import Path
from dotenv import load_dotenv
from pymongo.database import Database
from pymongo.collection import Collection
from pymongo.mongo_client import MongoClient
from schema import Statistics, HighRiskIntersection
# 讀取 .env 取得連線資訊
BASE_DIR = Path(__file__).parent.parent
load_dotenv(str(BASE_DIR / ".env"))
# 取得所有 datas 檔案路徑下的所有檔案
file_names = []
for root, dirs, files in os.walk(BASE_DIR / "datas"):
for file in files:
file_path = os.path.join(root, file)
file_names.append(file_path)
# 建立 client 並與 db、collection 進行連線
client = MongoClient(host=os.getenv("MONGODB_ATLAS_URL"))
database = Database(client=client, name="2023_ithelp")
collection = Collection(database=database, name="HighRiskIntersection")
# 依照檔案路徑將所有的資料打開並寫入資料庫
object_list = []
for file in file_names:
with open(file=file, encoding="utf-8") as f:
datas = json.load(fp=f)
for data in datas:
# 建立 Statistics 物件
statistics = Statistics(
A1_amount=data.get("A1", 0),
A2_amount=data.get("A2件數", 0),
A2_injury=data.get("A2受傷", 0),
A3_amount=data.get("A3", 0),
total_amount=data.get("總件數", 0)
)
# 建立 high_risk_intersection 物件
high_risk_intersection = HighRiskIntersection(
year=2022,
month=int(re.findall(r'(\d+)月', file)[0]),
rank=data.get("編號"),
jurisdiction=data.get("轄區分局"),
intersection=data.get("路口名稱"),
statistics=statistics,
accident_time_interval=data.get("發生時間"),
cause=data.get("主要肇因")
)
# 將物件轉換為 dictionary 後才放入 list 當中
object_list.append(high_risk_intersection.dict())
# 使用 insert_many 一次進行插入
collection.insert_many(object_list)
client.close()
下方圖片當中可以看到我們成功將資料插入,且紅色方框中為 dictionary 格式,這個型態在 mongodb 當中有一個專有名詞叫做 embedded document (嵌入式文檔),白話來說就是在 dictionary 格式裡面的 dictionary