我們在前面幾章介紹了如何實作一個簡易的會員系統以及JWT機制,在本章節我們將結合 JWT 來對庫存的 Item 表進行一個權限控管。
我們將會實作以下功能:
我們這次一樣開 4 個接口對應 CRUD 的動作:
# src/schemas.py
class ItemBase(DateTimeBase):
id: str
item_name: str
price: float
quantity: int
class Config:
orm_mode = True
class ItemCreateInput(BaseModel):
item_name: str
price: float
quantity: int
class ItemUpdateInput(BaseModel):
item_name: Optional[str]
price: Optional[float]
quantity: Optional[int]
回傳時使用 ItemBase,其他兩個分別對應 Create、Update。
# src/service.py
def create_item(db: Session, item: schemas.ItemCreateInput):
db_item = models.Item(**item.dict())
db.add(db_item)
try:
db.commit()
db.refresh(db_item)
except Exception as e:
db.rollback()
print(e)
raise exceptions.ServerError("Error creating item")
return db_item
def get_item_by_id(db: Session, id: str) -> models.Item:
query = select(models.Item).where(models.Item.id == id)
item = db.execute(query).scalar()
return item
def get_item_by_name(db: Session, name: str) -> models.Item:
query = select(models.Item).where(models.Item.item_name == name)
item = db.execute(query).scalar()
return item
def get_all_items(db: Session):
query = select(models.Item)
items = db.execute(query).scalars().all()
return items
def update_item(
db: Session, update_data: schemas.ItemUpdateInput, item: models.Item
) -> models.Item:
update_data: dict = update_data.dict(exclude_unset=True, exclude_none=True)
for key, value in update_data.items():
setattr(item, key, value)
try:
db.commit()
db.refresh(item)
except Exception as e:
db.rollback()
print(e)
raise exceptions.ServerError("Error updating item")
return item
def delete_item(db: Session, item: models.Item):
try:
db.delete(item)
db.commit()
except Exception as e:
db.rollback()
print(e)
raise exceptions.ServerError("Error deleting item")
根據各接口撰寫各自的業務邏輯。
# src/main.py
@app.post(
"/items",
status_code=status.HTTP_201_CREATED,
response_model=schemas.ItemBase,
)
def create_item(dependency=Depends(dependencies.check_new_item)):
"""
Create an item
"""
item, db = dependency
return service.create_item(db, item)
@app.get(
"/items",
response_model=list[schemas.ItemBase],
)
def get_items(db: Session = Depends(get_db)):
"""
Get all items
"""
return service.get_all_items(db)
@app.patch(
"/items",
response_model=schemas.ItemBase,
responses={404: {"description": "Item not found"}},
)
def update_item(
id: UUID, update_data: schemas.ItemUpdateInput, db: Session = Depends(get_db)
):
"""
Update an item
"""
id = str(id)
item = service.get_item_by_id(db, id)
if not item:
raise exceptions.ItemNotFound()
return service.update_item(db, update_data, item)
@app.delete(
"/items",
status_code=status.HTTP_204_NO_CONTENT,
responses={404: {"description": "Item not found"}},
)
def delete_item(id: UUID, db: Session = Depends(get_db)):
"""
Delete an item
"""
id = str(id)
item = service.get_item_by_id(db, id)
if not item:
raise exceptions.ItemNotFound()
service.delete_item(db, item)
新增 CRUD 四個接口。
基本上上述流程皆與之前介紹過的大同小異,大家若不熟悉這次可以順便練習。
接著我們就可以新增一個後台管理員的權限只讓他來存取這些接口。
本次範例就不將其寫入資料庫中,我們直接從全域變數中獲取帳號密碼進行驗證即可。首先我們先在 config 中新增帳號密碼:
# src/config.py
ADMIN_USERNAME = "admin@test.com"
ADMIN_PASSWORD = "1234"
接著在 dependencies 中新增驗證的方法:
# src/dependencies.py
def authenticate_admin(data: schemas.LoginInput) -> bool:
if data.username != ADMIN_USERNAME or data.password != ADMIN_PASSWORD:
raise exceptions.InvalidPasswordOrEmail()
return True
簡單的判斷是不是我們設定的帳號密碼即可。
最後新增一個路徑操作函式:
# src/main.py
@app.post("/admin/login", response_model=str)
def admin_login(
result: bool = Depends(dependencies.authenticate_admin),
):
"""
Admin Login
"""
access_token = jwt.create_access_token(data={"sub": "admin"})
return access_token
只要帳號密碼正確我們就回傳一個 token ,裡面包含主體為 admin,接著我們就可以使用這個 token 進行身分驗證。
我們有了JWT當作我們的權限卡了,現在我們要使用這張權限卡來去存取我們允許的接口。
首先我們在 dependencies 中撰寫驗證 JWT 是否為 admin 的程式:
# src/dependencies.py
def check_is_admin(jwt_data: dict = Depends(jwt.decode_jwt)) -> None:
if jwt_data['sub'] != 'admin':
raise exceptions.NotAdmin()
我們解開 JWT 後去驗證 sub 主體是不是 admin ,是的話我們才可以往下執行程式。
# src/main.py
@app.get(
"/items",
response_model=list[schemas.ItemBase],
dependencies=[Depends(dependencies.check_is_admin)],
)
def get_items(db: Session = Depends(get_db)):
"""
Get all items
"""
return service.get_all_items(db)
我們以拿取所有庫存為例,由於我們不用透過這個 check_is_admin 獲得任何回傳值,所以我們可以直接將其定義在路徑裝飾器中 dependencies=[Depends(dependencies.check_is_admin)]
,這樣就不用去新增變數容器給他。其他的 Item 接口也是照這個做法進行權限控管。
完成後可以看到,沒有傳遞正確 JWT 時會被阻擋。
有傳遞 admin 的 JWT 時便回得到正確結果。
今天算是 JWT 的一個簡單應用,拿來做為身分驗證,並與其他顧客區隔開,就算使用顧客登入拿取到的token來使用 admin 的端點,也會因為 jwt 內含的資料不同而被阻擋。