iT邦幫忙

2024 iThome 鐵人賽

DAY 5
0
生成式 AI

2024 年用 LangGraph 從零開始實現 Agentic AI System系列 第 5

【Day 5】- LangChain 與 LangGraph 串流技術深度探索

  • 分享至 

  • xImage
  •  

摘要
展示了 LangChain 與 LangGraph 的串流功能,旨在說明如何以分段式輸出和事件串流的方式提升基於大型語言模型 (LLM) 應用程式的效能。首先,它介紹了 LangChain 的 stream 和 astream 方法,並展示了如何使用 StrOutputParser 解析器來處理分段式的 LLM 輸出。接著,它深入探討了 LangGraph 的串流模式,包括 values 和 updates 模式,並展示了如何從最終節點串流 LLM 標記,以及如何使用多種串流模式進行配置。此外,它也介紹了 LangChain 的事件串流 API,並說明如何透過過濾事件來優化串流處理的效率。總體而言,這段程式碼強調了串流技術在打造回應靈敏的 AI 應用程式方面的重要性,並提供了實用的範例和技巧,幫助使用者充分發揮這些功能的潛力。

前言

img

在這個教程中,我們將深入探討 LangChain 和 LangGraph 的串流(Streaming)功能,這對於打造反應靈敏的 AI 應用至關重要。本指南假設您已熟悉以下概念:

  • 聊天模型(Chat models)
  • LangChain 表達語言(LangChain Expression Language)
  • 輸出解析器(Output parsers)

不熟悉的話,下面教程為手把手帶你執行與操作串流的重要性,本篇內容編排方式為先行介紹 LangChain 的使用方法後在套入 LangGraph 當中做使用。誠如 Day 3 介紹到 LangGraph 為 LangChain 延伸,實務上很多函數以及使用方式都是建立在開發人員已經熟悉 LangChain 下進行展開。

本章目錄如下:

  • LangChain 環節
      1. 為什麼串流如此重要?
      1. 深入了解 Chat Model 的互動機制:invoke()stream() 差別
      1. 探索 Stream_Events 高級串流技術
  • LangGraph 環節
      1. 多種串流模式介紹:valuesupdates
      1. 最終節點串流實作

===LangChain 環節===

1. 為什麼串流如此重要?

在基於大型語言模型(LLM)的應用中,串流技術扮演著舉足輕重的角色。它能夠顯著提升用戶體驗,讓應用感覺更加靈敏和即時。

LangChain 中的核心元素,如聊天模型、輸出解析器、提示模板、檢索器和代理等,都實現了 LangChain 可運行接口(Runnable Interface)。這個接口提供了兩種主要的串流方法:

  1. sync streamasync astream:這是默認的串流實現,用於串流鏈條的最終輸出。
  2. async astream_eventsasync astream_log:這些方法能夠串流中間步驟和最終輸出。

相關文件:LangChain/How-to-Guides/How to stream runnables

接下來,我們將深入探討這兩種方法,並學習如何靈活運用它們。

[技術亮點] LangChain 的串流功能不僅提高了應用的響應速度,還能讓開發者更靈活地處理和展示 AI 生成的內容。

2. 深入了解 Chat Model 的互動機制

為了簡化鏈的創建過程,LangChain 實現了一個"可運行"(Runnable)協議。許多 LangChain 組件都實現了這個協議,包括聊天模型、LLM、輸出解析器、檢索器、提示模板等。

這個標準接口使得定義和調用自定義鏈變得簡單直接。主要包括以下方法:

  • invoke:對輸入調用鏈
  • stream:串流回應的片段
  • batch:對輸入列表調用鏈

所有的 Runnable 物件都實作了一個同步方法 stream 和一個異步方法 astream

這些方法旨在以分塊(Chunk)形式串流最終輸出,並在每個塊可用時立即生成。

相關文件:LangChain Runnable interface

2.1 invoke:同步調用方法解析

invoke() 方法是最直接的使用方式,會在模型生成完整回應後才返回完整結果。讓我們看一個實際例子:

from langchain_openai import ChatOpenAI
model = ChatOpenAI()
model.invoke("請你講一個早安問候語")

img

[技術說明] invoke() 方法適用於對實時性要求不高的場景,或當您需要一次性獲取完整回應時使用。

2.2 Stream:串流方法的魅力

相比之下,stream() 方法允許我們在生成過程中逐步獲取回應,這對提升用戶體驗至關重要。stream() 方法返回一個生成器,能夠在生成過程中逐步產生 token。

串流處理的實現需要程序中的每個步驟都能夠處理輸入流。這意味著每個組件都要能夠一次處理一個輸入塊,並生成相應的輸出塊。這種處理方式帶來了獨特的優勢和挑戰:

  1. 連續性處理:串流允許系統在接收到完整輸入之前就開始處理和輸出,大大提高了響應速度和效率。
  2. 複雜度變化:串流處理的複雜度可能差異很大:
  • 簡單任務:如直接輸出 LLM 生成的 token。
  • 複雜任務:如在 JSON 結果完全生成之前就開始串流其部分內容。
  1. 靈活性與挑戰:這種方法提供了極大的靈活性,但也帶來了編程上的挑戰,特別是在處理結構化數據或需要整體上下文的任務時。
  2. 最佳起點:要開始探索串流技術,最好的切入點就是 LLM 應用中最核心的組件——LLM 本身。這是因為 LLM 天生就適合token級別的串流輸出。

[技術要點] 串流的關鍵在於程序中的每個步驟都能夠處理輸入流,即一次處理一個輸入塊,並生成相應的輸出塊。它要求開發者重新思考數據處理流程,從整體處理轉向增量處理。

print(model.invoke("請你講一個早安問候語"))
print(model.stream("請你講一個早安問候語"))

img

如果您在異步環境中工作,可以考慮使用異步的 astream API:

chunks = []
async for chunk in model.astream("請你告訴我 k8s 跟 docker 區別"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

img

[實用技巧] 使用 end="|" 和 flush=True 可以實現平滑的輸出效果,讓用戶感受到實時生成的過程。

chunks[1]
chunks[2]
chunks[1] + chunks[2]

img

[注意] 我們得到的是 AIMessageChunk 對象。這些塊在設計上是可累加的——只需簡單地將它們相加,就能得到到目前為止的完整響應狀態!

for chunk in model.stream("寫一首關於 k8s 的兒歌"):
    print(chunk.content, end="", flush=True)

img

2.3 結合 LangChain 表達語言實現持續串流回應

現在,讓我們將 Prompt、Model 和 OutputParser 結合起來,驗證串流效果。

[技術焦點] 我們將使用 StrOutputParser 來解析模型的輸出。這是一個簡單的解析器,它從 AIMessageChunk 中提取內容字段,給我們模型返回的 token。

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "父母親"}):
    print(chunk, end="|", flush=True)

img

[提示] LCEL 是一種聲明式方法,用於通過鏈接不同的 LangChain 原語來指定"程序"。使用 LCEL 創建的鏈條會自動實現 stream 和 astream,允許串流最終輸出。

StrOutputParser 用途就是把 Chunk 當中 content 提出==

3. 探索 Stream_Events 高級串流技術

Event Streaming 是一個 beta API,可能會根據反饋進行調整。本指南演示的是 V2 API,需要 langchain-core >= 0.2 版本。如果要使用 V1 API 的話需要參考舊版 LangChain 官方文件,新版文件則需要點擊langChain v0.2 Concepts/Streaming 文件連結

為了讓 astream_events API 正常工作,請注意以下幾點:

  • 盡可能在整個代碼中使用 async(例如,異步工具等)
  • 在定義自定義函數 runnalbes 時傳播 callbacks
  • 當不使用 LCEL 而使用可運行對象時,確保調用 LLM 的 .astream() 而不是 .ainvoke,以強制 LLM 串流 token

3.1 Stream_Events 事件參考指南

以下是各種 Runnable 物件 可能發出的一些事件的參考表,主要有

img

[注意] 當串流正確實現時,可運行對象的輸入直到輸入流完全消耗後才能知道。這意味著輸入通常只包含在結束事件中,而不是開始事件中。

3.2 透過 Chat Model 了解運作機制

讓我們來看看聊天模型產生的事件:

events = []
async for event in model.astream_events("早安", version="v2"):
    events.append(event)

讓我們看看前幾個事件和最後幾個事件。

img

可以注意到回傳 Message 當中不同 Chunk 所在的 event 是不同的,從一開始 on_chat_model_start, on_chat_model_stream, 到最後 on_chat_model_end

整理一下,只想要的 type 讓串流輸出方便識別。

async for event in model.astream_events("早安", version="v2"):
    event_type = event["event"]
    if event_type =="on_chat_model_start":
        print("Streamgin Start", flush=False)
    if event_type == "on_chat_model_stream":
        content = event["data"]["chunk"].content
        if content:
          #要考慮一下回傳沒有的情況
          print(content, end="|", flush=False)
    if event_type == "on_chat_model_end":
        print("\nStreamgin End", flush=False)

3.3 結合 Parser 整理串流輸出格式

讓我們重新審視使用串流輸出解析器的鏈範例,探索串流事件 API:

prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

parser_events = []
async for event in chain.astream_events({"topic": "父母親"}, version="v2"):
    parser_events.append(event)

讓我們來觀察前六比事件。

rich.print(parser_events[:6])

img

當我們仔細檢視事件流的開始部分時,會發現一個有趣的現象:出現了三個不同的開始事件,而不是預期的兩個。這三個開始事件分別對應於:

  1. 整個鏈(模型 + 解析器)的啟動 on_chain_start
  2. 模型的初始化
  3. 解析器的準備階段 on_prompt_start

這種層次化的事件結構反映了整個處理流程的組織方式,讓我們能夠精確地追蹤每個組件的運作狀態。

讓我們通過實際操作來探索這個 API,特別關注模型和解析器的串流事件。為了聚焦於核心過程,我們將忽略開始事件結束事件以及整個鏈條的事件。

num_events = 0

async for event in chain.astream_events(
    {"topic": "父母親"},
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"模型輸出 chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"解析器輸出 chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

img

[技術洞察]

  1. 事件分層:這種分層結構使我們能夠分別監控和分析鏈、模型和解析器的行為。
  2. 實時追蹤:通過觀察 on_chat_model_streamon_parser_stream 事件,我們可以實時跟蹤生成過程和解析過程。
  3. 性能優化機會:這種細粒度的事件流為性能分析和優化提供了寶貴的數據源。

3.4 串流事件過濾技巧

由於這個 API 產生了大量事件,能夠過濾事件變得非常有用。你可以按組件 name、組件 tags 或組件 type 進行過濾。

3.4.1 按類型(type)過濾

max_events = 0
async for event in chain.astream_events(
    {"topic": "父母親"},
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

img

[提示] 注意事件名稱通常是 on_XXX_[start/stream/end] 這樣的格式,但在示例中只寫了 chat_model。這是因為內部過濾器會提取 XXX 部分。

3.4.2 綜合過濾:按類型(type)、名稱(name)和標籤(tags)

你也可以將三者 by name, by tags, by type 一起混著使用,這通常在串連較多 Chain 場合下使用,下面只展示如何丟入使用

prompt = ChatPromptTemplate.from_template("講一個早安問候語關於 {topic}")
parser = StrOutputParser().with_config({"run_name": "my_str_parser"})

max_events = 0
async for event in chain.astream_events(
    {"topic": "父母親"},
    version="v2",
    include_names=["OAI_MODEL_GPT3.5"],
    include_types=["chat_model", "prompt"],
    include_tags=["my_str_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break

img

LangChain 串流功能結論

通過本小節,我們深入探討了 LangChain 的串流功能,從基本的 invokestream 方法,到更高級的事件串流和過濾技術。這些工具和技巧將幫助你構建更加靈活、高效的 AI 應用,提供更好的用戶體驗。記住,選擇合適的串流方法取決於你的具體需求和應用場景。持續實踐和探索,你將能夠充分發揮這些強大功能的潛力!

關鍵:

  • 串流技術對於提升AI應用的響應性至關重要
  • LangChain提供了多種串流方法,適用於不同場景
  • 事件串流API允許更細粒度的控制和監控
  • 合理使用過濾技術可以優化串流事件的處理效率

===LangGraph 環節===

LangGraph 串流應用:探索高效的資料處理方案

在現代資料處理和人工智慧應用中,串流(Streaming)技術扮演著越來越重要的角色。它能夠實時處理大量資訊,提高系統的響應速度和效率。本小節將深入探討 LangGraph 提供的串流功能,為您揭示其強大的應用潛力。

1. LangGraph 串流模式概覽

LangGraph 支持多種串流模式,其中最主要的兩種是:

  1. values: 此模式會串流回圖的值,即每個節點調用後圖的完整狀態。
  2. updates: 此模式會串流回圖的更新,即每個節點調用後圖狀態的變化。

這些模式為開發者提供了靈活的選擇,以適應不同的應用場景和需求。

1.1 探索 LangGraph 串流模式的兩種方法: values()updates()

1.1.1 準備工作:定義工具和模型

首先,我們需要定義一些基本元素:

@tool
def recommend_night_market_food(preference: Literal["鹹食", "甜食"]):
    """推薦夜市小吃。"""
    if preference == "鹹食":
        return "推薦你試試饒河夜市的胡椒餅,外皮酥脆內餡多汁,是台北知名小吃!"
    elif preference == "甜食":
        return "來一份寧夏夜市的圓仔冰如何?Q彈的湯圓搭配清涼冰品,超級消暑!"
    else:
        raise AssertionError("未知偏好")

@tool
def recommend_bubble_tea(style: Literal["傳統", "創新"]):
    """推薦手搖飲。"""
    if style == "傳統":
        return "你一定要品嚐一下台中第四信用合作社對面的珍珠奶茶,香濃滑順,珍珠有嚼勁!"
    elif style == "創新":
        return "試試看西門町的芋圓冰沙拿鐵,結合了傳統芋圓和現代咖啡,口感層次豐富!"
    else:
        raise AssertionError("未知風格")

tools = [recommend_night_market_food, recommend_bubble_tea]

在這個設置中,我們定義了兩個簡單的美食查詢工具和一個 ChatOpenAI 模型。這些元素將用於構建我們的 LangGraph 應用。

1.2 法ㄧ、values()範例

現在,讓我們看看如何使用 values 模式進行串流:


inputs = {"messages": [("human", "我想吃夜市的小吃,有什麼推薦嗎?")]}
async for chunk in graph.astream(inputs, stream_mode="values"):
    #顯示最新訊息的內容,並且漂亮顯示出
    chunk["messages"][-1].pretty_print()

img

這段程式碼將實時顯示圖的完整狀態,讓您能夠觀察到整個處理過程。

提醒:如果只想拿到最後 Agent 生成的結果,在相同模式下只要獲取最新也就是最後一個 chunk 就可以了呦

inputs = {"messages": [("human", "有什麼手搖飲推薦嗎?")]}
async for chunk in graph.astream(inputs, stream_mode="values"):
    final_result = chunk

final_result["messages"][-1].pretty_print()

img

1.3 法二、updates()範例

除了獲取完整狀態,我們還可以關注節點狀態的變化:

inputs = {"messages": [("human", "我想喝夜市的特色飲料,該去哪裡?")]}
async for chunk in graph.astream(inputs, stream_mode="updates"):
    for node, values in chunk.items():
        print(f"接收來自節點 '{node}' 的更新:")
        values["messages"][-1].pretty_print()
        # print(values)
        # print("\n\n")

img

這種方法讓您能夠精確地追踪每個節點的變化,對於調試和優化非常有用。

提示:可以注意到 valuesupdates 兩者在使用上的差異。想要知道 Message 的狀態的話選 values,想要關注每個節點的狀態的話,選擇 updates

1.4 多模式串流配置示例

若想要同時取得的話,直接在 astream() 當中的 stream_mode 當中填入即可

inputs = {"messages": [("human", "請推薦一個夜市的小吃和一個飲料")]}
async for event, chunk in graph.astream(inputs, stream_mode=["values", "updates", "debug"]):
    print(f"接收新的事件類型: {event}...")
    print(chunk)
    print("\n\n")

img

這種多模式配置為開發者提供了全方位的數據洞察,有助於更深入地理解和優化您的應用。

2. 從最終節點串流

在許多情況下,我們可能只對最終結果感興趣。以下範例展示了如何從最終節點串流 LLM token:

2.1 定義模型和工具

@tool
def recommend_night_market_food(preference: Literal["鹹食", "甜食"]):
    """推薦夜市小吃。"""
    if preference == "鹹食":
        return "推薦你試試饒河夜市的胡椒餅,外皮酥脆內餡多汁,是台北知名小吃!"
    elif preference == "甜食":
        return "來一份寧夏夜市的圓仔冰如何?Q彈的湯圓搭配清涼冰品,超級消暑!"
    else:
        raise AssertionError("未知偏好")
@tool
def recommend_bubble_tea(style: Literal["傳統", "創新"]):
    """推薦手搖飲。"""
    if style == "傳統":
        return "你一定要品嚐一下台中第四信用合作社對面的珍珠奶茶,香濃滑順,珍珠有嚼勁!"
    elif style == "創新":
        return "試試看西門町的芋圓冰沙拿鐵,結合了傳統芋圓和現代咖啡,口感層次豐富!"
    else:
        raise AssertionError("未知風格")

tools = [recommend_night_market_food, recommend_bubble_tea]

2.2 定義節點函數

def should_continue(state: MessagesState) -> Literal["tools", "final"]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return "final"
def call_model(state: MessagesState):
    messages = state["messages"]
    response = model.invoke(messages)
    return {"messages": [response]}
def call_final_model(state: MessagesState):
    messages = state["messages"]
    last_ai_message = messages[-1]
    response = final_model.invoke(
        [
            SystemMessage("美食部落客的口吻重新表達"),
            HumanMessage(last_ai_message.content),
        ]
    )
    response.id = last_ai_message.id
    return {"messages": [response]}

2.3 編譯圖之後,查看結構

workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_node("final", call_final_model)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
)
workflow.add_edge("tools", "agent")
workflow.add_edge("final", END)
app = workflow.compile()

img

2.4 獲取串流輸出

然後,我們可以從最終節點獲取串流輸出:

inputs = {"messages": [("human", "請推薦一款傳統手搖飲")]}
async for chunk in app.astream(inputs, stream_mode="values"):
    #顯示最新訊息的內容,並且漂亮顯示出
    chunk["messages"][-1].pretty_print()

img

結論

通過本教程,我們深入探討了 LangChain 和 LangGraph 的串流功能,從基本的 invoke 和 stream 方法,到更高級的事件串流和過濾技術。這些工具和技巧將幫助你構建更加靈活、高效的 AI 應用,提供卓越的用戶體驗。

關鍵精華:

  • 串流技術對於提升 AI 應用的響應性至關重要
  • LangChain 和 LangGraph 提供了多種串流方法,適用於不同場景
  • 事件串流 API 允許更細粒度的控制和監控
  • 合理使用過濾技術可以優化串流事件的處理效率
  • 選擇適當的串流模式(values 或 updates)可以根據需求獲取不同層面的信息

本篇教學程式碼位於比賽用 Repo,記得多多操作


上一篇
【Day 4】- LangGraph 入門教程:節點、邊、狀態
下一篇
【Day 6】- LangChain 與 LangGraph 工具實戰探討:AI 模型的程式呼叫能力
系列文
2024 年用 LangGraph 從零開始實現 Agentic AI System31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言