由於有傳統發票的 API 需要上傳圖片,這邊我想介紹我怎麼先做一個圖片 API 的服務(回收前面的伏筆)。而主要服務 FastAPI 在連結 MongoDB,我會先展示需要有 Model 的做法,之後在 Airflow 那裡會介紹不用定義資料結構也可以使用。(又是伏筆?故事總是一波平息一波又起XD)
class Media(Document):
stream: bytes
content_type: str
created_timestamp: datetime
@router.post("/upload", summary="上傳媒體檔案")
async def upload_file(request: Request,
file: UploadFile = File(...)):
file_extension = file.filename.split(".")[-1]
if file_extension not in ["jpg", "jpeg", "png", "gif"]:
raise HTTPException(400, "File Type occurred error.")
mime_type, _ = mimetypes.guess_type(file.filename)
content_type = mime_type if mime_type else "application/octet-stream"
file_content = await file.read()
media = Media(
stream=file_content,
content_type=content_type,
created_timestamp=datetime.now(tz)
)
await media.save()
return success_response(message="媒體上傳成功", data=media.id)
content_type 是要儲存這張照片的資料格式,例如 “image/png”
@router.get("", summary="查詢檔案")
async def get_media(request: Request,
mediaId: str = Query(..., description="mediaId")):
if not ObjectId.is_valid(mediaId):
raise HTTPException(400, f"Invalid field type in {mediaId}. Expected: ObjectId")
media = await Media.get(mediaId)
if media is None:
raise HTTPException(400, f"Invalid metadataId: {mediaId}.")
return StreamingResponse(BytesIO(media.stream), media_type=media.content_type)