iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0
Cloud Native

《30 天 Cloud Native:AI 行程規劃後端開發》系列 第 17

Day17 - 深入解析 Vertex AI SDK 的非同步請求瓶頸

  • 分享至 

  • xImage
  •  

昨天直接用 curl 測試 Vertex AI API 就能夠同時發送多個請求觸發 Auto-Scaling ,今天就來了解一下使用 Vertex AI Python SDK 不能同時發送的原因。

使用 ipdb 進行 Debug

為了快速找到 async_stream_query 在哪裡,可以使用 ipdb 來 Debug :

python -m ipdb remotetest.py

進入後對想看的地方下斷點:

https://ithelp.ithome.com.tw/upload/images/20250831/20178194SzYX6OeKqn.png

然後 run 就會開始程式,一開始會在第一行停下來,輸入 c 就能讓他繼續往下跑,在 Debug 模式下可能是因為要蒐集 Debug 資訊的關係,會跑得比較慢,等他跑到斷點後就可以下 step 進去 async_stream_query 內,但因為是 Async 的關係所以會比較多層,主要的呼叫流程如下:

- .../remotetest.py
- .../asyncio/runners.py(190)run()
- .../asyncio/runners.py(118)run()
- .../asyncio/base_events.py(641)run_until_complete()
- .../asyncio/base_events.py(608)run_forever()
- .../asyncio/base_events.py(1936)_run_once()
- .../asyncio/events.py(84)_run()
- .../remotetest.py(54)run_query()
- .../site-packages/vertexai/agent_engines/_agent_engines.py(1509)_method()

觀察執行順序

這種併發請求我只想到用 print 大法來看順序,所以直接在主要的兩個函數前後加上 print 來看是誰拖慢了整體速度:

    async def _method(self, **kwargs) -> AsyncIterable[Any]:
        print('prepare req')
        req = aip_types.StreamQueryReasoningEngineRequest(
                name=self.resource_name,
                input=kwargs,
                class_method=method_name,
            )
        print('start query')
        response = self.execution_api_client.stream_query_reasoning_engine(
            request=req,
        )
        print('finish query')
        for chunk in response:
            for parsed_json in _utils.yield_parsed_json(chunk):
                if parsed_json is not None:
                    yield parsed_json

    return _method

執行後發現在 stream_query_reasoning_engine 這邊會卡住,導致所有請求變成照順序逐一發送,再往下追會找到在 .../google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/client.pyReasoningEngineExecutionServiceClient 類別內有個 stream_query_reasoning_engine 會將請求準備好後用 grpc 送出去。

用 DeepWiki 快速問了一下後得知 grpc 會自動判斷使否使用 Async ,所以問題應該不在 grpc 內,如果仔細的看一下程式碼會發現呼叫 stream_query_reasoning_engine 的時候沒有用 await ,再去翻了翻其他用到 Async 的函數:

def _wrap_async_query_operation(method_name: str) -> Callable[..., Coroutine]:
    """Wraps an Agent Engine method, creating an async callable for `query` API.

    This function creates a callable object that executes the specified
    Agent Engine method asynchronously using the `query` API. It handles the
    creation of the API request and the processing of the API response.

    Args:
        method_name: The name of the Agent Engine method to call.
        doc: Documentation string for the method.

    Returns:
        A callable object that executes the method on the Agent Engine via
        the `query` API.
    """

    async def _method(self, **kwargs) -> _utils.JsonDict:
        response = await self.execution_async_client.query_reasoning_engine(
            request=aip_types.QueryReasoningEngineRequest(
                name=self.resource_name,
                input=kwargs,
                class_method=method_name,
            ),
        )
        output = _utils.to_dict(response)
        return output.get("output", output)

    return _method

這邊就有好好的用 await ,而且他是使用 self.execution_async_client 而不是 self.execution_api_client ,再翻了翻程式碼發現其實程式寫好了,只是要做些調整而已:

https://ithelp.ithome.com.tw/upload/images/20250831/201781948t8Zv05nRn.png

改好後就可以用 Vertex AI Python SDK 來同時發送請求給 Agent ,順便發一下 PR


上一篇
Day16 - 504 Timeout 背後的真相:逐步拆解 Agent 回應延遲問題
下一篇
Day18 - 170 元的部署實驗:Cloud Run 測試成功與權限控管全紀錄
系列文
《30 天 Cloud Native:AI 行程規劃後端開發》19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言