Later in this section we will apply the same streaming pattern to the ReActAgent imported from llama_index.core.agent.workflow. First, we do the necessary imports:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
Events
class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class ProgressEvent(Event):
    msg: str
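These events are small pydantic-style data carriers: you construct them with keyword arguments and read the fields back as attributes. A quick check:

ev = ProgressEvent(msg="hello")
print(ev.msg)  # hello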
Workflow
class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ### 1. In the middle of each step we can first emit a ProgressEvent
        ### with ctx.write_event_to_stream
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        ###
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step two is happening"))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response='this is llm response',
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")
Inside each step, we call ctx.write_event_to_stream to emit a ProgressEvent.
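The same channel works for finer-grained progress. For example, a variant of step_two could forward LLM token deltas as ProgressEvents. A minimal sketch, assuming an llm object with LlamaIndex's async streaming API (e.g. the OpenAI instance constructed later in this post):

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        # Stream the completion and forward each delta to the event stream.
        gen = await llm.astream_complete(ev.first_output)
        full = ""
        async for chunk in gen:
            full += chunk.delta or ""
            ctx.write_event_to_stream(ProgressEvent(msg=chunk.delta or ""))
        return SecondEvent(second_output="LLM call complete", response=full)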
w = MyWorkflow(timeout=120, verbose=False)
handler = w.run(first_input="Start the workflow.")
async for ev in handler.stream_events():
    if isinstance(ev, ProgressEvent):
        print("[PROGRESS]", ev.msg)
    elif isinstance(ev, StopEvent):
        print("[DONE]", ev.result)
    else:
        print("[unknown event]")
[PROGRESS] Step one is happening
[PROGRESS] Step two is happening
[PROGRESS] Step three is happening
[DONE] Workflow complete.
We use async for ev in handler.stream_events() to capture the streaming events written above with ctx.write_event_to_stream. Besides the events written by write_event_to_stream, the StopEvent also ends up here, so handler.stream_events() lets us pick up information from every step.
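Awaiting the handler (after draining the stream, or instead of it) returns the final result, i.e. the StopEvent's result:

result = await handler
print(result)  # Workflow complete.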
import os
from dotenv import find_dotenv, load_dotenv
_ = load_dotenv(find_dotenv())
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
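A quick sanity check (sketch) that the key actually loaded from the .env file before we construct any tools:

assert TAVILY_API_KEY, "TAVILY_API_KEY not found; add it to your .env file"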
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import (
    FunctionAgent,
    ReActAgent,
)
Note that we import ReActAgent from llama_index.core.agent.workflow here, not via the older from llama_index.core.agent import ReActAgent path.
from llama_index.tools.tavily_research.base import TavilyToolSpec
tavily_tool = TavilyToolSpec(
    api_key=TAVILY_API_KEY,
)
type(tavily_tool) # llama_index.tools.tavily_research.base.TavilyToolSpec
#tavily_function_tool = tavily_tool.to_tool_list()[0]
#type(tavily_function_tool) # llama_index.core.tools.function_tool.FunctionTool
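As the commented lines hint, a ToolSpec expands into one FunctionTool per method. A quick sketch to inspect what it generates:

# Each ToolSpec method becomes one FunctionTool; print their names.
for t in tavily_tool.to_tool_list():
    print(t.metadata.name)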
from typing import List, Dict
from llama_index.core import Document
from llama_index.core.tools import FunctionTool
MAX_RESULTS = 3
def tavily_search(query: str) -> List[Document]:
    """Search Tavily and return a list[Document], one per result."""
    docs = tavily_tool.search(query=query, max_results=MAX_RESULTS)
    return docs

tavily_function_tool = FunctionTool.from_defaults(
    fn=tavily_search,
    name="tavily_search",
    description="Search Tavily and return a list of Document objects (one block per result).",
)
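Before wiring the tool into an agent, we can call it directly to sanity-check the output. A sketch (the query here is just an example):

out = tavily_function_tool.call(query="multiple sclerosis diagnosis")
print(type(out))            # ToolOutput
print(len(out.raw_output))  # number of Documents, at most MAX_RESULTS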
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import Context
llm = OpenAI(model="gpt-5-mini", temperature=0, is_streaming=False) # streaming False for non-verified organisations
agent = ReActAgent(tools=[tavily_function_tool], llm=llm, streaming=False, verbose=False)
# Create a context to store the conversation history/session state
ctx = Context(agent)
from llama_index.core.agent.workflow import AgentInput, AgentOutput, ToolCall, ToolCallResult
sub_question = 'What diagnostic tests are commonly used today for multiple sclerosis (MS)?'
handler = agent.run(sub_question, ctx=ctx)
rvs = []
async for ev in handler.stream_events(expose_internal=False):
    name = ev.__class__.__name__
    print(f"-----stream event: {name}")
    rvs.append((name, ev))
    if isinstance(ev, AgentInput):
        print(f"len of chat message: {len(ev.input)}")
    elif isinstance(ev, AgentOutput):
        print(ev.response.blocks[0].text)
    elif isinstance(ev, ToolCall):
        print(f"{ev.tool_name}: {ev.tool_kwargs}")
    elif isinstance(ev, ToolCallResult):
        num_rv = len(ev.tool_output.blocks)
        print(f"num_result: {num_rv}")
response = await handler
print(response)
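Because the Context stores the conversation history, a follow-up run on the same ctx can refer back to this answer. A sketch:

handler = agent.run("Summarize the previous answer in one sentence.", ctx=ctx)
print(await handler)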
In the ToolCallResult above, the per-result blocks are simply joined together with '\n\n'. Now let's run the same task with a FunctionAgent:

func_agent = FunctionAgent(
    tools=[tavily_function_tool],
    system_prompt='You are a web-search assistant: use the web search tool first, then answer the question.',
    llm=llm,
    streaming=False,
    verbose=False,
)
ctx = Context(func_agent)
from llama_index.core.agent.workflow import AgentInput, AgentOutput, ToolCall, ToolCallResult
sub_question = 'What diagnostic tests are commonly used today for multiple sclerosis (MS)?'
handler = func_agent.run(sub_question, ctx=ctx)
rvs = []
async for ev in handler.stream_events(expose_internal=False):
    name = ev.__class__.__name__
    print(f"-----stream event: {name}")
    rvs.append((name, ev))
    if isinstance(ev, AgentInput):
        print(f"len of chat message: {len(ev.input)}")
    elif isinstance(ev, AgentOutput):
        if len(ev.response.blocks):
            print(ev.response.blocks[0].text)
        else:
            print(ev.response.additional_kwargs['tool_calls'])
    elif isinstance(ev, ToolCall):
        print(f"{ev.tool_name}: {ev.tool_kwargs}")
    elif isinstance(ev, ToolCallResult):
        num_rv = len(ev.tool_output.blocks)
        print(f"num_result: {num_rv}")
response = await handler
print(response)
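To compare the two agents' behaviour at a glance, we can tally the event types collected in rvs. A small sketch:

from collections import Counter

# Count how many times each event type appeared during the run.
print(Counter(name for name, _ in rvs))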