import and setup
# pip install llama-index-utils-workflow
# needs OPENAI_API_KEY in the environment (loaded from .env below)
from dotenv import find_dotenv, load_dotenv

_ = load_dotenv(find_dotenv())

from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.utils.workflow import draw_all_possible_flows
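The setup cell above expects a .env file in the project root; a minimal sketch (the value is a placeholder, not a real key):

# .env (sketch)
OPENAI_API_KEY=sk-...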
pip install llama-index-utils-workflow is needed so we can visualize the workflow we define. The OPENAI_API_KEY in .env only actually gets used in part 2.

workflow overview
We split this into three steps:

query
sub_question
combine_answers

That's it, just three steps. If the step names feel confusing, the code below should make them concrete.
class QueryEvent(Event):
    # one sub-question fanned out by the query step
    question: str


class AnswerEvent(Event):
    # the (question, answer) pair produced for one sub-question
    question: str
    answer: str
class SubQuestionQueryEngine(Workflow):
    @step
    async def query(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # fake sub-question generation for now (a real LLM call comes in part 2)
        FAKE_NUM_SUB_QUESTION = 5
        sub_questions = [f"q{i}" for i in range(FAKE_NUM_SUB_QUESTION)]
        # remember how many answers combine_answers should wait for
        await ctx.store.set("num_question", len(sub_questions))
        for q in sub_questions:
            # self.send_event(QueryEvent(question=q))  # raised an error for me; see note below
            print(f"send: {q}")
            ctx.send_event(QueryEvent(question=q))
        return None

    @step
    async def sub_question(self, ctx: Context, ev: QueryEvent) -> AnswerEvent:
        print(f"Sub-question is {ev.question}")
        # fake answer for now
        answer = f"answer of: {ev.question}"
        return AnswerEvent(question=ev.question, answer=answer)

    @step
    async def combine_answers(
        self, ctx: Context, ev: AnswerEvent
    ) -> StopEvent | None:
        num_question = await ctx.store.get("num_question")
        # returns None until all num_question AnswerEvents have arrived
        result = ctx.collect_events(ev, [AnswerEvent] * num_question)
        if result is None:
            print("combine_answers output None")
            return None
        # do something with all {num_question} answers together
        print(result)
        return StopEvent(result="Done")
ctx.send_event is what fans the sub_question events out. The original example used self.send_event; I'm not sure whether it's a version difference, but ctx.send_event is what runs without errors for me. The answers themselves are faked as 'answer of: {question}'. In combine_answers we read num_question back and use ctx.collect_events(ev, [AnswerEvent] * num_question) to gather the answers; until every event has arrived it returns None, and we simply ignore those calls.

draw_all_possible_flows(
    SubQuestionQueryEngine, filename="sub_question_query_engine.html"
)
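Nothing here actually runs concurrently, since the fake answers are synchronous. If sub_question did real async work (an LLM call per sub-question, say), the "Concurrent execution of workflows" doc listed at the end notes that a step can process several events at once via num_workers. A minimal sketch, not code from this post:

class SubQuestionQueryEngine(Workflow):
    ...
    @step(num_workers=4)  # let up to 4 QueryEvents be handled concurrently
    async def sub_question(self, ctx: Context, ev: QueryEvent) -> AnswerEvent:
        ...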
w = SubQuestionQueryEngine(timeout=10, verbose=False)
result = await w.run()
print('---')
print(result)
send: q0
send: q1
send: q2
send: q3
send: q4
Sub-question is q0
Sub-question is q1
Sub-question is q2
Sub-question is q3
Sub-question is q4
combine_answers output None
combine_answers output None
combine_answers output None
combine_answers output None
[AnswerEvent(question='q0', answer='answer of: q0'), AnswerEvent(question='q1', answer='answer of: q1'), AnswerEvent(question='q2', answer='answer of: q2'), AnswerEvent(question='q3', answer='answer of: q3'), AnswerEvent(question='q4', answer='answer of: q4')]
---
Done
So combine_answers gets invoked num_question times, but only the last invocation actually does the work.

First up, our sample exam question:
qset = {
    "id": "113-1-1-med-surg",
    "year": "113",
    "time": "1",
    "qid": "1",
    "discipline": "內外科護理學",
    "ans": "C",
    "question": "有關多發性硬化症之診斷檢查,下列何者錯誤?",
    "options": {
        "A": "腦脊髓液分析可發現IgG抗體上升",
        "B": "視覺誘發電位可觀察到受損的神經在傳導過程出現延遲和中斷",
        "C": "超音波檢查可發現中樞神經系統髓鞘脫失",
        "D": "核磁共振影像可用來確認多發性硬化症之斑塊"
    },
    "discipline_slug": "med-surg"
}
Next, our prompt:
question = f"以下為單選題,請從各選項中選擇最符合題意的答案:\n題幹: {qset['question']}.\n選項: \nA: {qset['options']['A']}; B: {qset['options']['B']}; C: {qset['options']['C']}; D: {qset['options']['D']}."
print(question)

prompt = f"""給定一個使用者的問題,請輸出一系列相關的子問題,讓所有子問題的答案合在一起後,
能完整回答該問題。

請只用純 JSON 格式回應,不要包含任何 Markdown,例如:
{{
  "sub_questions": [
    "舊金山的人口是多少?",
    "舊金山的預算是多少?",
    "舊金山的 GDP 是多少?"
  ]
}}

以下是使用者的問題:{question}
"""
code:
import json

from llama_index.llms.openai import OpenAI

llm = OpenAI(
    model="gpt-5-mini",
    temperature=0,
    json_mode=True,  # ask the API for a JSON-formatted response
)

response = llm.complete(prompt)
print(json.loads(response.text))
result:

First off, it did indeed return JSON to us; once we're sure about the format, we can put stricter constraints on it (see the sketch below).
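One way to tighten it is llama-index's structured prediction with a Pydantic model; a minimal sketch (the SubQuestions class and its field name are my own, not from this post):

from pydantic import BaseModel, Field
from llama_index.core.prompts import PromptTemplate

class SubQuestions(BaseModel):
    # hypothetical schema; output is validated instead of hand-parsed JSON
    sub_questions: list[str] = Field(description="list of sub-questions")

parsed = llm.structured_predict(
    SubQuestions,
    PromptTemplate("{prompt_text}"),
    prompt_text=prompt,
)
print(parsed.sub_questions)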
As for the quality of the answers: it looks like we'll still need to find a more reliable way to generate the sub-questions.
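Whatever generator we land on, wiring it in is straightforward: swap the fake list in the query step for the LLM call. A sketch along these lines, assuming the JSON parses cleanly:

# inside SubQuestionQueryEngine.query (sketch), replacing the fake sub_questions
response = llm.complete(prompt)
sub_questions = json.loads(response.text)["sub_questions"]
await ctx.store.set("num_question", len(sub_questions))
for q in sub_questions:
    ctx.send_event(QueryEvent(question=q))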
References:
Sub Question Query Engine as a workflow
Concurrent execution of workflows