這邊特別設計 MongoDB Dao(Data Access Object)類似 ORM 框架,像是 Sqlalchemy
用物件導向的方式與資料庫互動
然而,你可能會好奇 FastAPI 怎麼沒有呢?在 FastAPI 也可以實現這個方法,但我在 FastAPI 已經選用其他的套件(Beanie) 去實作。而使用 Beanie 實作的時候,必須要具體定義欄位名稱,有助於 API 檢查格式,避免錯誤的資料塞進 DB。
from beanie import Document
from datetime import datetime
class Media(Document):
stream: bytes
content_type: str
created_timestamp: datetime
當然,不同的方法沒有好壞,只有適不適合XD
Airflow 這裡不會跟客戶端連線,通常僅會有查詢資料、更新資料,比較少新增資料的情況,因此我直接用 pymongo
可以從 docker-compose.yml
取得 MONGO_URI
import asyncio
import logging
import os
from pymongo import MongoClient
class MongoDBDao:
def __init__(self, collection_name):
self.mongo_uri = os.environ["MONGO_URI"]
self.collection_name = collection_name
def connect(self):
try:
self.client = MongoClient(self.mongo_uri)
self.db = self.client.get_database()
self.collection = self.db[self.collection_name]
logging.info("Connected to MongoDB")
except Exception as e:
logging.warning(f"Error connecting to MongoDB: {str(e)}")
def find_one(self, query={}):
try:
document = self.collection.find_one(query)
return document
except Exception as e:
logging.warning(f"Error finding document in MongoDB: {str(e)}")
def find_all(self, query={}):
try:
documents = self.collection.find(query)
return list(documents)
except Exception as e:
logging.warning(f"Error finding documents in MongoDB: {str(e)}")
return []
class DataDao:
def __init__(self):
self.conn = MongoDBDao(collection_name="Data")
self.conn.connect()
def close(self):
if self.conn:
self.conn.close()
使用的時候可以再搭配 DataDao 讓程式碼更簡潔
data_dao = DataDao()
conn = data_dao.conn
query = {
"tag": "invoice_paper",
"data.gov.status": "審查中",
"data.gov.gov_status": "排程執行成功"
}
invoices = conn.find_all(query)
更多的 MongoDB Dao 方法可以參考連結