昨天直接用 curl
測試 Vertex AI API 就能夠同時發送多個請求觸發 Auto-Scaling ,今天就來了解一下使用 Vertex AI Python SDK 不能同時發送的原因。
為了快速找到 async_stream_query
在哪裡,可以使用 ipdb
來 Debug :
python -m ipdb remotetest.py
進入後對想看的地方下斷點:
然後 run
就會開始程式,一開始會在第一行停下來,輸入 c
就能讓他繼續往下跑,在 Debug 模式下可能是因為要蒐集 Debug 資訊的關係,會跑得比較慢,等他跑到斷點後就可以下 step
進去 async_stream_query
內,但因為是 Async 的關係所以會比較多層,主要的呼叫流程如下:
- .../remotetest.py
- .../asyncio/runners.py(190)run()
- .../asyncio/runners.py(118)run()
- .../asyncio/base_events.py(641)run_until_complete()
- .../asyncio/base_events.py(608)run_forever()
- .../asyncio/base_events.py(1936)_run_once()
- .../asyncio/events.py(84)_run()
- .../remotetest.py(54)run_query()
- .../site-packages/vertexai/agent_engines/_agent_engines.py(1509)_method()
這種併發請求我只想到用 print
大法來看順序,所以直接在主要的兩個函數前後加上 print
來看是誰拖慢了整體速度:
async def _method(self, **kwargs) -> AsyncIterable[Any]:
print('prepare req')
req = aip_types.StreamQueryReasoningEngineRequest(
name=self.resource_name,
input=kwargs,
class_method=method_name,
)
print('start query')
response = self.execution_api_client.stream_query_reasoning_engine(
request=req,
)
print('finish query')
for chunk in response:
for parsed_json in _utils.yield_parsed_json(chunk):
if parsed_json is not None:
yield parsed_json
return _method
執行後發現在 stream_query_reasoning_engine
這邊會卡住,導致所有請求變成照順序逐一發送,再往下追會找到在 .../google/cloud/aiplatform_v1/services/reasoning_engine_execution_service/client.py
的 ReasoningEngineExecutionServiceClient
類別內有個 stream_query_reasoning_engine
會將請求準備好後用 grpc
送出去。
用 DeepWiki 快速問了一下後得知 grpc
會自動判斷使否使用 Async ,所以問題應該不在 grpc
內,如果仔細的看一下程式碼會發現呼叫 stream_query_reasoning_engine
的時候沒有用 await
,再去翻了翻其他用到 Async 的函數:
def _wrap_async_query_operation(method_name: str) -> Callable[..., Coroutine]:
"""Wraps an Agent Engine method, creating an async callable for `query` API.
This function creates a callable object that executes the specified
Agent Engine method asynchronously using the `query` API. It handles the
creation of the API request and the processing of the API response.
Args:
method_name: The name of the Agent Engine method to call.
doc: Documentation string for the method.
Returns:
A callable object that executes the method on the Agent Engine via
the `query` API.
"""
async def _method(self, **kwargs) -> _utils.JsonDict:
response = await self.execution_async_client.query_reasoning_engine(
request=aip_types.QueryReasoningEngineRequest(
name=self.resource_name,
input=kwargs,
class_method=method_name,
),
)
output = _utils.to_dict(response)
return output.get("output", output)
return _method
這邊就有好好的用 await
,而且他是使用 self.execution_async_client
而不是 self.execution_api_client
,再翻了翻程式碼發現其實程式寫好了,只是要做些調整而已:
改好後就可以用 Vertex AI Python SDK 來同時發送請求給 Agent ,順便發一下 PR。