在上一篇文章中,我們建立了 LangChain Agent GUI 應用程式的基礎架構。本文將深入探討進階功能的實作,包括檔案上傳處理系統、智能專家系統、訊息顯示與格式化,以及完整的程式架構總覽。
def upload_file(self):
"""上傳檔案功能"""
file_path = filedialog.askopenfilename(
title="選擇要上傳的檔案",
filetypes=[
("所有支援的檔案", "*.txt;*.py;*.js;*.html;*.css;*.json;*.xml;*.csv;*.docx;*.doc;*.pdf;*.xlsx;*.xls"),
("Excel 檔案", "*.xlsx;*.xls"),
("文字檔案", "*.txt"),
("程式檔案", "*.py;*.js;*.html;*.css;*.json;*.xml"),
("Word 檔案", "*.docx;*.doc"),
("PDF 檔案", "*.pdf"),
("CSV 檔案", "*.csv"),
("所有檔案", "*.*")
]
)
if file_path:
self.message_queue.put(("系統", f"正在上傳檔案: {Path(file_path).name}..."))
thread = threading.Thread(target=self._upload_file_thread, args=(file_path,))
thread.start()
def _upload_file_thread(self, file_path):
"""在背景執行緒中處理檔案上傳"""
try:
filename = Path(file_path).name
file_ext = Path(file_path).suffix.lower()
# 使用一般檔案上傳工具
result = self.system_tools.upload_file_tool(file_path)
self.message_queue.put(("系統", result))
# 如果上傳成功,設置等待用戶操作的狀態
if "上傳成功" in result:
# 提取檔案內容並保存
content_start = result.find("內容: ") + 4
file_content = result[content_start:] if content_start > 3 else result
# 保存檔案信息以供後續使用
self.last_file_content = file_content
self.last_file_name = filename
self.waiting_for_file_action = True
# 詢問用戶想要做什麼
self.message_queue.put(("系統", f"檔案 {filename} 上傳成功!請問您想要做什麼?"))
except Exception as e:
self.message_queue.put(("系統", f"檔案處理時發生錯誤: {e}"))
程式支援多種檔案格式:
程式具備智能意圖識別功能,透過 _detect_user_intention()
函數根據用戶問題自動選擇適合的專家類型:
def _detect_user_intention(self, question):
"""使用AI智能檢測用戶意圖並返回對應的專家類型"""
# 使用AI來分析用戶意圖
intention_prompt = f"""你是一位意圖識別專家,請根據用戶的輸入文字來判別其意圖,並確定最適合的專家類型。
用戶輸入:"{question}"
請分析用戶的需求並從以下專家類型中選擇最適合的一個:
[專家類型列表]
特別注意:
- 如果用戶說「解答案」、「解題」、「求解」、「計算」等,應該選擇「數學專家」
- 如果用戶要求分析數學相關內容,應該選擇「數學專家」
請只回答專家類型的名稱(如:數學專家),不要包含其他解釋。"""
try:
# 使用gemini模型來分析意圖
intention_response = self.gemini_llm._call(intention_prompt)
expert_type = intention_response.strip()
# 根據專家類型返回相應的分析類型和prompt
expert_mapping = {
"數學專家": {
"analysis_type": "解題",
"expert_prompt": "數學專家專用提示詞"
},
"文學專家": {
"analysis_type": "心得",
"expert_prompt": "文學專家專用提示詞"
}
# ... 其他專家類型
}
return expert_mapping.get(expert_type, default_expert)
except Exception as e:
print(f"[ERROR] 意圖識別出錯: {e}")
return default_expert
def _append_to_history(self, speaker, message):
"""新增訊息到歷史紀錄"""
self.chat_history.append(f"{speaker}: {message}")
self._update_history_display()
def _update_history_display(self):
"""更新歷史紀錄顯示(支援markdown格式)"""
self.history_textbox.configure(state="normal")
self.history_textbox.delete("1.0", "end")
textbox = self.history_textbox._textbox
for message in self.chat_history:
# 分析訊息格式:講者: 內容
if ':' in message:
speaker, content = message.split(':', 1)
speaker = speaker.strip()
content = content.strip()
# 添加講者名稱
start_pos = textbox.index(tk.INSERT)
textbox.insert(tk.END, f"{speaker}: ")
end_pos = textbox.index(tk.INSERT)
# 根據講者類型設定顏色
if speaker == "使用者":
textbox.tag_add("speaker_user", start_pos, end_pos)
elif speaker.startswith("Agent"):
textbox.tag_add("speaker_agent", start_pos, end_pos)
elif "分析" in speaker:
textbox.tag_add("speaker_analysis", start_pos, end_pos)
else:
textbox.tag_add("speaker_system", start_pos, end_pos)
# 處理內容的markdown格式
try:
formatted_parts = self._format_markdown_text(content)
for text_part, tag in formatted_parts:
start_pos = textbox.index(tk.INSERT)
textbox.insert(tk.END, text_part)
if tag:
end_pos = textbox.index(tk.INSERT)
textbox.tag_add(tag, start_pos, end_pos)
except Exception as e:
# 如果格式化失敗,直接插入原始文本
print(f"[DEBUG] 格式化失敗: {e}")
textbox.insert(tk.END, content)
textbox.insert(tk.END, "\n\n")
else:
# 沒有講者的訊息,直接顯示
textbox.insert(tk.END, f"{message}\n\n")
self.history_textbox.see("end")
self.history_textbox.configure(state="disabled")
def _remove_waiting_message(self):
"""移除等待訊息"""
for i in range(len(self.chat_history) - 1, -1, -1):
if "正在處理…" in self.chat_history[i]:
self.chat_history.pop(i)
self._update_history_display()
return
def _format_markdown_text(self, text):
"""將markdown文字轉換為格式化文字並返回標籤資訊"""
formatted_parts = []
# 首先處理LaTeX數學公式
text = self._convert_latex_to_text(text)
# 處理代碼塊
if '```' in text:
parts = text.split('```')
for i, part in enumerate(parts):
if i % 2 == 0: # 非代碼塊部分
if part.strip():
formatted_parts.extend(self._process_text_lines(part))
else: # 代碼塊部分
if part.strip():
formatted_parts.append((part.strip() + '\n', 'code_block'))
else:
formatted_parts.extend(self._process_text_lines(text))
return formatted_parts
系統能夠將 LaTeX 數學公式轉換為 Unicode 符號,支援:
支援常見的 Markdown 格式:
**文字**
或 __文字__
*文字*
或 _文字_
`程式碼`
``` 程式碼 ```
if __name__ == "__main__":
try:
customtkinter.set_appearance_mode("dark")
app = LangChainAgentApp()
app.mainloop()
except Exception as e:
print(f"應用程式啟動錯誤: {e}")
input("按Enter鍵結束...")
LangChainAgentApp(customtkinter.CTk)
├── __init__() # 初始化和設定
├── _initialize_langchain() # 初始化LangChain組件
├── _create_gui() # 建立GUI界面
├── _configure_text_tags() # 配置文字標籤
├── start_agent_call() # 啟動Agent處理
├── get_agent_response() # Agent回應處理
├── _detect_user_intention() # 智能意圖識別
├── clear_history() # 清除對話歷史
├── check_queue() # 檢查訊息佇列
├── upload_file() # 檔案上傳選擇
├── _upload_file_thread() # 背景檔案處理
├── _append_to_history() # 添加訊息到歷史
├── _update_history_display() # 更新顯示
├── _remove_waiting_message() # 移除等待訊息
├── _format_markdown_text() # Markdown格式化
├── _convert_latex_to_text() # LaTeX轉換
└── _process_text_lines() # 文字行處理