今天我們將介紹如何在 fastapi 中集成與資料庫的非同步連線。
我們與官方文件教學不同的是使用 sqlalchemy 本身的 asyncio 功能,筆者記得以前翻閱 fastapi 官方文件時看到的做法還是使用這個的,目前在 fastapi 官方文件中使用的非同步套件是 https://github.com/encode/databases , 但筆者本身較熟悉 sqlalchemy 作法便直接使用 sqlalchemy進行介紹。
本篇程式碼將會使用之前的簡易庫存系統進行改寫。
pip install aiosqlite
在與資料庫進行非同步連線時往往需要不同的驅動程式套件,之前的範例我們使用sqllite作為資料庫,那我們這邊就要安裝 aiosqlite 這個套件來進行驅動。
我們一樣先針對資料庫設定的檔案進行改寫:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
SQLALCHEMY_DATABASE_URL = "sqlite+aiosqlite:///./example.db"
我們引入 sqlalchemy.ext.asyncio
底下的非同步資料庫引擎和Session進行使用,資料庫路徑的部分要多指定使用 aiosqlite 進行驅動,否則無法使用非同步連線。
# initial db engine
async_engine = create_async_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
)
實體化引擎的地方一樣 connect_args 參數不能少,就是改成用非同步的函式來實體化。
# initial session maker
async_session = sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
sessionmaker 的部分要指定使用 AsyncSession ,否則預設會是同步的Session。
# Dependency
async def get_db() -> AsyncSession:
async with async_session() as session:
yield session
拿取 session 的寫法可以直接用 context manager 簡化為這樣,在response 完後一樣會釋放 session。
# src/service.py
# 原本的同步寫法
def get_all_items(db: Session):
query = select(models.Item)
items = db.execute(query).scalars().all()
return items
# 非同步寫法
async def get_all_items(db: AsyncSession):
query = select(models.Item)
result = await db.execute(query)
items = result.scalars().all()
return items
可以看到用法基本上沒有差別,就是函式要改成非同步的,以及呼叫db操作時要使用 await 呼叫。
# src/main.py
@app.get(
"/items",
response_model=list[schemas.ItemBase],
dependencies=[Depends(dependencies.check_is_admin)],
)
async def get_items(db: AsyncSession = Depends(get_db)):
"""
Get all items
"""
return await service.get_all_items(db)
router 的地方也是一樣,改寫成非同步的方法,以及呼叫非同步方法時要用 await 即可。
成功可以一樣的拿取到正確的資料。
在實務上大家可以根據情境來選擇要使用非同步或是同步的方法來連接資料庫,但要記得與FastAPI 搭配使用時要注意說同步的方式在FastAPI 中使用同步的函式操作即可,有使用非同步再改為非同步,否則效能是不會比較快的。
Async SQL (Relational) Databases with Encode/Databases - FastAPI (tiangolo.com)