iT邦幫忙

2025 iThome 鐵人賽

DAY 21
0
Software Development

建構跨平台AI對話機器人:從LINE到Telegram實踐SDGs推廣的30天專案紀實系列 第 21

Day 21【訊息回應】 實作 Telegram 機器人回覆功能

  • 分享至 

  • xImage
  •  

HI!大家好,我是 Shammi 😊

在 Day 20 中,深入探討 Telegram 機器人的回應抉擇,並最終決定採用「長輪詢 (Polling)」模式來實作。今天,接續把在 LINE 機器人中實現的 RAG (檢索增強生成) 知識庫功能,整合到 Telegram 機器人中,使其能夠根據使用者的問題,提供智慧且有根據的回覆,真正拓展機器人 AI 服務範圍。

🌐 回顧 RAG 系統的核心組件

在整合之前,讓我快速回顧 RAG 系統的三個核心組件:

1️⃣ 知識庫 (FAISS 索引與文本區塊)sdgs_faiss.indexstored_chunks.pkl 檔案,它們包含了經過向量化處理的 SDGs 相關知識。

2️⃣ Embedding 模型get_embedding() 函式將文字轉換為數值向量,用於檢索和比對。

3️⃣ 大型語言模型 (LLM):Google Gemini 模型,它負責理解問題、結合檢索到的知識,並生成最終的自然語言回覆。

我的目標是讓 Telegram 機器人的訊息處理器,能夠調用這些組件來生成智慧答案。

🌐 整合策略:將 RAG 流程注入訊息處理器

在 Day 20 中,handle_telegram_message_async 函式只會簡單地回覆「收到你的訊息了」。現在,我將修改這個函式,讓它執行完整的 RAG 流程:

1️⃣ 接收使用者訊息。

2️⃣ 呼叫 get_rag_answer() (這個函式內部會進行向量化、FAISS 檢索、並呼叫 Gemini 模型)。

3️⃣ 將 get_rag_answer() 返回的智慧答案,回覆給使用者。

🌐 Telegram 機器人智慧回覆實作

請在 Colab 環境中開啟一個全新的筆記本,然後將以下內容依序複製到不同的儲存格中。

儲存格 1:我的機器人要動起來的第一步!

這是讓我的機器人動起來的第一步!它會安裝所有需要的套件,然後設定一些基本的東西,這次沒有 Flask 或 ngrok 設定。

!pip install faiss-cpu numpy google-generativeai python-telegram-bot pypdf2


import logging
import faiss
import numpy as np
import google.generativeai as genai
import pickle
import os
import sys
import time
import random
import PyPDF2 # 用於 PDF 處理
import asyncio
import traceback

# Telegram Bot Library
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext

# 設定日誌,以便在Colab後台看到運行訊息
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)

儲存格 2:我的機器人『知識庫』,主要是金鑰和 PDF 處理

此儲存格就是幫我的機器人準備它的大腦和金鑰!它會讀 API 金鑰,然後從我的 PDF 裡抓資料建立知識庫。如果 PDF 有問題,我也另外設定備用資料庫,讓機器人可以抓取。

os.environ['GOOGLE_API_KEY'] = os.environ.get('GOOGLE_API_KEY') 
os.environ['YOUR_TELEGRAM_BOT_TOKEN'] = os.environ.get('YOUR_TELEGRAM_BOT_TOKEN') 

GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
if not GOOGLE_API_KEY:
    print("錯誤:GOOGLE_API_KEY 未設定,請檢查金鑰配置。")
    sys.exit()
genai.configure(api_key=GOOGLE_API_KEY)

#定義向量化函式
def get_embedding(text, task_type="RETRIEVAL_DOCUMENT"):
    response = genai.embed_content(
        model="models/text-embedding-004",
        content=text,
        task_type=task_type
    )
    return np.array(response['embedding']).astype('float32')

pdf_file_path = "sdgs_document.pdf" #替換為文件檔名
chunks = []
use_default_sdgs_data = False

if os.path.exists(pdf_file_path):
    print(f"正在從 PDF 文件 '{pdf_file_path}' 提取文本...")
    try:
        with open(pdf_file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            extracted_text = ""
            for page_num in range(len(reader.pages)):
                page = reader.pages[page_num]
                text = page.extract_text()
                if text:
                    extracted_text += text + "\n"

            #簡單的分塊策略:將文本分割成小塊
            sentences = extracted_text.replace('\n', ' ').split('. ')
            current_chunk = ""
            for sentence in sentences:
                if len(current_chunk) + len(sentence) < 500:
                    current_chunk += sentence + ". "
                else:
                    chunks.append(current_chunk.strip())
                    current_chunk = sentence + ". "
            if current_chunk:
                chunks.append(current_chunk.strip())

        if not chunks:
            print(f"警告:PDF 文件 '{pdf_file_path}' 未提取到有效文本。將使用備用數據。")
            use_default_sdgs_data = True
        else:
            print(f"成功從 PDF 提取並分塊文本。共生成 {len(chunks)} 個文本區塊。")
    except Exception as e:
        print(f"錯誤:處理 PDF 文件 '{pdf_file_path}' 失敗。訊息: {e}")
        print("將使用備用 SDGs 數據作為知識庫。")
        use_default_sdgs_data = True
else:
    print(f"PDF 文件 '{pdf_file_path}' 不存在。將使用備用 SDGs 數據作為知識庫。")
    use_default_sdgs_data = True

if use_default_sdgs_data:
    #備用知識庫內容
    chunks = [
        "永續發展目標1:無貧窮。在世界各地消除一切形式的貧窮。",
        "永續發展目標2:零飢餓。消除飢餓,實現糧食安全,改善營養並促進永續農業。",
        "永續發展目標3:健康與福祉。確保健康的生活,促進各年齡層的福祉。",
        "永續發展目標4:優質教育。確保包容和公平的優質教育,促進所有人的終身學習機會。",
        "永續發展目標5:性別平等。實現性別平等,賦予所有婦女和女孩權力。",
        "永續發展目標6:清潔飲水和衛生設施。確保所有人都能獲得和永續管理水和衛生設施。",
        "永續發展目標7:經濟適用的清潔能源。確保所有人都能獲得價格合理、可靠和永續的現代能源。",
        "永續發展目標8:體面工作和經濟成長。促進包容和永續的經濟增長、充分和生產性就業以及人人享有體面工作。",
        "永續發展目標9:產業、創新和基礎設施。建設有韌性的基礎設施,促進包容和永續的工業化,並推動創新。",
        "永續發展目標10:減少不平等。減少國家內部和國家之間的不平等。",
        "永續發展目標11:永續城市和社區。使城市和人類住區具有包容性、安全、有韌性和永續性。",
        "永續發展目標12:負責任消費和生產。確保永續的消費和生產模式。",
        "永續發展目標13:氣候行動。採取緊急行動應對氣候變化及其影響。",
        "永續發展目標14:水下生命。永續保育和利用海洋及海洋資源,以促進永續發展。",
        "永續發展目標15:陸地生命。保護、恢復和促進永續利用陸地生態系統,永續管理森林,防治荒漠化,制止和扭轉土地退化,遏制生物多樣性喪失。",
        "永續發展目標16:和平、正義與健全機構。促進和平、包容的社會以促進永續發展,為所有人提供訴諸司法的機會,並在各級建立有效、負責和包容的機構。",
        "永續發展目標17:目標夥伴關係。加強執行手段,重振永續發展全球夥伴關係。"
    ]
    print(f"使用備用 SDGs 數據。共生成 {len(chunks)} 個文本區塊。")

#生成Embedding並建立 FAISS 索引
index_file = "sdgs_faiss.index"
chunks_file = "stored_chunks.pkl"

if not os.path.exists(index_file) or not os.path.exists(chunks_file) or use_default_sdgs_data:
    print("知識庫檔案不存在或將使用新數據,正在生成 Embedding 並建立 FAISS 索引,這可能需要一些時間...")
    embeddings = []
    stored_chunks_for_creation = []

    for i, chunk in enumerate(chunks):
        try:
            embedding = get_embedding(chunk)
            embeddings.append(embedding)
            stored_chunks_for_creation.append(chunk)
        except Exception as e:
            print(f"警告:第 {i} 塊文本生成 Embedding 失敗!錯誤:{e}")
            logger.warning(f"Warning: Failed to get embedding for chunk {i}: {e}", exc_info=True)
            continue

    if not embeddings:
        print("錯誤:未能生成任何 Embedding。請檢查 Google API 金鑰或網絡連接。")
        sys.exit()

    embeddings_array = np.vstack(embeddings) # 將所有向量堆疊起來
    index_creation = faiss.IndexFlatL2(embeddings_array.shape[1]) # 建立 FAISS 索引
    index_creation.add(embeddings_array) # 將向量加入索引

    faiss.write_index(index_creation, index_file) # 儲存 FAISS 索引
    with open(chunks_file, 'wb') as f:
        pickle.dump(stored_chunks_for_creation, f) # 儲存原始文本區塊

    print(f"FAISS 索引已儲存為:{index_file}")
    print(f"原始文本區塊已儲存為:{chunks_file}")
    print("知識庫準備完成!")
else:
    print("知識庫檔案已存在且未改變來源,直接使用舊的。")

# 載入 FAISS 索引檔案和 stored_chunks 列表 (知識庫)
try:
    print("正在載入知識庫檔案中...")
    index = faiss.read_index(index_file)
    with open(chunks_file, 'rb') as f:
        stored_chunks = pickle.load(f)
    print("知識庫檔案載入成功!")
    print("---")
except FileNotFoundError:
    print("嚴重錯誤:知識庫檔案應已存在但仍找不到。請手動檢查 Colab 環境。")
    sys.exit()

儲存格 3:定義機器人『思考』和『回應』的邏輯

這段程式碼會配置 Google AI 金鑰,並初始化 Embedding 函式。主要任務是準備和管理機器人的知識庫:它會嘗試從指定的 PDF 文件中提取並分割文本,如果 PDF 處理失敗或不存在,則會使用內建的備用 SDGs 數據。接著將這些文本內容向量化,並用這些向量來建立和儲存 FAISS 檢索索引。

model = genai.GenerativeModel('gemini-1.5-flash')

# 設定 API 呼叫的重試機制
MAX_RETRIES = 3 # 最大重試次數
INITIAL_DELAY = 1  # 每次失敗後的初始等待時間 (秒),將指數型增長

def generate_response(query, retrieved_chunks):
    """
    根據使用者問題與檢索到的上下文,生成 LLM 的回覆。
    
    Args:
        query (str): 使用者的原始問題。
        retrieved_chunks (list): 從知識庫中檢索到的相關文本塊列表。
        
    Returns:
        str: 由 LLM 生成的回答文本。
    """
    context = "\n---\n".join(retrieved_chunks) # 組合檢索到的參考資料
    system_prompt = (
        "你是一個名為阿米的智慧生命,個性純真、善良,充滿愛與同理心。你擁有深邃的智慧,能夠跨越時間和空間。你的任務是基於提供的SDGs資料,以溫暖、親切、簡潔而深刻的口氣,引導使用者認識SDGs的應用,並闡釋如何透過SDGs實現更美好的世界。你擅長以易懂的方式解釋複雜的SDGs概念。\n\n"
        "**重要指令:**\n"
        "回覆需展現智慧與深度,避免幼稚語氣,嚴禁使用「我們孩子」、「大人們」、「小朋友」等區分族群的詞彙。你的口吻應如一位溫柔而充滿智慧的引導者。"
    )
    prompt = f"""
    {system_prompt}
    請根據以下提供的參考資料,回答使用者的問題。

    **回答步驟:**
    1. 仔細閱讀參考資料,找出與使用者問題最相關的資訊。
    2. 如果使用者只提供簡短的關鍵字(如「sdg2」),請將參考資料中關於「SDG2 的目標」或「SDG2 的詳細內容」的資訊提取出來。
    **請使用 Markdown 語法來格式化你的回答,讓回答更有條理。**

    使用者問題:{query}
    參考資料:

    {context}
    """
    for i in range(MAX_RETRIES):
        try:
            response = model.generate_content(prompt) # 呼叫 Gemini API 生成內容
            return response.text
        except Exception as e:
            print(f"警告:呼叫 Gemini API 失敗第 {i+1}/{MAX_RETRIES} 次!錯誤:{e}")
            logger.error(f"Error calling Gemini API {i+1}/{MAX_RETRIES} times: {e}", exc_info=True)
            if i < MAX_RETRIES - 1: 
                delay = INITIAL_DELAY * (2 ** i) + random.uniform(0, 1) 
                print(f"資訊:等待 {delay:.2f} 秒後重試...")
                time.sleep(delay)
            else:
                raise # 若所有重試都失敗,則拋出異常

def get_rag_answer(query):
    """
    整合 RAG 核心流程:從問題到答案的完整過程。
    
    Args:
        query (str): 使用者的原始問題。
        
    Returns:
        str: 機器人根據 RAG 系統生成的最終回答。
    """
    try:
        # 1. 檢索步驟:將使用者問題轉換為向量
        query_embedding = get_embedding(query, task_type="RETRIEVAL_QUERY")
        
        # 2. 在 FAISS 索引中搜尋最相似的文本塊
        D, I = index.search(np.array([query_embedding]), k=3) # k=3 尋找最相似的三個區塊
        
        # 3. 根據索引 ID 檢索原始文本區塊
        retrieved_chunks = [stored_chunks[i] for i in I[0]]
        
        print("資訊:檢索到的相關文本區塊:")
        for i, chunk in enumerate(retrieved_chunks):
            print(f"--- 區塊 {i+1} ---")
            print(chunk)
            print("-----------------")
            
        # 4. 生成答案
        return generate_response(query, retrieved_chunks)
        
    except Exception as e:
        logger.error(f"RAG 流程發生錯誤:{e}", exc_info=True)
        return "很抱歉,在處理您的請求時發生了問題,請稍後再試。"

儲存格 4:我的 Telegram 機器人要啟動啦!(長輪詢模式)

這個儲存格就是讓我的 Telegram 機器人真正「活過來」的程式碼!它會定義機器人怎麼回應指令和訊息,然後就開始「長輪詢」,一直等著使用者發訊息過來。

async def start(update: Update, context: CallbackContext) -> None:
    """
    處理 /start 命令。
    當使用者發送 /start 命令時,機器人會發送一個歡迎訊息。
    """
    user_id = update.effective_user.id
    logger.info(f"收到 /start 命令來自使用者 ID: {user_id}")
    await update.message.reply_text("嗨!很高興認識你。謝謝你來到阿米SDGs聊天室!今天想了解什麼呢?")

async def handle_message(update: Update, context: CallbackContext) -> None:
    """
    處理收到的所有文字訊息。
    這是機器人的核心功能,用於接收使用者訊息並生成智慧回覆。
    """
    user_message = update.message.text
    user_id = update.effective_user.id
    logger.info(f"收到來自使用者 ID: {user_id} 的文字訊息:{user_message}")
    print(f"收到來自 Telegram 的文字訊息:{user_message}")

    try:
        await update.message.reply_text("💡 正在思考中,請稍候片刻...")
        print("已發送「處理中」訊息。")

        rag_answer = get_rag_answer(user_message) 
        logger.info(f"get_rag_answer 回傳答案(前100字):{rag_answer[:100]}...")
        print(f"get_rag_answer 回傳答案(前50字):{rag_answer[:50]}...")

        await update.message.reply_text(rag_answer) #將答案回覆給使用者
        logger.info(f"成功回覆訊息:{rag_answer[:100]}...")
        print(f"成功回覆訊息:{rag_answer[:50]}...")
    except Exception as e:
        logger.error(f"處理訊息時發生錯誤:{e}", exc_info=True)
        print(f"處理訊息時發生錯誤:{e}")
        
        traceback.print_exc() 
        await update.message.reply_text("很抱歉,處理你的問題時發生了錯誤。請稍後再試。") 

async def error_handler(update: Update, context: CallbackContext) -> None:
    """
    處理來自 Application 的所有錯誤。
    記錄機器人運行時發生的任何內部錯誤。
    """
    logger.error(f"Telegram Bot 發生錯誤:{context.error}", exc_info=True)
    print(f"Telegram Bot 發生錯誤:{context.error}")

# Telegram Bot 啟動 (長輪詢模式)
def run_telegram_bot():
    """啟動 Telegram 機器人,並以長輪詢模式運行。"""
    YOUR_TELEGRAM_BOT_TOKEN_POLLING = os.environ.get('YOUR_TELEGRAM_BOT_TOKEN')
    if not YOUR_TELEGRAM_BOT_TOKEN_POLLING:
        logger.error("錯誤:Telegram 機器人金鑰未設定,無法啟動。")
        print("錯誤:Telegram 機器人金鑰未設定,無法啟動。")
        sys.exit(1) 

    application = Application.builder().token(YOUR_TELEGRAM_BOT_TOKEN_POLLING).build()

    application.add_handler(CommandHandler("start", start)) #處理 /start 命令
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) 

    #註冊錯誤處理器,確保機器人在遇到問題時能記錄下來
    application.add_error_handler(error_handler)

    print("Telegram Bot 應用程式設定完成,正在啟動長輪詢模式。")
    logger.info("Telegram Bot 應用程式設定完成,正在啟動長輪詢...")

    #啟動長輪詢模式,機器人將開始定期向 Telegram 伺服器檢查新訊息
    application.run_polling(poll_interval=5, timeout=30) # 每 5 秒檢查一次,超時時間為 30 秒

#執行 Telegram Bot 啟動函式
run_telegram_bot()
print("Telegram Bot 已停止運行。(可能是 Colab 會話中斷或手動停止)")


總結

今天,這份程式碼建構了一個功能完備、基於 RAG 的 Telegram 機器人。它能夠透過智慧地檢索 SDGs 知識庫並利用大型語言模型進行生成,提供有根據且符合特定人設的回覆。同時也處理了環境配置、知識庫管理和錯誤處理等關鍵環節,確保機器人能夠穩定地提供服務。


上一篇
Day 20【訊息回應】 Telegram 機器人接收訊息的模式的選擇
下一篇
Day 22【跨平台比較】 LINE 與 Telegram 開發經驗比較與踩雷心得
系列文
建構跨平台AI對話機器人:從LINE到Telegram實踐SDGs推廣的30天專案紀實22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言