iT邦幫忙

0

python如何實現三端同步取消

  • 分享至 

  • xImage

問題背景:

我有三個端,我希望當client端的請求斷掉後,gateway就會取消串流,繼而backend也會同步取消。

client.py:

import requests
response = requests.get("http://127.0.0.1:8000/G_stream", stream=True)
count = 0
with response as r:
    r.raise_for_status() 
    for chunk in r.iter_content(chunk_size=1024): 
        print(chunk.decode("utf-8", errors="ignore"), end="", flush=True)
        count += 1
        if count == 10:
            break

gateway.py

import requests
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()


async def text_generator(response):
    with response as r:
        r.raise_for_status()  
        for chunk in r.iter_content(chunk_size=1): 
            if chunk:
                print(chunk.decode("utf-8", errors="ignore"), end="", flush=True)
                yield chunk.decode("utf-8", errors="ignore")
                await asyncio.sleep(0)


async def generate_stream():
    deresponse = requests.get("http://127.0.0.1:8001/B_stream", stream=True)
    response = StreamingResponse(
        text_generator(deresponse), media_type="text/event-stream"
    )

    return response


@app.get("/G_stream")
async def stream_text():
    output = await generate_stream()
    return output

backend.py:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def text_generator():

    text = "Hello, this is a streaming example."
    for chunk in text:
        print(chunk)
        yield chunk
        await asyncio.sleep(0.2)  # 模擬每200毫秒傳輸一個字元

    print("done")


@app.get("/B_stream")
async def stream_text():
    response = StreamingResponse(text_generator(), media_type="text/event-stream")
    return response

目前我能夠當client停止後,gateway會收到socket.send() raised exception.的warnning,但我不知道在哪裡可以取得這個問題,然後想說如果catch到這個錯誤就能break後面的backend? 還是各位前輩有其他方式?

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 個回答

0
dashuai
iT邦新手 5 級 ‧ 2024-01-30 14:25:37

在當前的架構中,當client斷開連接時,FastAPI作為閘道層應該能够檢測到這個斷開事件。 但你提到的警告資訊可能是因為FastAPI的StreamingResponse在嘗試向已關閉的socket發送數據時拋出的异常。
為了在Gateway層處理這個問題,你可以嘗試捕獲StreamingResponse在生成器函數text_ generator中可能拋出的异常,並在捕獲到异常時停止從Backend獲取流數據。 為此,可以在text_ generator函數內部添加一個try-except塊:
Python
async def text_ generator(response):
try:
with response as r:
r.raise_ for_ status()
for chunk in r.iter_ content(chunk_size=1):
if chunk:
print(chunk.decode(“utf-8”,errors=“ignore”),end=“”,flush=True)
yield chunk.decode(“utf-8”,errors=“ignore”)
await asyncio.sleep(0)
except Exception as e:
#當發生异常(例如用戶端斷開連接)時,可以在這裡執行相應的清理操作
print(f“Caught exception: {e}”)
#可以選擇在此處斷開與Backend的連接或取消任務,但由於requests庫本身並不支持stream=True模式下的優雅關閉,所以這裡沒有直接的方法來關閉請求連接
return
然而,在requests庫中使用stream=True時,無法直接得知用戶端是否斷開連接並優雅地關閉到Backend的連接。 要實現這一目標,你可能需要切換到一個支持雙向通信和更精細控制的庫,如aiohttp或httpx,它們提供了對響應流更好的控制。
對於Backend,由於它是一個獨立的服務,通常會繼續生成資料流程,直到其生成器函數完成。 在這種情況下,除非Gateway能够通知Backend用戶端已經斷開連接,否則Backend不會主動停止流式傳輸。 為了解决這個問題,你可能需要引入額外的消息傳遞機制(如Redis Pub/Sub或RabbitMQ等消息隊列)來同步前後端的狀態

我要發表回答

立即登入回答