本次的程式碼與目錄結構可以參考 FastAPI Tutorial : Day15 branch
我們在 Day01 就有提到 FastAPI 支援非同步 handler
而 Day10 到 Day14 我們成功連接 DB 、 設定 SQLAlchemy 以 ORM 來 CRUD
但是到目前為止我們的 DB 存取都是同步的
也就是說,當我們的 API endpoint 有 DB 存取的時候,會等待 DB 存取完成後才會回傳 response
這樣的話,當 DB 存取時間過長時,會造成 API endpoint 的 response time 過長 ( 因為被 Block 住 )
所以我們可以將 DB 存取改為非同步的方式,讓 API endpoint 在等待 DB 存取的時候,可以先處理其他的 request
如果之前沒跟著實作的朋友可以直接跳到 非同步存取 DB
為了今天的 benchmark 我們會將專案架構做一些調整
最後會比較sync
和async
存取 DB 的 performance
sync
目錄我們新增一個 sync
木怒在 backend
底下
並複製 api
crud
database
到 sync
底下
調整完的目錄結構如下:
.
├── api
│ ├── depends.py
│ ├── infor.py
│ ├── items.py
│ └── users.py
├── crud
│ ├── items.py
│ └── users.py
├── database
│ ├── fake_db.py
│ └── generic.py
├── main.py
├── models
│ ├── base.py
│ ├── item.py
│ └── user.py
├── run.py
├── schemas
│ ├── items.py
│ └── users.py
├── setting
│ ├── .env.dev
│ ├── .env.prod
│ ├── .env.test
│ └── config.py
└── sync
├── api
│ ├── depends.py
│ ├── infor.py
│ ├── items.py
│ └── users.py
├── crud
│ ├── items.py
│ └── users.py
└── database
├── fake_db.py
└── generic.py
run.py
.env
setting/config.py
與 Day10 相似,用一樣的方法來加上 run_mode
的選項
讓我們能夠以 --run_mode
來決定要使用 sync
還是 async
的方式來存取 DB
run.py
# ...
# 新增 run_mode
run_mode = parser.add_argument_group(title="Run Mode", description="Run the server in Async or Sync mode. Default is Async.")
run_mode.add_argument("--sync",action="store_true", help="Run the server in Sync mode.")
# ...
if args.sync:
os.environ["RUN_MODE"] = "SYNC"
else:
os.environ["RUN_MODE"] = "ASYNC"
# ...
所以在 setting/config.py
中,我們可以透過載入 RUN_MODE
來動態的載入對應的 Database url
同時也需要調整 .env
中的原本 Database url
.env.dev
SYNC_POSTGRESQL_DATABASE_URL='postgresql+psycopg2://fastapi_tutorial:fastapi_tutorial_password@localhost:5432/fastapi_tutorial'
ASYNC_POSTGRESQL_DATABASE_URL='postgresql+asyncpg://fastapi_tutorial:fastapi_tutorial_password@localhost:5432/fastapi_tutorial'
SYNC_MYSQL_DATABASE_URL='mysql+pymysql://root:fastapi_tutorial_password@localhost:3306/fastapi_tutorial'
ASYNC_MYSQL_DATABASE_URL='mysql+aiomysql://root:fastapi_tutorial_password@localhost:3306/fastapi_tutorial'
可以看到 SYNC_POSTGRESQL_DATABASE_URL
和 ASYNC_POSTGRESQL_DATABASE_URL
的差別在於 ASYNC
的 Driver 是我們剛剛多加的非同步 Driver asyncpg
SYNC
的 Driver 是原本的 psycopg2
setting/config.py
class Settings():
# ...
# 多新增 run_mode
run_mode:str = os.getenv("RUN_MODE").upper() # <--- 新增
database_url: str = os.getenv(f"{run_mode}_{db_type}_DATABASE_URL") # <--- 修改
# ...
這樣 settings 就可以依據 argument --sync
來決定要使用 ASYNC
還是 SYNC
的方式來存取 DB !
main.py
因為我們現在將 同步處理 與 非同步處理 分開
所以在 main.py
中也要依據現在的 RUN_MODE
來決定來載入 async
或 sync
的 Router 或 CRUD
多新增 settings
來判斷要載入 async
或 sync
的 Router 或 CRUD main.py
from fastapi import FastAPI
from setting.config import get_settings # <--- 新增
settings = get_settings()
app = FastAPI()
如果是 ASYNC
的話,就載入 async
的 Router 或 CRUD main.py
# ...
if settings.run_mode == "ASYNC":
from api.infor import router as infor_router
from api.users import router as user_router
from api.items import router as item_router
from database.generic import init_db , close_db
app.include_router(infor_router)
app.include_router(user_router)
app.include_router(item_router)
@app.on_event("startup")
async def startup():
await init_db()
@app.on_event("shutdown")
async def shutdown():
await close_db()
反之,如果是 SYNC
的話,就載入 sync
的 Router 或 CRUD main.py
# ...
else:
from sync.api.infor import router as infor_router
from sync.api.users import router as user_router
from sync.api.items import router as item_router
from sync.database.generic import init_db , close_db
app.include_router(infor_router)
app.include_router(user_router)
app.include_router(item_router)
@app.on_event("startup")
def startup():
init_db()
@app.on_event("shutdown")
def shutdown():
close_db()
SQLAlchemy 也有支援非同步的方式來存取 DB
不過我們需要使用其他的 DB driver
不同的 DB 有不同的 driver
asyncpg
aiomysql
而
greenlet
是asyncpg
的 dependency
poetry add asyncpg
poetry add greenlet
poetry add aiomysql
database/generic.py
與之前的 database/generic.py
相比,我們只需要將 Session
改為 AsyncSession
而原本的 create_engine
要改為 create_async_engine
create_engine
是在from sqlalchemy import create_engine
底下create_async_engine
是在from sqlalchemy.ext.asyncio
中sessionmaker
也要改為async_sessionmaker
database/generic.py
# async version
from sqlalchemy.ext.asyncio import create_async_engine , async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from setting.config import get_settings
settings = get_settings()
# 這邊要改為 create_async_engine
engine = create_async_engine(
settings.database_url,
echo=True,
pool_pre_ping=True
)
# 改為 async_sessionmaker
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, autocommit=False)
class Base(DeclarativeBase):
pass
# 等等會特別說明這個 `get_db`
async def get_db() -> AsyncGenerator:
async with SessionLocal() as db:
async with db.begin():
yield db
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close_db():
async with engine.begin() as conn:
await conn.close()
要注意的是,我們的 get_db
有一點不一樣
因為改為非同步的方式,所以我們需要使用 async with connection.begin()
來取得 connection
並透過 yield
來回傳一個 AsyncGenerator
讓我們在 api
或 crud
中可以透過 Depends
來取得 AsyncSession
常見的寫法如下:
接下來都先以 get users 為例
AsyncSession
crud/users.py
from sqlalchemy.ext.asyncio import AsyncSession # <--- 新增
# ...
async def get_users(db_session:AsyncSession, keyword:str=None,last:int=0,limit:int=50):
stmt = select(UserModel.name,UserModel.id,UserModel.email,UserModel.avatar)
if keyword:
stmt = stmt.where(UserModel.name.like(f"%{keyword}%"))
stmt = stmt.offset(last).limit(limit)
result = await db_session.execute(stmt)
users = result.all()
return users
# ...
這邊要注意的是,我們透過 Depends(get_db)
來取得 AsyncSession
再傳入 CRUD 的 get_users
api/users.py
@router.get("/users",
response_model=List[UserSchema.UserRead],
response_description="Get list of user",
)
async def get_users(page_parms:dict= Depends(pagination_parms), db_session:AsyncSession=Depends(get_db)):
users = await UserCrud.get_users(db_session,**page_parms)
return users
如果在每個有使用到 CRUD 的 Routers 都需要這樣寫的話,會有點冗長
我們可以透過將 Depends(get_db)
抽出來寫在開頭
api/users.py
db_depends = Depends(get_db) # <--- 新增
@router.get("/users",
response_model=List[UserSchema.UserRead],
response_description="Get list of user",
)
async def get_users(page_parms:dict= Depends(pagination_parms), db_session=db_depends):
users = await UserCrud.get_users(db_session,**page_parms)
return users
這樣我們只要使用 db_session=db_depends
就可以取得 AsyncSession
來注入 CRUD
我們可以透過 Apache benchmark tool (ab
) 來測試 API endpoint 的 performance
如果是使用 Mac , 內建就有安裝
ab
了
但是 hostname 必須使用127.0.0.1
不能使用localhost
如果是以 50000
個 request 並且將 concurrency 設為 32
來打 API 的話:
sync
: ab -n 50000 -c 32 http://127.0.0.1:8001/sync/api/users
apr_socket_recv: timeout
的 error async
: ab -n 50000 -c 32 http://127.0.0.1:8001/api/users
50000
個 request Time per request
是約 67
ms 如果是以 10000
個 request 並且將 concurrency 設為 4
來打 API 的話:
sync
: ab -n 10000 -c 4 http://127.0.0.1:8001/sync/api/users
async
: ab -n 10000 -c 4 http:///127.0.0.1:8001/api/users
會發現反而 sync
的 performance 比 async
好
sync
的 code 切到 /sync/api/xxx
/api/xxx
改為 async
async
的方式來存取 DB Depends
將 AsyncSession
注入到 CRUD functionab
benchmark 後 async
的 performance 會隨著 concurrency 的增加而變好 sync
的 performance 比 async
好