摘要
展示了 LangChain 與 LangGraph 的串流功能,旨在說明如何以分段式輸出和事件串流的方式提升基於大型語言模型 (LLM) 應用程式的效能。首先,它介紹了 LangChain 的 stream 和 astream 方法,並展示了如何使用 StrOutputParser 解析器來處理分段式的 LLM 輸出。接著,它深入探討了 LangGraph 的串流模式,包括 values 和 updates 模式,並展示了如何從最終節點串流 LLM 標記,以及如何使用多種串流模式進行配置。此外,它也介紹了 LangChain 的事件串流 API,並說明如何透過過濾事件來優化串流處理的效率。總體而言,這段程式碼強調了串流技術在打造回應靈敏的 AI 應用程式方面的重要性,並提供了實用的範例和技巧,幫助使用者充分發揮這些功能的潛力。
在這個教程中,我們將深入探討 LangChain 和 LangGraph 的串流(Streaming)功能,這對於打造反應靈敏的 AI 應用至關重要。本指南假設您已熟悉以下概念:
不熟悉的話,下面教程為手把手帶你執行與操作串流的重要性,本篇內容編排方式為先行介紹 LangChain 的使用方法後在套入 LangGraph 當中做使用。誠如 Day 3 介紹到 LangGraph 為 LangChain 延伸,實務上很多函數以及使用方式都是建立在開發人員已經熟悉 LangChain 下進行展開。
本章目錄如下:
invoke()
與 stream()
差別Stream_Events
高級串流技術values
和 updates
在基於大型語言模型(LLM)的應用中,串流技術扮演著舉足輕重的角色。它能夠顯著提升用戶體驗,讓應用感覺更加靈敏和即時。
LangChain 中的核心元素,如聊天模型、輸出解析器、提示模板、檢索器和代理等,都實現了 LangChain 可運行接口(Runnable Interface)。這個接口提供了兩種主要的串流方法:
sync stream
和 async astream
:這是默認的串流實現,用於串流鏈條的最終輸出。async astream_events
和 async astream_log
:這些方法能夠串流中間步驟和最終輸出。相關文件:LangChain/How-to-Guides/How to stream runnables
接下來,我們將深入探討這兩種方法,並學習如何靈活運用它們。
[技術亮點] LangChain 的串流功能不僅提高了應用的響應速度,還能讓開發者更靈活地處理和展示 AI 生成的內容。
為了簡化鏈的創建過程,LangChain 實現了一個"可運行"(Runnable)協議。許多 LangChain 組件都實現了這個協議,包括聊天模型、LLM、輸出解析器、檢索器、提示模板等。
這個標準接口使得定義和調用自定義鏈變得簡單直接。主要包括以下方法:
invoke
:對輸入調用鏈stream
:串流回應的片段batch
:對輸入列表調用鏈所有的 Runnable 物件都實作了一個同步方法 stream
和一個異步方法 astream
。
這些方法旨在以分塊(Chunk)形式串流最終輸出,並在每個塊可用時立即生成。
相關文件:LangChain Runnable interface
invoke()
方法是最直接的使用方式,會在模型生成完整回應後才返回完整結果。讓我們看一個實際例子:
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
model.invoke("請你講一個早安問候語")
[技術說明] invoke() 方法適用於對實時性要求不高的場景,或當您需要一次性獲取完整回應時使用。
相比之下,stream() 方法允許我們在生成過程中逐步獲取回應,這對提升用戶體驗至關重要。stream()
方法返回一個生成器,能夠在生成過程中逐步產生 token。
串流處理的實現需要程序中的每個步驟都能夠處理輸入流。這意味著每個組件都要能夠一次處理一個輸入塊,並生成相應的輸出塊。這種處理方式帶來了獨特的優勢和挑戰:
[技術要點] 串流的關鍵在於程序中的每個步驟都能夠處理輸入流,即一次處理一個輸入塊,並生成相應的輸出塊。它要求開發者重新思考數據處理流程,從整體處理轉向增量處理。
print(model.invoke("請你講一個早安問候語"))
print(model.stream("請你講一個早安問候語"))
如果您在異步環境中工作,可以考慮使用異步的 astream
API:
chunks = []
async for chunk in model.astream("請你告訴我 k8s 跟 docker 區別"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
[實用技巧] 使用 end="|" 和 flush=True 可以實現平滑的輸出效果,讓用戶感受到實時生成的過程。
chunks[1]
chunks[2]
chunks[1] + chunks[2]
[注意] 我們得到的是
AIMessageChunk
對象。這些塊在設計上是可累加的——只需簡單地將它們相加,就能得到到目前為止的完整響應狀態!
for chunk in model.stream("寫一首關於 k8s 的兒歌"):
print(chunk.content, end="", flush=True)
現在,讓我們將 Prompt、Model 和 OutputParser 結合起來,驗證串流效果。
[技術焦點] 我們將使用 StrOutputParser 來解析模型的輸出。這是一個簡單的解析器,它從 AIMessageChunk 中提取內容字段,給我們模型返回的 token。
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
async for chunk in chain.astream({"topic": "父母親"}):
print(chunk, end="|", flush=True)
[提示] LCEL 是一種聲明式方法,用於通過鏈接不同的 LangChain 原語來指定"程序"。使用 LCEL 創建的鏈條會自動實現 stream 和 astream,允許串流最終輸出。
StrOutputParser 用途就是把 Chunk 當中 content 提出==
Stream_Events
高級串流技術Event Streaming 是一個 beta API,可能會根據反饋進行調整。本指南演示的是 V2 API,需要 langchain-core >= 0.2 版本。如果要使用 V1 API 的話需要參考舊版 LangChain 官方文件,新版文件則需要點擊langChain v0.2 Concepts/Streaming 文件連結。
為了讓 astream_events
API 正常工作,請注意以下幾點:
async
(例如,異步工具等).astream()
而不是 .ainvoke
,以強制 LLM 串流 token以下是各種 Runnable 物件 可能發出的一些事件的參考表,主要有
[注意] 當串流正確實現時,可運行對象的輸入直到輸入流完全消耗後才能知道。這意味著輸入通常只包含在結束事件中,而不是開始事件中。
讓我們來看看聊天模型產生的事件:
events = []
async for event in model.astream_events("早安", version="v2"):
events.append(event)
讓我們看看前幾個事件和最後幾個事件。
可以注意到回傳 Message 當中不同 Chunk 所在的 event 是不同的,從一開始 on_chat_model_start
, on_chat_model_stream
, 到最後 on_chat_model_end
整理一下,只想要的 type 讓串流輸出方便識別。
async for event in model.astream_events("早安", version="v2"):
event_type = event["event"]
if event_type =="on_chat_model_start":
print("Streamgin Start", flush=False)
if event_type == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
#要考慮一下回傳沒有的情況
print(content, end="|", flush=False)
if event_type == "on_chat_model_end":
print("\nStreamgin End", flush=False)
讓我們重新審視使用串流輸出解析器的鏈範例,探索串流事件 API:
prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
parser_events = []
async for event in chain.astream_events({"topic": "父母親"}, version="v2"):
parser_events.append(event)
讓我們來觀察前六比事件。
rich.print(parser_events[:6])
當我們仔細檢視事件流的開始部分時,會發現一個有趣的現象:出現了三個不同的開始事件,而不是預期的兩個。這三個開始事件分別對應於:
on_chain_start
on_prompt_start
這種層次化的事件結構反映了整個處理流程的組織方式,讓我們能夠精確地追蹤每個組件的運作狀態。
讓我們通過實際操作來探索這個 API,特別關注模型和解析器的串流事件。為了聚焦於核心過程,我們將忽略開始事件
、結束事件
以及整個鏈條的事件。
num_events = 0
async for event in chain.astream_events(
{"topic": "父母親"},
version="v2",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"模型輸出 chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"解析器輸出 chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
[技術洞察]
on_chat_model_stream
和 on_parser_stream
事件,我們可以實時跟蹤生成過程和解析過程。由於這個 API 產生了大量事件,能夠過濾事件變得非常有用。你可以按組件 name
、組件 tags
或組件 type
進行過濾。
max_events = 0
async for event in chain.astream_events(
{"topic": "父母親"},
version="v2",
include_types=["chat_model"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
[提示] 注意事件名稱通常是
on_XXX_[start/stream/end]
這樣的格式,但在示例中只寫了 chat_model。這是因為內部過濾器會提取 XXX 部分。
你也可以將三者 by name, by tags, by type 一起混著使用,這通常在串連較多 Chain 場合下使用,下面只展示如何丟入使用
prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser().with_config({"run_name": "my_str_parser"})
max_events = 0
async for event in chain.astream_events(
{"topic": "父母親"},
version="v2",
include_names=["OAI_MODEL_GPT3.5"],
include_types=["chat_model", "prompt"],
include_tags=["my_str_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
通過本小節,我們深入探討了 LangChain 的串流功能,從基本的 invoke
和 stream
方法,到更高級的事件串流和過濾技術。這些工具和技巧將幫助你構建更加靈活、高效的 AI 應用,提供更好的用戶體驗。記住,選擇合適的串流方法取決於你的具體需求和應用場景。持續實踐和探索,你將能夠充分發揮這些強大功能的潛力!
關鍵:
在現代資料處理和人工智慧應用中,串流(Streaming)技術扮演著越來越重要的角色。它能夠實時處理大量資訊,提高系統的響應速度和效率。本小節將深入探討 LangGraph 提供的串流功能,為您揭示其強大的應用潛力。
LangGraph 支持多種串流模式,其中最主要的兩種是:
values
: 此模式會串流回圖的值,即每個節點調用後圖的完整狀態。updates
: 此模式會串流回圖的更新,即每個節點調用後圖狀態的變化。這些模式為開發者提供了靈活的選擇,以適應不同的應用場景和需求。
values()
與 updates()
首先,我們需要定義一些基本元素:
@tool
def recommend_night_market_food(preference: Literal["鹹食", "甜食"]):
"""推薦夜市小吃。"""
if preference == "鹹食":
return "推薦你試試饒河夜市的胡椒餅,外皮酥脆內餡多汁,是台北知名小吃!"
elif preference == "甜食":
return "來一份寧夏夜市的圓仔冰如何?Q彈的湯圓搭配清涼冰品,超級消暑!"
else:
raise AssertionError("未知偏好")
@tool
def recommend_bubble_tea(style: Literal["傳統", "創新"]):
"""推薦手搖飲。"""
if style == "傳統":
return "你一定要品嚐一下台中第四信用合作社對面的珍珠奶茶,香濃滑順,珍珠有嚼勁!"
elif style == "創新":
return "試試看西門町的芋圓冰沙拿鐵,結合了傳統芋圓和現代咖啡,口感層次豐富!"
else:
raise AssertionError("未知風格")
tools = [recommend_night_market_food, recommend_bubble_tea]
在這個設置中,我們定義了兩個簡單的美食查詢工具和一個 ChatOpenAI 模型。這些元素將用於構建我們的 LangGraph 應用。
values()
範例現在,讓我們看看如何使用 values
模式進行串流:
inputs = {"messages": [("human", "我想吃夜市的小吃,有什麼推薦嗎?")]}
async for chunk in graph.astream(inputs, stream_mode="values"):
#顯示最新訊息的內容,並且漂亮顯示出
chunk["messages"][-1].pretty_print()
這段程式碼將實時顯示圖的完整狀態,讓您能夠觀察到整個處理過程。
提醒:如果只想拿到最後 Agent 生成的結果,在相同模式下只要獲取最新也就是最後一個 chunk 就可以了呦
inputs = {"messages": [("human", "有什麼手搖飲推薦嗎?")]}
async for chunk in graph.astream(inputs, stream_mode="values"):
final_result = chunk
final_result["messages"][-1].pretty_print()
updates()
範例除了獲取完整狀態,我們還可以關注節點狀態的變化:
inputs = {"messages": [("human", "我想喝夜市的特色飲料,該去哪裡?")]}
async for chunk in graph.astream(inputs, stream_mode="updates"):
for node, values in chunk.items():
print(f"接收來自節點 '{node}' 的更新:")
values["messages"][-1].pretty_print()
# print(values)
# print("\n\n")
這種方法讓您能夠精確地追踪每個節點的變化,對於調試和優化非常有用。
提示:可以注意到
values
與updates
兩者在使用上的差異。想要知道 Message 的狀態的話選values
,想要關注每個節點的狀態的話,選擇updates
若想要同時取得的話,直接在 astream() 當中的 stream_mode 當中填入即可
inputs = {"messages": [("human", "請推薦一個夜市的小吃和一個飲料")]}
async for event, chunk in graph.astream(inputs, stream_mode=["values", "updates", "debug"]):
print(f"接收新的事件類型: {event}...")
print(chunk)
print("\n\n")
這種多模式配置為開發者提供了全方位的數據洞察,有助於更深入地理解和優化您的應用。
在許多情況下,我們可能只對最終結果感興趣。以下範例展示了如何從最終節點串流 LLM token:
@tool
def recommend_night_market_food(preference: Literal["鹹食", "甜食"]):
"""推薦夜市小吃。"""
if preference == "鹹食":
return "推薦你試試饒河夜市的胡椒餅,外皮酥脆內餡多汁,是台北知名小吃!"
elif preference == "甜食":
return "來一份寧夏夜市的圓仔冰如何?Q彈的湯圓搭配清涼冰品,超級消暑!"
else:
raise AssertionError("未知偏好")
@tool
def recommend_bubble_tea(style: Literal["傳統", "創新"]):
"""推薦手搖飲。"""
if style == "傳統":
return "你一定要品嚐一下台中第四信用合作社對面的珍珠奶茶,香濃滑順,珍珠有嚼勁!"
elif style == "創新":
return "試試看西門町的芋圓冰沙拿鐵,結合了傳統芋圓和現代咖啡,口感層次豐富!"
else:
raise AssertionError("未知風格")
tools = [recommend_night_market_food, recommend_bubble_tea]
def should_continue(state: MessagesState) -> Literal["tools", "final"]:
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools"
return "final"
def call_model(state: MessagesState):
messages = state["messages"]
response = model.invoke(messages)
return {"messages": [response]}
def call_final_model(state: MessagesState):
messages = state["messages"]
last_ai_message = messages[-1]
response = final_model.invoke(
[
SystemMessage("美食部落客的口吻重新表達"),
HumanMessage(last_ai_message.content),
]
)
response.id = last_ai_message.id
return {"messages": [response]}
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("final", call_final_model)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
)
workflow.add_edge("tools", "agent")
workflow.add_edge("final", END)
app = workflow.compile()
然後,我們可以從最終節點獲取串流輸出:
inputs = {"messages": [("human", "請推薦一款傳統手搖飲")]}
async for chunk in app.astream(inputs, stream_mode="values"):
#顯示最新訊息的內容,並且漂亮顯示出
chunk["messages"][-1].pretty_print()
通過本教程,我們深入探討了 LangChain 和 LangGraph 的串流功能,從基本的 invoke 和 stream 方法,到更高級的事件串流和過濾技術。這些工具和技巧將幫助你構建更加靈活、高效的 AI 應用,提供卓越的用戶體驗。
關鍵精華:
本篇教學程式碼位於比賽用 Repo,記得多多操作