問題背景:
我有三個端,我希望當client端的請求斷掉後,gateway就會取消串流,繼而backend也會同步取消。
client.py:
import requests
response = requests.get("http://127.0.0.1:8000/G_stream", stream=True)
count = 0
with response as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=1024):
print(chunk.decode("utf-8", errors="ignore"), end="", flush=True)
count += 1
if count == 10:
break
gateway.py
import requests
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def text_generator(response):
with response as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=1):
if chunk:
print(chunk.decode("utf-8", errors="ignore"), end="", flush=True)
yield chunk.decode("utf-8", errors="ignore")
await asyncio.sleep(0)
async def generate_stream():
deresponse = requests.get("http://127.0.0.1:8001/B_stream", stream=True)
response = StreamingResponse(
text_generator(deresponse), media_type="text/event-stream"
)
return response
@app.get("/G_stream")
async def stream_text():
output = await generate_stream()
return output
backend.py:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def text_generator():
text = "Hello, this is a streaming example."
for chunk in text:
print(chunk)
yield chunk
await asyncio.sleep(0.2) # 模擬每200毫秒傳輸一個字元
print("done")
@app.get("/B_stream")
async def stream_text():
response = StreamingResponse(text_generator(), media_type="text/event-stream")
return response
目前我能夠當client停止後,gateway會收到socket.send() raised exception.的warnning,但我不知道在哪裡可以取得這個問題,然後想說如果catch到這個錯誤就能break後面的backend? 還是各位前輩有其他方式?
在當前的架構中,當client斷開連接時,FastAPI作為閘道層應該能够檢測到這個斷開事件。 但你提到的警告資訊可能是因為FastAPI的StreamingResponse在嘗試向已關閉的socket發送數據時拋出的异常。
為了在Gateway層處理這個問題,你可以嘗試捕獲StreamingResponse在生成器函數text_ generator中可能拋出的异常,並在捕獲到异常時停止從Backend獲取流數據。 為此,可以在text_ generator函數內部添加一個try-except塊:
Python
async def text_ generator(response):
try:
with response as r:
r.raise_ for_ status()
for chunk in r.iter_ content(chunk_size=1):
if chunk:
print(chunk.decode(“utf-8”,errors=“ignore”),end=“”,flush=True)
yield chunk.decode(“utf-8”,errors=“ignore”)
await asyncio.sleep(0)
except Exception as e:
#當發生异常(例如用戶端斷開連接)時,可以在這裡執行相應的清理操作
print(f“Caught exception: {e}”)
#可以選擇在此處斷開與Backend的連接或取消任務,但由於requests庫本身並不支持stream=True模式下的優雅關閉,所以這裡沒有直接的方法來關閉請求連接
return
然而,在requests庫中使用stream=True時,無法直接得知用戶端是否斷開連接並優雅地關閉到Backend的連接。 要實現這一目標,你可能需要切換到一個支持雙向通信和更精細控制的庫,如aiohttp或httpx,它們提供了對響應流更好的控制。
對於Backend,由於它是一個獨立的服務,通常會繼續生成資料流程,直到其生成器函數完成。 在這種情況下,除非Gateway能够通知Backend用戶端已經斷開連接,否則Backend不會主動停止流式傳輸。 為了解决這個問題,你可能需要引入額外的消息傳遞機制(如Redis Pub/Sub或RabbitMQ等消息隊列)來同步前後端的狀態