在現代 GUI 應用程式中,保持界面的響應性是至關重要的用戶體驗要素。當應用程式需要處理耗時的 AI 推理或檔案處理時,如何避免界面凍結成為關鍵技術挑戰。本文將深入解析 LangChain Agent 系統中多執行緒訊息佇列的設計實作,展示如何實現 GUI 與 Agent 之間的非阻塞通信機制。
class LangChainAgentApp(customtkinter.CTk):
def __init__(self):
super().__init__()
# 初始化變量
self.chat_history = []
self.message_queue = queue.Queue()
self.agent = None
self.system_tools = None
# 檔案處理相關變量
self.waiting_for_file_action = False
self.last_uploaded_file_content = ""
self.last_uploaded_filename = ""
self.last_analysis_result = ""
# 開始訊息處理迴圈
self.check_queue()
關鍵設計決策:
queue.Queue()
:Python 標準庫提供的執行緒安全佇列訊息採用簡潔的元組格式:(speaker, message)
"remove_waiting"
用於移除等待狀態訊息def check_queue(self):
"""檢查訊息佇列"""
try:
processed_count = 0
max_process_per_cycle = 5 # 限制每次處理的最大訊息數
while processed_count < max_process_per_cycle:
speaker, message = self.message_queue.get_nowait()
if speaker == "remove_waiting":
self._remove_waiting_message()
else:
self._append_to_history(speaker, message)
processed_count += 1
except queue.Empty:
pass
except Exception as e:
print(f"[DEBUG] 佇列處理錯誤: {e}")
# 增加檢查間隔,降低CPU使用率
self.after(200, self.check_queue)
批量處理限制:
錯誤處理機制:
def _remove_waiting_message(self):
"""移除等待訊息"""
for i in range(len(self.chat_history) - 1, -1, -1):
if "正在處理…" in self.chat_history[i]:
self.chat_history.pop(i)
self._update_history_display()
return
等待訊息管理:
def start_agent_call(self):
"""啟動 Agent 處理用戶輸入"""
question = self.entry.get().strip()
if question:
self.entry.delete(0, "end")
self._append_to_history("使用者", question)
self.message_queue.put(("Agent", "正在處理…"))
# 啟動新執行緒來呼叫 Agent
thread = threading.Thread(target=self.get_agent_response, args=(question,))
thread.start()
else:
self.message_queue.put(("系統", "請輸入一個問題。"))
主執行緒職責:
def get_agent_response(self, question):
"""使用 Agent 處理問題"""
if not self.agent:
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("Agent", "Agent 初始化失敗,請檢查設定。"))
return
# 檢查是否正在等待檔案操作回覆
if hasattr(self, 'waiting_for_file_action') and self.waiting_for_file_action:
if hasattr(self, 'last_uploaded_file_content') and self.last_uploaded_file_content:
print(f"[DEBUG] 檢測到檔案操作意圖: {question}")
# 智能專家和意圖識別
analysis_type, expert_type, expert_prompt = self._detect_user_intention(question)
try:
# 直接調用檔案分析工具,傳入專家信息
response = self.system_tools.analyze_file_content_tool(analysis_type, expert_type, expert_prompt)
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("Agent", response))
# 保存分析結果
self.last_analysis_result = response
# 清除等待標記
self.waiting_for_file_action = False
return
except Exception as e:
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("系統", f"檔案分析時發生錯誤: {e}"))
self.waiting_for_file_action = False
return
try:
# 使用 Agent 處理問題
response = self.agent.run(question)
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("Agent", response))
except Exception as e:
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("系統", f"Agent 處理時發生錯誤: {e}"))
Agent 執行緒職責:
def upload_file(self):
"""上傳檔案功能"""
file_path = filedialog.askopenfilename(
title="選擇要上傳的檔案",
filetypes=[
("所有支援的檔案", "*.txt;*.py;*.js;*.html;*.css;*.json;*.xml;*.csv;*.docx;*.doc;*.pdf;*.xlsx;*.xls"),
("Excel 檔案", "*.xlsx;*.xls"),
("文字檔案", "*.txt"),
("程式檔案", "*.py;*.js;*.html;*.css;*.json;*.xml"),
("Word 檔案", "*.docx;*.doc"),
("PDF 檔案", "*.pdf"),
("CSV 檔案", "*.csv"),
("所有檔案", "*.*")
]
)
if file_path:
self.message_queue.put(("系統", f"正在上傳檔案: {Path(file_path).name}..."))
thread = threading.Thread(target=self._upload_file_thread, args=(file_path,))
thread.start()
def _upload_file_thread(self, file_path):
"""在背景執行緒中處理檔案上傳"""
try:
filename = Path(file_path).name
file_ext = Path(file_path).suffix.lower()
# 使用一般檔案上傳工具
result = self.system_tools.upload_file_tool(file_path)
self.message_queue.put(("系統", result))
# 如果上傳成功,設置等待用戶操作的狀態
if "上傳成功" in result:
# 提取檔案內容並保存
content_start = result.find("內容: ") + 4
file_content = result[content_start:] if content_start > 3 else result
# 保存檔案信息以供後續使用
self.last_file_content = file_content
self.last_file_name = filename
self.waiting_for_file_action = True
# 詢問用戶想要做什麼
self.message_queue.put(("系統", f"檔案 {filename} 上傳成功!請問您想要做什麼?"))
except Exception as e:
self.message_queue.put(("系統", f"檔案處理時發生錯誤: {e}"))
檔案處理執行緒職責:
def _append_to_history(self, speaker, message):
"""新增訊息到歷史紀錄"""
self.chat_history.append(f"{speaker}: {message}")
self._update_history_display()
def _update_history_display(self):
"""更新歷史紀錄顯示(支援markdown格式)"""
self.history_textbox.configure(state="normal")
self.history_textbox.delete("1.0", "end")
textbox = self.history_textbox._textbox
for message in self.chat_history:
# 分析訊息格式:講者: 內容
if ':' in message:
speaker, content = message.split(':', 1)
speaker = speaker.strip()
content = content.strip()
# 添加講者名稱
start_pos = textbox.index(tk.INSERT)
textbox.insert(tk.END, f"{speaker}: ")
end_pos = textbox.index(tk.INSERT)
# 根據講者類型設定顏色
if speaker == "使用者":
textbox.tag_add("speaker_user", start_pos, end_pos)
elif speaker.startswith("Agent"):
textbox.tag_add("speaker_agent", start_pos, end_pos)
elif "分析" in speaker:
textbox.tag_add("speaker_analysis", start_pos, end_pos)
else:
textbox.tag_add("speaker_system", start_pos, end_pos)
# 處理內容的markdown格式
try:
formatted_parts = self._format_markdown_text(content)
for text_part, tag in formatted_parts:
start_pos = textbox.index(tk.INSERT)
textbox.insert(tk.END, text_part)
if tag:
end_pos = textbox.index(tk.INSERT)
textbox.tag_add(tag, start_pos, end_pos)
except Exception as e:
# 如果格式化失敗,直接插入原始文本
print(f"[DEBUG] 格式化失敗: {e}")
textbox.insert(tk.END, content)
textbox.insert(tk.END, "\n\n")
else:
# 沒有講者的訊息,直接顯示
textbox.insert(tk.END, f"{message}\n\n")
self.history_textbox.see("end")
self.history_textbox.configure(state="disabled")
顯示更新特點:
# 檔案處理相關變量
self.waiting_for_file_action = False
self.last_uploaded_file_content = ""
self.last_uploaded_filename = ""
self.last_analysis_result = ""
狀態管理策略:
# 啟動新執行緒來呼叫 Agent
thread = threading.Thread(target=self.get_agent_response, args=(question,))
thread.start()
# 檔案處理執行緒
thread = threading.Thread(target=self._upload_file_thread, args=(file_path,))
thread.start()
執行緒設計原則:
try:
# 使用 Agent 處理問題
response = self.agent.run(question)
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("Agent", response))
except Exception as e:
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("系統", f"Agent 處理時發生錯誤: {e}"))
錯誤處理層次:
if not self.agent:
self.message_queue.put(("remove_waiting", ""))
self.message_queue.put(("Agent", "Agent 初始化失敗,請檢查設定。"))
return
降級策略:
主執行緒 (GUI)
├── 事件處理 (用戶輸入、按鈕點擊)
├── 訊息佇列檢查 (check_queue)
├── 界面更新 (_update_history_display)
└── 執行緒創建 (threading.Thread)
Agent 執行緒
├── AI 推理處理 (self.agent.run)
├── 檔案分析邏輯 (analyze_file_content_tool)
├── 意圖識別 (_detect_user_intention)
└── 結果回傳 (message_queue.put)
檔案處理執行緒
├── 檔案 I/O 操作 (upload_file_tool)
├── 內容提取和解析
├── 狀態設定 (waiting_for_file_action)
└── 用戶提示 (message_queue.put)
訊息佇列 (queue.Queue)
├── 執行緒安全通信
├── 非阻塞訊息傳遞
├── 訊息序列化處理
└── 錯誤隔離機制
用戶輸入 → 主執行緒事件 → 創建背景執行緒 → AI處理 → 訊息佇列 → 界面更新 → 用戶看到結果
↑ ↓
用戶操作 ← 界面響應 ← 格式化顯示 ← 訊息處理迴圈 ← 佇列檢查 ← 定時器觸發