後台任務是指說,將回應返回給用戶端之後要執行的任務。例如說寄出電子郵件,將資料傳送到消息佇列中之類的需要時間的操作。如果在回應前就執行這些操作可能會額外增加用戶端等待的時間,所以我們可以將其放在後台任務中。
我們在今天的介紹中,我們將透過庫存系統中訂單與庫存的操作來讓大家了解後台任務如何使用。
我們新增一個端點給新增訂單這個動作,如果前面已經有練習實作完的這邊可以跳過。
# src/schemas.py
class OrderBase(DateTimeBase):
id: str
customer_id: str
item_id: str
quantity: int
class Config:
orm_mode = True
class OrderCreateInput(BaseModel):
item_id: str
quantity: int
在 schemas 中新增 Order 的 pydantic model 一個用來回傳資料一個用來驗證新增訂單時傳入的資料。
# src/dependencies.py
def check_item_enough_quantity(
order: schemas.OrderCreateInput, db: Session = Depends(get_db)
) -> tuple[models.Item, schemas.OrderCreateInput, Session]:
item = service.get_lock_item_by_id(db, order.item_id)
if not item:
raise exceptions.ItemNotFound()
if item.quantity < order.quantity:
raise exceptions.ItemNotEnoughQuantity()
return item, order, db
我們在建立訂單前需要先驗證說庫存的數量夠不夠,所以可以使用依賴注入的方法先對指定的 item 進行驗證,沒有問題便可以將我們需要的東西返回給路徑操作函式,這邊會返回 models.Item 的原因是因為我們建立訂單的同時也要對庫存的數量做增減,所以需要用到這個物件。
# src/service.py
def get_lock_item_by_id(db: Session, id: str) -> models.Item:
query = select(models.Item).where(models.Item.id == id).with_for_update()
item = db.execute(query).scalar()
return item
def create_order(
db: Session, item: models.Item, order: schemas.OrderCreateInput, customer_id: str
):
db_order = models.Order(**order.dict())
db_order.customer_id = customer_id
item.quantity -= order.quantity
db.add(db_order)
try:
db.commit()
db.refresh(db_order)
except Exception as e:
db.rollback()
print(e)
raise exceptions.ServerError("Error creating order")
return db_order
我們會使用 get_lock_item_by_id
的原因是因為不希望說有並發的狀況發生時,庫存發生數量不足的狀況,所以需要先將該筆資料做鎖定,使其他 Session 暫時無法更改。新增訂單時就是順便扣除庫存的數量即可。
# src/main.py
@app.post(
"/orders",
status_code=status.HTTP_201_CREATED,
response_model=schemas.OrderBase,
)
def create_order(
jwt_data: dict = Depends(jwt.decode_jwt),
dependency=Depends(dependencies.check_item_enough_quantity),
):
"""
Create an order
"""
item, order, db = dependency
customer_id = jwt_data['sub']
return service.create_order(db, item, order, customer_id)
路徑操作函式接收依賴注入的資料再傳給業務邏輯。
# src/mail.py
def fake_send_mail(msg: str):
print(f"Sending mail: {msg}")
我們用個範例的函式來模擬即可,沒有要真的寄信。
# src/main.py
from fastapi import FastAPI, status, Depends, Response, BackgroundTasks
@app.post(
"/orders",
status_code=status.HTTP_201_CREATED,
response_model=schemas.OrderBase,
)
def create_order(
background_tasks: BackgroundTasks,
jwt_data: dict = Depends(jwt.decode_jwt),
dependency=Depends(dependencies.check_item_enough_quantity),
):
"""
Create an order
"""
item, order, db = dependency
customer_id = jwt_data['sub']
order = service.create_order(db, item, order, customer_id)
background_tasks.add_task(mail.fake_send_mail, f'Order {order.id} created')
return order
我們使用 fastapi 中的 BackgroundTasks 來幫助我們新增後台任務。一樣需要在路徑操作函式中聲明引入該類別。FastAPI 會自動幫我們在返回回應後執行任務。
我們可以使用 background_tasks.add_task
來增加要執行的任務,第一個參數為要執行的函式,後面的參數皆為要帶入執行函式的引數。這樣當我們建立完訂單後他就會模擬發信說有訂單被建立。
可以看到我們成功建立完訂單後也成功模擬寄信。
BackgroundTasks 可以有效的執行 阻塞 ( blocking ) 及 非阻塞( non-blocking ) I/O 的操作,同步的函式會使用多執行緒的方法執行,非同步的方法則是使用 await 執行。
所以說在使用時要注意以下事項:
如果要使用大量資源運算且不用在同一個執行緒執行的話,那麼使用其他較大的工具會是更好的選擇,例如 Celery。又或者說直接集成 radis、RabbitMQ 等可作為消息佇列器的軟體也是一個解決方案。