iT邦幫忙

2025 iThome 鐵人賽

DAY 13
0
生成式 AI

阿,又是一個RAG系列 第 13

Day12: SubQuestionQueryEngine(中): Streaming events 與 ReActAgent

  • 分享至 

  • xImage
  •  

Situation

  • 我們昨天架構了 SubQuestionQueryEngine as workflow 的整體工作流程
  • 並且測試了 prompt llm 來直接產生 list of subquestion
  • 今天主要關注在中間回答子問題的 Agent
  • 原始筆記本可以參考 Sub Question Query Engine as a workflow
    • 我們將修正了一些因為版本問題所導致的中間報錯
    • 我們將抽換本地文檔的檢索部分,改為用 Tavily 進行檢索
    • 我們預計要把 prompt 改為客製化的 Prompt

Task

  • 今天的任務很具體:
    • 首先我們要先學習一下 llama-index 的 workflow 中的 streaming events
      • 主要會用來協助我們查看 ReActAgent 中間步驟所發生的 event
    • 接著把學習到的知識套用到 從 llama_index.core.agent.workflow import 的 ReActAgent 上

Action

part1: streaming events

  1. 首先我們做一些必要的 import

    from llama_index.core.workflow import (
        StartEvent,
        StopEvent,
        Workflow,
        step,
        Event,
        Context,
    )
    
  2. Events

    class FirstEvent(Event):
        first_output: str
    
    
    class SecondEvent(Event):
        second_output: str
        response: str
    
    
    class ProgressEvent(Event):
        msg: str
    
    • 除了常規的 Event 外,注意這個 ProgressEvent Event
  3. Workflow

    class MyWorkflow(Workflow):
        @step
        async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
            ### 1. 我們可以在每個 step 的中間把 event 先用 ctx.write_event_to_stream 發出一個 ProgressEvent
            ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
            ###
            return FirstEvent(first_output="First step complete.")
    
        @step
        async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Step two is happening"))
            return SecondEvent(
                second_output="Second step complete, full response attached",
                response='this is llm response',
            )
    
        @step
        async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
            ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
            return StopEvent(result="Workflow complete.")
    
  • 整體流程是: (StartEvent) -> [step_one] -> (FirstEvent) -> [step_two] -> (SecondEvent) -> [step_three] -> (StopEvent)
  • 但在每一個 step 中間,我們都使用 ctx.write_event_to_stream 來寫出 ProgressEvent
  1. Run
  • code
    w = MyWorkflow(timeout=120, verbose=False)
    handler = w.run(first_input="Start the workflow.")
    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print("[PROGRESS]", ev.msg)
        elif isinstance(ev, StopEvent):
            print("[DONE]", ev.result)
        else:
            print("[unknown event]")
    
  • result
    [PROGRESS] Step one is happening
    [PROGRESS] Step two is happening
    [PROGRESS] Step three is happening
    [DONE] Workflow complete.
    
  • 除了常規的 run workflow 以外
  • 我們使用 async for ev in handler.stream_events() 來捕獲上面用 ctx.write_event_to_stream 寫出來的 streaming event
    • 這邊除了有用 write_event_to_stream 寫出來的 event 會抓到以外,StopEvent 也會跑到這裡
  • 總之,如果一個 workflow 在寫的時候,有把這種 streaming events 寫出來,我們就可以在執行的時候用 handler.stream_events() 拿到每一步的資訊
    • 例如:
      • ReAct Agent 的 prompt, thought, Action
      • tool call 的 argument 以及 tool output

part2: ReActAgent

  1. 首先是一些必要的 import
    import os
    from dotenv import find_dotenv, load_dotenv
    _ = load_dotenv(find_dotenv())
    
    TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
    
    from llama_index.core.tools import FunctionTool
    from llama_index.core.agent.workflow import (
        FunctionAgent,
        ReActAgent,
    )
    
    
  • 我們需要 OPENAI_API_KEY 以及 TAVILY_API_KEY
  • 我們從 llama_index.core.agent.workflow import ReActAgent
  1. 接著是 tool 的部分,我們嘗試一下把自訂 function 做成 tool
  • code
    from llama_index.tools.tavily_research.base import TavilyToolSpec
    tavily_tool = TavilyToolSpec(
        api_key=TAVILY_API_KEY,
    )
    type(tavily_tool)  # llama_index.tools.tavily_research.base.TavilyToolSpec
    
    #tavily_function_tool = tavily_tool.to_tool_list()[0]
    #type(tavily_function_tool)  # llama_index.core.tools.function_tool.FunctionTool
    
    from typing import List, Dict
    from llama_index.core import Document
    from llama_index.core.tools import FunctionTool
    MAX_RESULTS = 3
    
    def tavily_search(query: str) -> List[Document]:
        """Search Tavily and return a list[Document], one per result."""
        docs = tavily_tool.search(query=query, max_results=MAX_RESULTS)
        return docs
    
    tavily_function_tool = FunctionTool.from_defaults(
        fn=tavily_search,
        name="tavily_search",
        description="Search Tavily and return a list of Document objects (one block per result).",
    )
    
  • 我們 import llama-index 整合的 TavilyToolSpec
  • 我們這邊用 FunctionTool 來把自訂的 python function 做成 tool 給 Agent 使用
  • 跟先前範例的差異是:
    • 本來的 Tavily_tool 會有兩個參數: (query, max_result)
    • 我們固定 max_result 等於 MAX_RESULTS,所以只剩 query 一個參數
  • 關於其他把自訂的 function 做成 tool 可以參考: examples_agent_react_agent
  1. ReAct Agent
  • code
    from llama_index.llms.openai import OpenAI
    from llama_index.core.workflow import Context
    
    llm = OpenAI(model="gpt-5-mini", temperature=0, is_streaming=False)  # streaming False for non-verified organisations
    agent = ReActAgent(tools=[tavily_function_tool], llm=llm, streaming=False, verbose=False)
    # Create a context to store the conversation history/session state
    ctx = Context(agent)
    
  • 一個事情是 non-verified organisations 在使用 gpt-5-mini 的時候沒有權限開 streaming,所以必須設 False
    • 所以我們才需要用 streaming event 看 ReActAgent 的中間結果
  • 第二個事情是我們把 Context 給 agent 讓他可以看到前面的 chatmessage
  1. RUN
  • code
    from llama_index.core.agent.workflow import AgentInput, AgentOutput, ToolCall, ToolCallResult
    
    sub_question = '關於多發性硬化症(MS)現今常用的診斷檢查有哪些?'
    handler = agent.run(sub_question, ctx=ctx)
    rvs = []
    async for ev in handler.stream_events(expose_internal=False):
        name = ev.__class__.__name__
        print(f"-----stream event: {name}")
        rvs.append((name, ev))
    
        if isinstance(ev, AgentInput):
            print(f"len of chat message: {len(ev.input)}")
        elif isinstance(ev, AgentOutput):
            print(ev.response.blocks[0].text)
        elif isinstance(ev, ToolCall):
            print(f"{ev.tool_name}: {ev.tool_kwargs}")
        elif isinstance(ev, ToolCallResult):
            num_rv = len(ev.tool_output.blocks)
            print(f"num_result: {num_rv}")
    response = await handler
    print(response)
    
  • part1 的學習就是為了讓我們看懂這段的呼叫
    • 主要就是如果 ReAct Agent 有把 stream event 寫出來
    • 如果是 AgentInput: 我們看一下 歷史對話有多長
      • history 包含 system prompt, prompt 都在這
    • 如果是 AgentOutput: 我們看 Agent 說了什麼
      • 這個就會是 thoght, action 等等
    • 如果是 ToolCall
      • 我們要看呼叫哪個 tool, 以及用什麼參數呼叫
    • 如果是 ToolCallResult
      • 我們看一下回傳了多少個結果
  • result
    • https://ithelp.ithome.com.tw/upload/images/20250927/20177855phcPzW0qKl.jpg
    • 第一次的 AgentInput 有 2 個 message,是 system prompt 和 query
    • 第二次的 AgentInput 有 4 個 message,是 上一輪的 agent output 和 tool 的 output
      • 這邊他是把 檢索回來的 context 用 '\n\n' 直接拼起來
  1. 然後我們用 FunctionAgent 把一樣的問題再跑一次
  • code
    func_agent = FunctionAgent(tools=[tavily_function_tool], 
                               system_prompt='你是一個網路查詢助理,會先使用網路搜索工具來才回答問題。',
                               llm=llm, streaming=False, verbose=False)
    ctx = Context(func_agent)
    from llama_index.core.agent.workflow import AgentInput, AgentOutput, ToolCall, ToolCallResult
    
    sub_question = '關於多發性硬化症(MS)現今常用的診斷檢查有哪些?'
    handler = func_agent.run(sub_question, ctx=ctx)
    rvs = []
    async for ev in handler.stream_events(expose_internal=False):
        name = ev.__class__.__name__
        print(f"-----stream event: {name}")
        rvs.append((name, ev))
    
        if isinstance(ev, AgentInput):
            print(f"len of chat message: {len(ev.input)}")
        elif isinstance(ev, AgentOutput):
            if len(ev.response.blocks):
                print(ev.response.blocks[0].text)
            else:
                print(ev.response.additional_kwargs['tool_calls'])
        elif isinstance(ev, ToolCall):
            print(f"{ev.tool_name}: {ev.tool_kwargs}")
        elif isinstance(ev, ToolCallResult):
            num_rv = len(ev.tool_output.blocks)
            print(f"num_result: {num_rv}")
    response = await handler
    
  • result
    • https://ithelp.ithome.com.tw/upload/images/20250927/20177855kJtJ3iB5EB.jpg
    • 這邊看起來沒有誰好誰壞,或許之後兩個都可以抽換著造 dataset 再來評估
    • 不過現在知道用 stream_event 呼叫之後,整體 Agent 的執行過程看起來更透明了

Summary

  • 我們今天學習了 workflow 中的 streaming_event
  • 具體來說在寫 workflow 的時候,如何把中間的狀態發送出去
  • 以及在執行 workflow 的時候,如何拿到中間發送的狀態
  • 湊起來可以讓我們在呼叫 agent 的時候感覺更透明
  • 我們學習了把自訂的 python function 包成 tool 給 agent 用
  • 我們嘗試使用 ReActAgent 以及 Function Agent 來查網路並且回答一個子問題

其他

  • 本來想說 SubQuestionQueryEngine 寫兩篇就結束了,真的開始寫才發現這邊的內容出乎意料的多,只好又切成上中下三篇
  • 實際修改 ReAct prompt 的部分今天沒有寫到
  • 我們今天主要是探索 ReActAgent 的使用部分,也許我們後面再寫一篇用 workflow 來把 ReActAgent 給架出來
  • 就純是挖坑,有空再補

Reference:


上一篇
Day11: SubQuestionQueryEngine(上): SubQuestion 與 Workflow
下一篇
Day13: SubQuestionQueryEngine(下): combine_answer 與 update_prompts
系列文
阿,又是一個RAG14
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言