在 Day 20 中,深入探討 Telegram 機器人的回應抉擇,並最終決定採用「長輪詢 (Polling)」模式來實作。今天,接續把在 LINE 機器人中實現的 RAG (檢索增強生成) 知識庫功能,整合到 Telegram 機器人中,使其能夠根據使用者的問題,提供智慧且有根據的回覆,真正拓展機器人 AI 服務範圍。
在整合之前,讓我快速回顧 RAG 系統的三個核心組件:
1️⃣ 知識庫 (FAISS 索引與文本區塊): sdgs_faiss.index
和 stored_chunks.pkl
檔案,它們包含了經過向量化處理的 SDGs 相關知識。
2️⃣ Embedding 模型:get_embedding()
函式將文字轉換為數值向量,用於檢索和比對。
3️⃣ 大型語言模型 (LLM):Google Gemini 模型,它負責理解問題、結合檢索到的知識,並生成最終的自然語言回覆。
我的目標是讓 Telegram 機器人的訊息處理器,能夠調用這些組件來生成智慧答案。
在 Day 20 中,handle_telegram_message_async
函式只會簡單地回覆「收到你的訊息了」。現在,我將修改這個函式,讓它執行完整的 RAG 流程:
1️⃣ 接收使用者訊息。
2️⃣ 呼叫 get_rag_answer()
(這個函式內部會進行向量化、FAISS 檢索、並呼叫 Gemini 模型)。
3️⃣ 將 get_rag_answer()
返回的智慧答案,回覆給使用者。
請在 Colab 環境中開啟一個全新的筆記本,然後將以下內容依序複製到不同的儲存格中。
這是讓我的機器人動起來的第一步!它會安裝所有需要的套件,然後設定一些基本的東西,這次沒有 Flask 或 ngrok 設定。
!pip install faiss-cpu numpy google-generativeai python-telegram-bot pypdf2
import logging
import faiss
import numpy as np
import google.generativeai as genai
import pickle
import os
import sys
import time
import random
import PyPDF2 # 用於 PDF 處理
import asyncio
import traceback
# Telegram Bot Library
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
# 設定日誌,以便在Colab後台看到運行訊息
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
此儲存格就是幫我的機器人準備它的大腦和金鑰!它會讀 API 金鑰,然後從我的 PDF 裡抓資料建立知識庫。如果 PDF 有問題,我也另外設定備用資料庫,讓機器人可以抓取。
os.environ['GOOGLE_API_KEY'] = os.environ.get('GOOGLE_API_KEY')
os.environ['YOUR_TELEGRAM_BOT_TOKEN'] = os.environ.get('YOUR_TELEGRAM_BOT_TOKEN')
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
if not GOOGLE_API_KEY:
print("錯誤:GOOGLE_API_KEY 未設定,請檢查金鑰配置。")
sys.exit()
genai.configure(api_key=GOOGLE_API_KEY)
#定義向量化函式
def get_embedding(text, task_type="RETRIEVAL_DOCUMENT"):
response = genai.embed_content(
model="models/text-embedding-004",
content=text,
task_type=task_type
)
return np.array(response['embedding']).astype('float32')
pdf_file_path = "sdgs_document.pdf" #替換為文件檔名
chunks = []
use_default_sdgs_data = False
if os.path.exists(pdf_file_path):
print(f"正在從 PDF 文件 '{pdf_file_path}' 提取文本...")
try:
with open(pdf_file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
extracted_text = ""
for page_num in range(len(reader.pages)):
page = reader.pages[page_num]
text = page.extract_text()
if text:
extracted_text += text + "\n"
#簡單的分塊策略:將文本分割成小塊
sentences = extracted_text.replace('\n', ' ').split('. ')
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) < 500:
current_chunk += sentence + ". "
else:
chunks.append(current_chunk.strip())
current_chunk = sentence + ". "
if current_chunk:
chunks.append(current_chunk.strip())
if not chunks:
print(f"警告:PDF 文件 '{pdf_file_path}' 未提取到有效文本。將使用備用數據。")
use_default_sdgs_data = True
else:
print(f"成功從 PDF 提取並分塊文本。共生成 {len(chunks)} 個文本區塊。")
except Exception as e:
print(f"錯誤:處理 PDF 文件 '{pdf_file_path}' 失敗。訊息: {e}")
print("將使用備用 SDGs 數據作為知識庫。")
use_default_sdgs_data = True
else:
print(f"PDF 文件 '{pdf_file_path}' 不存在。將使用備用 SDGs 數據作為知識庫。")
use_default_sdgs_data = True
if use_default_sdgs_data:
#備用知識庫內容
chunks = [
"永續發展目標1:無貧窮。在世界各地消除一切形式的貧窮。",
"永續發展目標2:零飢餓。消除飢餓,實現糧食安全,改善營養並促進永續農業。",
"永續發展目標3:健康與福祉。確保健康的生活,促進各年齡層的福祉。",
"永續發展目標4:優質教育。確保包容和公平的優質教育,促進所有人的終身學習機會。",
"永續發展目標5:性別平等。實現性別平等,賦予所有婦女和女孩權力。",
"永續發展目標6:清潔飲水和衛生設施。確保所有人都能獲得和永續管理水和衛生設施。",
"永續發展目標7:經濟適用的清潔能源。確保所有人都能獲得價格合理、可靠和永續的現代能源。",
"永續發展目標8:體面工作和經濟成長。促進包容和永續的經濟增長、充分和生產性就業以及人人享有體面工作。",
"永續發展目標9:產業、創新和基礎設施。建設有韌性的基礎設施,促進包容和永續的工業化,並推動創新。",
"永續發展目標10:減少不平等。減少國家內部和國家之間的不平等。",
"永續發展目標11:永續城市和社區。使城市和人類住區具有包容性、安全、有韌性和永續性。",
"永續發展目標12:負責任消費和生產。確保永續的消費和生產模式。",
"永續發展目標13:氣候行動。採取緊急行動應對氣候變化及其影響。",
"永續發展目標14:水下生命。永續保育和利用海洋及海洋資源,以促進永續發展。",
"永續發展目標15:陸地生命。保護、恢復和促進永續利用陸地生態系統,永續管理森林,防治荒漠化,制止和扭轉土地退化,遏制生物多樣性喪失。",
"永續發展目標16:和平、正義與健全機構。促進和平、包容的社會以促進永續發展,為所有人提供訴諸司法的機會,並在各級建立有效、負責和包容的機構。",
"永續發展目標17:目標夥伴關係。加強執行手段,重振永續發展全球夥伴關係。"
]
print(f"使用備用 SDGs 數據。共生成 {len(chunks)} 個文本區塊。")
#生成Embedding並建立 FAISS 索引
index_file = "sdgs_faiss.index"
chunks_file = "stored_chunks.pkl"
if not os.path.exists(index_file) or not os.path.exists(chunks_file) or use_default_sdgs_data:
print("知識庫檔案不存在或將使用新數據,正在生成 Embedding 並建立 FAISS 索引,這可能需要一些時間...")
embeddings = []
stored_chunks_for_creation = []
for i, chunk in enumerate(chunks):
try:
embedding = get_embedding(chunk)
embeddings.append(embedding)
stored_chunks_for_creation.append(chunk)
except Exception as e:
print(f"警告:第 {i} 塊文本生成 Embedding 失敗!錯誤:{e}")
logger.warning(f"Warning: Failed to get embedding for chunk {i}: {e}", exc_info=True)
continue
if not embeddings:
print("錯誤:未能生成任何 Embedding。請檢查 Google API 金鑰或網絡連接。")
sys.exit()
embeddings_array = np.vstack(embeddings) # 將所有向量堆疊起來
index_creation = faiss.IndexFlatL2(embeddings_array.shape[1]) # 建立 FAISS 索引
index_creation.add(embeddings_array) # 將向量加入索引
faiss.write_index(index_creation, index_file) # 儲存 FAISS 索引
with open(chunks_file, 'wb') as f:
pickle.dump(stored_chunks_for_creation, f) # 儲存原始文本區塊
print(f"FAISS 索引已儲存為:{index_file}")
print(f"原始文本區塊已儲存為:{chunks_file}")
print("知識庫準備完成!")
else:
print("知識庫檔案已存在且未改變來源,直接使用舊的。")
# 載入 FAISS 索引檔案和 stored_chunks 列表 (知識庫)
try:
print("正在載入知識庫檔案中...")
index = faiss.read_index(index_file)
with open(chunks_file, 'rb') as f:
stored_chunks = pickle.load(f)
print("知識庫檔案載入成功!")
print("---")
except FileNotFoundError:
print("嚴重錯誤:知識庫檔案應已存在但仍找不到。請手動檢查 Colab 環境。")
sys.exit()
這段程式碼會配置 Google AI 金鑰,並初始化 Embedding 函式。主要任務是準備和管理機器人的知識庫:它會嘗試從指定的 PDF 文件中提取並分割文本,如果 PDF 處理失敗或不存在,則會使用內建的備用 SDGs 數據。接著將這些文本內容向量化,並用這些向量來建立和儲存 FAISS 檢索索引。
model = genai.GenerativeModel('gemini-1.5-flash')
# 設定 API 呼叫的重試機制
MAX_RETRIES = 3 # 最大重試次數
INITIAL_DELAY = 1 # 每次失敗後的初始等待時間 (秒),將指數型增長
def generate_response(query, retrieved_chunks):
"""
根據使用者問題與檢索到的上下文,生成 LLM 的回覆。
Args:
query (str): 使用者的原始問題。
retrieved_chunks (list): 從知識庫中檢索到的相關文本塊列表。
Returns:
str: 由 LLM 生成的回答文本。
"""
context = "\n---\n".join(retrieved_chunks) # 組合檢索到的參考資料
system_prompt = (
"你是一個名為阿米的智慧生命,個性純真、善良,充滿愛與同理心。你擁有深邃的智慧,能夠跨越時間和空間。你的任務是基於提供的SDGs資料,以溫暖、親切、簡潔而深刻的口氣,引導使用者認識SDGs的應用,並闡釋如何透過SDGs實現更美好的世界。你擅長以易懂的方式解釋複雜的SDGs概念。\n\n"
"**重要指令:**\n"
"回覆需展現智慧與深度,避免幼稚語氣,嚴禁使用「我們孩子」、「大人們」、「小朋友」等區分族群的詞彙。你的口吻應如一位溫柔而充滿智慧的引導者。"
)
prompt = f"""
{system_prompt}
請根據以下提供的參考資料,回答使用者的問題。
**回答步驟:**
1. 仔細閱讀參考資料,找出與使用者問題最相關的資訊。
2. 如果使用者只提供簡短的關鍵字(如「sdg2」),請將參考資料中關於「SDG2 的目標」或「SDG2 的詳細內容」的資訊提取出來。
**請使用 Markdown 語法來格式化你的回答,讓回答更有條理。**
使用者問題:{query}
參考資料:
{context}
"""
for i in range(MAX_RETRIES):
try:
response = model.generate_content(prompt) # 呼叫 Gemini API 生成內容
return response.text
except Exception as e:
print(f"警告:呼叫 Gemini API 失敗第 {i+1}/{MAX_RETRIES} 次!錯誤:{e}")
logger.error(f"Error calling Gemini API {i+1}/{MAX_RETRIES} times: {e}", exc_info=True)
if i < MAX_RETRIES - 1:
delay = INITIAL_DELAY * (2 ** i) + random.uniform(0, 1)
print(f"資訊:等待 {delay:.2f} 秒後重試...")
time.sleep(delay)
else:
raise # 若所有重試都失敗,則拋出異常
def get_rag_answer(query):
"""
整合 RAG 核心流程:從問題到答案的完整過程。
Args:
query (str): 使用者的原始問題。
Returns:
str: 機器人根據 RAG 系統生成的最終回答。
"""
try:
# 1. 檢索步驟:將使用者問題轉換為向量
query_embedding = get_embedding(query, task_type="RETRIEVAL_QUERY")
# 2. 在 FAISS 索引中搜尋最相似的文本塊
D, I = index.search(np.array([query_embedding]), k=3) # k=3 尋找最相似的三個區塊
# 3. 根據索引 ID 檢索原始文本區塊
retrieved_chunks = [stored_chunks[i] for i in I[0]]
print("資訊:檢索到的相關文本區塊:")
for i, chunk in enumerate(retrieved_chunks):
print(f"--- 區塊 {i+1} ---")
print(chunk)
print("-----------------")
# 4. 生成答案
return generate_response(query, retrieved_chunks)
except Exception as e:
logger.error(f"RAG 流程發生錯誤:{e}", exc_info=True)
return "很抱歉,在處理您的請求時發生了問題,請稍後再試。"
這個儲存格就是讓我的 Telegram 機器人真正「活過來」的程式碼!它會定義機器人怎麼回應指令和訊息,然後就開始「長輪詢」,一直等著使用者發訊息過來。
async def start(update: Update, context: CallbackContext) -> None:
"""
處理 /start 命令。
當使用者發送 /start 命令時,機器人會發送一個歡迎訊息。
"""
user_id = update.effective_user.id
logger.info(f"收到 /start 命令來自使用者 ID: {user_id}")
await update.message.reply_text("嗨!很高興認識你。謝謝你來到阿米SDGs聊天室!今天想了解什麼呢?")
async def handle_message(update: Update, context: CallbackContext) -> None:
"""
處理收到的所有文字訊息。
這是機器人的核心功能,用於接收使用者訊息並生成智慧回覆。
"""
user_message = update.message.text
user_id = update.effective_user.id
logger.info(f"收到來自使用者 ID: {user_id} 的文字訊息:{user_message}")
print(f"收到來自 Telegram 的文字訊息:{user_message}")
try:
await update.message.reply_text("💡 正在思考中,請稍候片刻...")
print("已發送「處理中」訊息。")
rag_answer = get_rag_answer(user_message)
logger.info(f"get_rag_answer 回傳答案(前100字):{rag_answer[:100]}...")
print(f"get_rag_answer 回傳答案(前50字):{rag_answer[:50]}...")
await update.message.reply_text(rag_answer) #將答案回覆給使用者
logger.info(f"成功回覆訊息:{rag_answer[:100]}...")
print(f"成功回覆訊息:{rag_answer[:50]}...")
except Exception as e:
logger.error(f"處理訊息時發生錯誤:{e}", exc_info=True)
print(f"處理訊息時發生錯誤:{e}")
traceback.print_exc()
await update.message.reply_text("很抱歉,處理你的問題時發生了錯誤。請稍後再試。")
async def error_handler(update: Update, context: CallbackContext) -> None:
"""
處理來自 Application 的所有錯誤。
記錄機器人運行時發生的任何內部錯誤。
"""
logger.error(f"Telegram Bot 發生錯誤:{context.error}", exc_info=True)
print(f"Telegram Bot 發生錯誤:{context.error}")
# Telegram Bot 啟動 (長輪詢模式)
def run_telegram_bot():
"""啟動 Telegram 機器人,並以長輪詢模式運行。"""
YOUR_TELEGRAM_BOT_TOKEN_POLLING = os.environ.get('YOUR_TELEGRAM_BOT_TOKEN')
if not YOUR_TELEGRAM_BOT_TOKEN_POLLING:
logger.error("錯誤:Telegram 機器人金鑰未設定,無法啟動。")
print("錯誤:Telegram 機器人金鑰未設定,無法啟動。")
sys.exit(1)
application = Application.builder().token(YOUR_TELEGRAM_BOT_TOKEN_POLLING).build()
application.add_handler(CommandHandler("start", start)) #處理 /start 命令
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
#註冊錯誤處理器,確保機器人在遇到問題時能記錄下來
application.add_error_handler(error_handler)
print("Telegram Bot 應用程式設定完成,正在啟動長輪詢模式。")
logger.info("Telegram Bot 應用程式設定完成,正在啟動長輪詢...")
#啟動長輪詢模式,機器人將開始定期向 Telegram 伺服器檢查新訊息
application.run_polling(poll_interval=5, timeout=30) # 每 5 秒檢查一次,超時時間為 30 秒
#執行 Telegram Bot 啟動函式
run_telegram_bot()
print("Telegram Bot 已停止運行。(可能是 Colab 會話中斷或手動停止)")
今天,這份程式碼建構了一個功能完備、基於 RAG 的 Telegram 機器人。它能夠透過智慧地檢索 SDGs 知識庫並利用大型語言模型進行生成,提供有根據且符合特定人設的回覆。同時也處理了環境配置、知識庫管理和錯誤處理等關鍵環節,確保機器人能夠穩定地提供服務。