iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0

多執行緒訊息佇列 — GUI 與 Agent 的非阻塞通信設計

前言

在現代 GUI 應用程式中,保持界面的響應性是至關重要的用戶體驗要素。當應用程式需要處理耗時的 AI 推理或檔案處理時,如何避免界面凍結成為關鍵技術挑戰。本文將深入解析 LangChain Agent 系統中多執行緒訊息佇列的設計實作,展示如何實現 GUI 與 Agent 之間的非阻塞通信機制。

一、訊息佇列架構設計

1.1 核心組件初始化

class LangChainAgentApp(customtkinter.CTk):
    def __init__(self):
        super().__init__()
        
        # 初始化變量
        self.chat_history = []
        self.message_queue = queue.Queue()
        self.agent = None
        self.system_tools = None
        
        # 檔案處理相關變量
        self.waiting_for_file_action = False
        self.last_uploaded_file_content = ""
        self.last_uploaded_filename = ""
        self.last_analysis_result = ""
        
        # 開始訊息處理迴圈
        self.check_queue()

1.2 訊息佇列設計原理

關鍵設計決策:

  • queue.Queue():Python 標準庫提供的執行緒安全佇列
  • 非阻塞通信:避免 GUI 主執行緒被長時間運算阻塞
  • 狀態管理:透過標誌變量協調不同執行緒的狀態
  • 錯誤隔離:各執行緒錯誤不會影響其他執行緒運作

1.3 訊息格式設計

訊息採用簡潔的元組格式:(speaker, message)

  • speaker:訊息來源標識("使用者"、"Agent"、"系統"、"remove_waiting")
  • message:訊息內容或特殊控制指令
  • 特殊控制"remove_waiting" 用於移除等待狀態訊息

二、核心訊息處理機制

2.1 check_queue() - 主要訊息循環

def check_queue(self):
    """檢查訊息佇列"""
    try:
        processed_count = 0
        max_process_per_cycle = 5  # 限制每次處理的最大訊息數
        
        while processed_count < max_process_per_cycle:
            speaker, message = self.message_queue.get_nowait()
            if speaker == "remove_waiting":
                self._remove_waiting_message()
            else:
                self._append_to_history(speaker, message)
            processed_count += 1
    except queue.Empty:
        pass
    except Exception as e:
        print(f"[DEBUG] 佇列處理錯誤: {e}")
    
    # 增加檢查間隔,降低CPU使用率
    self.after(200, self.check_queue)

2.2 性能優化策略

批量處理限制:

  • max_process_per_cycle = 5:避免單次處理過多訊息導致 UI 卡頓
  • get_nowait():非阻塞方式獲取訊息,避免執行緒等待
  • self.after(200):每 200 毫秒檢查一次,平衡響應性與 CPU 效率

錯誤處理機制:

  • queue.Empty 例外:正常情況,佇列為空時觸發
  • 通用例外處理:記錄錯誤但不中斷系統運作
  • 持續循環:確保訊息處理迴圈永不停止

2.3 訊息生命週期管理

def _remove_waiting_message(self):
    """移除等待訊息"""
    for i in range(len(self.chat_history) - 1, -1, -1):
        if "正在處理…" in self.chat_history[i]:
            self.chat_history.pop(i)
            self._update_history_display()
            return

等待訊息管理:

  • 即時狀態更新:顯示"正在處理…"提供用戶反饋
  • 精確移除:只移除特定的等待訊息,保留其他歷史
  • 倒序搜尋:從最新訊息開始搜尋,提高效率

三、多執行緒協調機制

3.1 主執行緒 - GUI 事件處理

def start_agent_call(self):
    """啟動 Agent 處理用戶輸入"""
    question = self.entry.get().strip()
    if question:
        self.entry.delete(0, "end")
        self._append_to_history("使用者", question)
        self.message_queue.put(("Agent", "正在處理…"))
        
        # 啟動新執行緒來呼叫 Agent
        thread = threading.Thread(target=self.get_agent_response, args=(question,))
        thread.start()
    else:
        self.message_queue.put(("系統", "請輸入一個問題。"))

主執行緒職責:

  • 用戶輸入處理:即時響應用戶操作
  • 界面更新:維護 GUI 組件狀態
  • 執行緒啟動:創建背景執行緒處理耗時任務
  • 狀態反饋:立即顯示處理狀態給用戶

3.2 Agent 執行緒 - AI 推理處理

def get_agent_response(self, question):
    """使用 Agent 處理問題"""
    if not self.agent:
        self.message_queue.put(("remove_waiting", ""))
        self.message_queue.put(("Agent", "Agent 初始化失敗,請檢查設定。"))
        return
    
    # 檢查是否正在等待檔案操作回覆
    if hasattr(self, 'waiting_for_file_action') and self.waiting_for_file_action:
        if hasattr(self, 'last_uploaded_file_content') and self.last_uploaded_file_content:
            print(f"[DEBUG] 檢測到檔案操作意圖: {question}")
            
            # 智能專家和意圖識別
            analysis_type, expert_type, expert_prompt = self._detect_user_intention(question)
            
            try:
                # 直接調用檔案分析工具,傳入專家信息
                response = self.system_tools.analyze_file_content_tool(analysis_type, expert_type, expert_prompt)
                self.message_queue.put(("remove_waiting", ""))
                self.message_queue.put(("Agent", response))
                
                # 保存分析結果
                self.last_analysis_result = response
                
                # 清除等待標記
                self.waiting_for_file_action = False
                return
                
            except Exception as e:
                self.message_queue.put(("remove_waiting", ""))
                self.message_queue.put(("系統", f"檔案分析時發生錯誤: {e}"))
                self.waiting_for_file_action = False
                return
    
    try:
        # 使用 Agent 處理問題
        response = self.agent.run(question)
        self.message_queue.put(("remove_waiting", ""))
        self.message_queue.put(("Agent", response))
    except Exception as e:
        self.message_queue.put(("remove_waiting", ""))
        self.message_queue.put(("系統", f"Agent 處理時發生錯誤: {e}"))

Agent 執行緒職責:

  • AI 推理處理:執行耗時的語言模型推理
  • 檔案分析邏輯:處理上傳檔案的智能分析
  • 狀態協調:管理檔案處理相關的狀態標誌
  • 錯誤處理:捕捉並報告處理過程中的錯誤

3.3 檔案處理執行緒 - 背景檔案操作

def upload_file(self):
    """上傳檔案功能"""
    file_path = filedialog.askopenfilename(
        title="選擇要上傳的檔案",
        filetypes=[
            ("所有支援的檔案", "*.txt;*.py;*.js;*.html;*.css;*.json;*.xml;*.csv;*.docx;*.doc;*.pdf;*.xlsx;*.xls"),
            ("Excel 檔案", "*.xlsx;*.xls"),
            ("文字檔案", "*.txt"),
            ("程式檔案", "*.py;*.js;*.html;*.css;*.json;*.xml"),
            ("Word 檔案", "*.docx;*.doc"),
            ("PDF 檔案", "*.pdf"),
            ("CSV 檔案", "*.csv"),
            ("所有檔案", "*.*")
        ]
    )
    
    if file_path:
        self.message_queue.put(("系統", f"正在上傳檔案: {Path(file_path).name}..."))
        thread = threading.Thread(target=self._upload_file_thread, args=(file_path,))
        thread.start()

def _upload_file_thread(self, file_path):
    """在背景執行緒中處理檔案上傳"""
    try:
        filename = Path(file_path).name
        file_ext = Path(file_path).suffix.lower()
        
        # 使用一般檔案上傳工具
        result = self.system_tools.upload_file_tool(file_path)
        self.message_queue.put(("系統", result))
        
        # 如果上傳成功,設置等待用戶操作的狀態
        if "上傳成功" in result:
            # 提取檔案內容並保存
            content_start = result.find("內容: ") + 4
            file_content = result[content_start:] if content_start > 3 else result
            
            # 保存檔案信息以供後續使用
            self.last_file_content = file_content
            self.last_file_name = filename
            self.waiting_for_file_action = True
            
            # 詢問用戶想要做什麼
            self.message_queue.put(("系統", f"檔案 {filename} 上傳成功!請問您想要做什麼?"))
    except Exception as e:
        self.message_queue.put(("系統", f"檔案處理時發生錯誤: {e}"))

檔案處理執行緒職責:

  • 檔案 I/O 操作:處理檔案讀取和解析
  • 內容提取:從檔案中提取可分析的內容
  • 狀態設定:設定檔案處理相關的狀態標誌
  • 用戶引導:提供後續操作的引導訊息

四、訊息顯示與更新機制

4.1 執行緒安全的訊息添加

def _append_to_history(self, speaker, message):
    """新增訊息到歷史紀錄"""
    self.chat_history.append(f"{speaker}: {message}")
    self._update_history_display()

4.2 複雜的顯示更新邏輯

def _update_history_display(self):
    """更新歷史紀錄顯示(支援markdown格式)"""
    self.history_textbox.configure(state="normal")
    self.history_textbox.delete("1.0", "end")
    
    textbox = self.history_textbox._textbox
    
    for message in self.chat_history:
        # 分析訊息格式:講者: 內容
        if ':' in message:
            speaker, content = message.split(':', 1)
            speaker = speaker.strip()
            content = content.strip()
            
            # 添加講者名稱
            start_pos = textbox.index(tk.INSERT)
            textbox.insert(tk.END, f"{speaker}: ")
            end_pos = textbox.index(tk.INSERT)
            
            # 根據講者類型設定顏色
            if speaker == "使用者":
                textbox.tag_add("speaker_user", start_pos, end_pos)
            elif speaker.startswith("Agent"):
                textbox.tag_add("speaker_agent", start_pos, end_pos)
            elif "分析" in speaker:
                textbox.tag_add("speaker_analysis", start_pos, end_pos)
            else:
                textbox.tag_add("speaker_system", start_pos, end_pos)
            
            # 處理內容的markdown格式
            try:
                formatted_parts = self._format_markdown_text(content)
                
                for text_part, tag in formatted_parts:
                    start_pos = textbox.index(tk.INSERT)
                    textbox.insert(tk.END, text_part)
                    if tag:
                        end_pos = textbox.index(tk.INSERT)
                        textbox.tag_add(tag, start_pos, end_pos)
            except Exception as e:
                # 如果格式化失敗,直接插入原始文本
                print(f"[DEBUG] 格式化失敗: {e}")
                textbox.insert(tk.END, content)
            
            textbox.insert(tk.END, "\n\n")
        else:
            # 沒有講者的訊息,直接顯示
            textbox.insert(tk.END, f"{message}\n\n")
    
    self.history_textbox.see("end")
    self.history_textbox.configure(state="disabled")

顯示更新特點:

  • 完整重繪:每次更新都重新繪製整個歷史
  • 格式化支援:支援 Markdown 格式和語法高亮
  • 色彩標識:不同發話者使用不同顏色區分
  • 自動捲動:自動捲動到最新訊息位置

五、執行緒同步與狀態管理

5.1 狀態標誌協調

# 檔案處理相關變量
self.waiting_for_file_action = False
self.last_uploaded_file_content = ""
self.last_uploaded_filename = ""
self.last_analysis_result = ""

狀態管理策略:

  • waiting_for_file_action:檔案上傳後等待用戶指令的狀態
  • last_uploaded_file_content:保存檔案內容供後續處理
  • last_analysis_result:緩存分析結果避免重複處理

5.2 執行緒生命週期管理

# 啟動新執行緒來呼叫 Agent
thread = threading.Thread(target=self.get_agent_response, args=(question,))
thread.start()

# 檔案處理執行緒
thread = threading.Thread(target=self._upload_file_thread, args=(file_path,))
thread.start()

執行緒設計原則:

  • 短生命週期:每個任務創建新執行緒,完成後自動結束
  • 無狀態設計:執行緒間不共享可變狀態
  • 訊息傳遞:透過佇列進行所有執行緒間通信

六、錯誤處理與恢復機制

6.1 多層次錯誤處理

try:
    # 使用 Agent 處理問題
    response = self.agent.run(question)
    self.message_queue.put(("remove_waiting", ""))
    self.message_queue.put(("Agent", response))
except Exception as e:
    self.message_queue.put(("remove_waiting", ""))
    self.message_queue.put(("系統", f"Agent 處理時發生錯誤: {e}"))

錯誤處理層次:

  1. 執行緒級錯誤:各執行緒內部捕捉和處理錯誤
  2. 佇列級錯誤:訊息處理迴圈中的錯誤處理
  3. 應用級錯誤:整體應用程式啟動錯誤處理

6.2 優雅降級機制

if not self.agent:
    self.message_queue.put(("remove_waiting", ""))
    self.message_queue.put(("Agent", "Agent 初始化失敗,請檢查設定。"))
    return

降級策略:

  • 功能檢查:執行前檢查依賴組件是否可用
  • 友善錯誤訊息:向用戶提供有用的錯誤說明
  • 系統穩定性:單一功能失敗不影響整體系統

七 執行緒架構圖

主執行緒 (GUI)
├── 事件處理 (用戶輸入、按鈕點擊)
├── 訊息佇列檢查 (check_queue)
├── 界面更新 (_update_history_display)
└── 執行緒創建 (threading.Thread)

Agent 執行緒
├── AI 推理處理 (self.agent.run)
├── 檔案分析邏輯 (analyze_file_content_tool)
├── 意圖識別 (_detect_user_intention)
└── 結果回傳 (message_queue.put)

檔案處理執行緒
├── 檔案 I/O 操作 (upload_file_tool)
├── 內容提取和解析
├── 狀態設定 (waiting_for_file_action)
└── 用戶提示 (message_queue.put)

訊息佇列 (queue.Queue)
├── 執行緒安全通信
├── 非阻塞訊息傳遞
├── 訊息序列化處理
└── 錯誤隔離機制

7.1資料流向分析

用戶輸入 → 主執行緒事件 → 創建背景執行緒 → AI處理 → 訊息佇列 → 界面更新 → 用戶看到結果
     ↑                                                              ↓
用戶操作 ← 界面響應 ← 格式化顯示 ← 訊息處理迴圈 ← 佇列檢查 ← 定時器觸發

上一篇
DAY 25
下一篇
DAY 27
系列文
我的 AI 助手開發28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言