Yesterday I sketched out the rough structure of the 3 states; today it's time to wire the LLM in.
Today's content focuses mainly on implementation, filling in the most essential components of the LLM agent.
First, let's implement the stream_response function.
from typing import AsyncIterator

from openai import AsyncOpenAI

client = AsyncOpenAI()

# Message is the project's own chat-message type (role + content)
async def stream_response(model_name: str, messages: list[Message]) -> AsyncIterator[str | None]:
    # request a streaming chat completion from the API
    stream = await client.chat.completions.create(
        model=model_name, messages=messages, stream=True
    )
    async for chunk in stream:
        # delta.content can be None (e.g. the role-only first chunk),
        # so callers should filter empty chunks
        yield chunk.choices[0].delta.content
With this, we can stream the text output.
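As a quick sanity check, here's one way to consume the generator (the model name and the plain-dict message are just placeholders standing in for the project's Message type):

import asyncio

async def demo() -> None:
    # placeholder model/message, just to watch tokens stream in
    messages = [{"role": "user", "content": "hello"}]
    async for chunk in stream_response("gpt-4o-mini", messages):
        if chunk:  # skip role-only / empty deltas
            print(chunk, end="", flush=True)

asyncio.run(demo())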
Since we need the LLM to output an action, I'm using a package here that specializes in repairing LLM output whose JSON schema may come back broken:
https://pypi.org/project/json-repair/
There's also a demo site you can play with: https://mangiucugna.github.io/json_repair/
from json_repair import repair_json

def parse_json_response(response: str) -> dict:
    try:
        # return_objects=True makes repair_json return the parsed Python
        # object directly instead of a re-serialized JSON string
        repaired = repair_json(response, return_objects=True)
        return repaired if isinstance(repaired, dict) else {}
    except Exception as e:
        print(f"Error repairing JSON: {e}")
        return {}
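To see what this buys us, a quick example (the broken string below is a made-up but typical LLM slip; single quotes and trailing commas are among the things json-repair can fix):

# single quotes + trailing comma: invalid JSON, but repairable
broken = "{'name': 'search', 'args': {'query': 'cats'},}"
print(parse_json_response(broken))
# expected: {'name': 'search', 'args': {'query': 'cats'}}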
from typing import Any

from pydantic import BaseModel

class Action(BaseModel):
    name: str
    args: dict[str, Any]
Later I refactored all the states to be stateless. The idea is that a state never needs to store anything during inference; the same state decides its behavior in the moment based on the env it's given.
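For context, here's roughly what I imagine the State interface looks like under this stateless design (the actual base class is from yesterday's post; the abstract-method shape and the StateType members below are my reconstruction from how they're used later):

from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import AsyncIterator

class StateType(Enum):
    REASONING = auto()
    ACTION = auto()
    ANSWER = auto()

class State(ABC):
    # stateless: everything mutable lives in env, so a single State
    # instance can serve any number of inference runs
    @abstractmethod
    def _run(self, env: "MemoryEnv") -> AsyncIterator[str]: ...

    @abstractmethod
    async def _transition(self, env: "MemoryEnv") -> None: ...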
Reasoning state
class ReasoningState(State):
    async def _run(self, env: "MemoryEnv") -> AsyncIterator[str]:
        generated_response = ""
        # streaming output here
        messages = await get_messages(env)
        async for chunk in stream_response(
            model_name=REASONING_MODEL, messages=messages
        ):
            if chunk:
                generated_response += chunk
                yield chunk
        # update env; we'll discuss this in a future post
        env.update(generated_response)
        # parse the action the LLM predicted and hand it to the environment
        if action := parse_json_response(generated_response):
            env.next_action = Action(**action)
        else:
            env.next_action = None

    async def _transition(self, env: "MemoryEnv") -> None:
        # an action was parsed -> go act; otherwise answer directly
        if env.next_action:
            env.set_state(StateType.ACTION)
        else:
            env.set_state(StateType.ANSWER)
This is where the parse function is mainly used.
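Concretely, if the reasoning model ends its turn with an action JSON, the round trip looks like this (search_memory is a made-up tool name):

raw = '{"name": "search_memory", "args": {"query": "last week"}}'
parsed = parse_json_response(raw)  # {} if the JSON is irreparable
next_action = Action(**parsed) if parsed else None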
Answer state
class AnswerState(State):
    async def _run(self, env: "MemoryEnv") -> AsyncIterator[str]:
        generated_response = ""
        # streaming output
        messages = await get_messages(env)
        async for chunk in stream_response(
            model_name=ANSWERING_MODEL, messages=messages
        ):
            if chunk:
                generated_response += chunk
                yield chunk
        env.update(generated_response)

    async def _transition(self, env: "MemoryEnv") -> None:
        # answering is terminal: mark the whole run as finished
        env.done = True
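To preview how the pieces connect, here's my guess at the loop the env will eventually run (run_agent and env.current_state are hypothetical names; the real env comes tomorrow):

async def run_agent(env: "MemoryEnv") -> AsyncIterator[str]:
    # hypothetical driver: run the current state, stream its tokens,
    # then let it pick the next state until something sets env.done
    while not env.done:
        state = env.current_state
        async for chunk in state._run(env):
            yield chunk
        await state._transition(env)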
Compared to yesterday, a complete state machine is slowly taking shape. Tomorrow I'll continue with the most important piece: the env.