2025 iThome 鐵人賽 DAY 20
Yesterday we finished the smart memory manager, giving the AI the ability to remember and recall conversation history. In real-world applications, however, an AI assistant also needs to query external knowledge bases, such as company FAQ documents, product manuals, and policy handbooks.

Imagine scenarios like these:

  • A customer-service AI needs to look up the latest return and exchange policy
  • A technical-support AI has to find the troubleshooting steps for a specific product
  • An HR assistant needs to answer questions about the rules in the employee handbook

Today we will add document knowledge integration to the memory system, so the AI not only has memory but can also "look things up in the reference documents"!

Extending the System Architecture

New Module Layout

📁 ai_memory_system/
├── config.py               # 🔧 System configuration module
├── utils.py                # 🛠 Text processing utilities
├── memory_manager.py       # 🧠 Core memory management module
├── chatbot.py              # 🤖 AI conversation integration module
├── advanced_features.py    # ⚡ Advanced features module
├── document_processor.py   # 📄 Document preprocessing module (new)
├── faq_manager.py          # 📚 FAQ management module (new)
├── knowledge_integrator.py # 🔗 Knowledge integration module (new)
└── main.py                 # 🚀 Main program module

Overall Data Flow

User question → Memory search + FAQ search → Knowledge integration → Response generation
      ↓               ↓              ↓                  ↓                      ↓
     NLP      Personal memory  Document store     Smart fusion        Personalized answer
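
To make this flow concrete, here is a minimal sketch of how the pieces are assumed to fit together. The `FAQManager.search()` call matches the module built below; `search_memories()` and `generate_response()` are illustrative method names for the earlier modules, not their actual APIs.

# Minimal sketch of the data flow above. FAQManager.search() matches the
# module below; search_memories() and generate_response() are assumed,
# illustrative method names for the earlier memory and chatbot modules.

def answer(user_id: str, question: str, memory_manager, faq_manager, chatbot) -> str:
    # 1. Search this user's personal memories
    memories = memory_manager.search_memories(user_id, question)

    # 2. Search the FAQ / document knowledge base
    faq_hits = faq_manager.search(question, max_results=3)

    # 3. Fuse both sources into a single context block
    context = "\n".join(
        [m["content"] for m in memories] + [hit["content"] for hit in faq_hits]
    )

    # 4. Generate a personalized answer grounded in that context
    return chatbot.generate_response(question, context=context)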

Module 1: Document Preprocessing Module (document_processor.py)

Document preprocessing is the foundation of knowledge integration: we need to convert documents in various formats into searchable, structured data.

Core Design

import os
import re
import hashlib
from typing import List, Dict, Any, Tuple
from datetime import datetime
import logging

# Install the required packages first
# pip install python-docx pypdf2 python-pptx openpyxl

import docx
import PyPDF2
from pptx import Presentation
import openpyxl

class DocumentProcessor:
    """Document preprocessor - converts documents in various formats into a searchable structure"""
    
    def __init__(self):
        self.supported_formats = {
            '.txt': self._process_txt,
            '.pdf': self._process_pdf,
            '.docx': self._process_docx,
            '.pptx': self._process_pptx,
            '.xlsx': self._process_xlsx
        }
        
        # Text chunking parameters
        self.chunk_size = 500   # Maximum characters per chunk
        self.overlap_size = 50  # Overlap between consecutive chunks
        
        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def process_document(self, file_path: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Process a single document and return structured data.
        
        Returns:
            {
                'doc_id': unique document ID,
                'title': document title,
                'content': full text content,
                'chunks': list of text chunks,
                'metadata': metadata,
                'processed_at': processing timestamp
            }
        """
        
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        
        # Get the file extension
        file_ext = os.path.splitext(file_path)[1].lower()
        
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_ext}")
        
        try:
            # Extract the document content
            content = self.supported_formats[file_ext](file_path)
            
            # Generate a document ID
            doc_id = self._generate_doc_id(file_path, content)
            
            # Split into chunks
            chunks = self._split_into_chunks(content)
            
            # Assemble the result
            result = {
                'doc_id': doc_id,
                'title': os.path.basename(file_path),
                'file_path': file_path,
                'content': content,
                'chunks': chunks,
                'metadata': metadata or {},
                'processed_at': datetime.now().isoformat()
            }
            
            self.logger.info(f"Processed document: {file_path}, generated {len(chunks)} text chunks")
            return result
            
        except Exception as e:
            self.logger.error(f"Failed to process document {file_path}: {str(e)}")
            raise
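
Before looking at the format-specific handlers, this is how the processor is meant to be used once they are in place (the file path and metadata below are placeholders):

# Example usage of the preprocessor (path and metadata are placeholders).
processor = DocumentProcessor()
doc = processor.process_document(
    "./docs/employee_handbook.docx",
    metadata={"department": "HR"}
)

print(doc['doc_id'], doc['title'])
print(f"{len(doc['chunks'])} chunks; first chunk preview:")
print(doc['chunks'][0]['content'][:200])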

Format-Specific Handlers

    def _process_txt(self, file_path: str) -> str:
        """Process a plain text file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except UnicodeDecodeError:
            # Fall back to another encoding
            with open(file_path, 'r', encoding='big5') as file:
                return file.read()
    
    def _process_pdf(self, file_path: str) -> str:
        """Process a PDF file"""
        content = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                
                for page_num in range(len(pdf_reader.pages)):
                    page = pdf_reader.pages[page_num]
                    content += page.extract_text() + "\n"
                    
            return content.strip()
        except Exception as e:
            self.logger.warning(f"PDF processing warning {file_path}: {str(e)}")
            return ""
    
    def _process_docx(self, file_path: str) -> str:
        """Process a Word document"""
        try:
            doc = docx.Document(file_path)
            content = ""
            
            for paragraph in doc.paragraphs:
                content += paragraph.text + "\n"
            
            # Process tables
            for table in doc.tables:
                for row in table.rows:
                    row_data = [cell.text.strip() for cell in row.cells]
                    content += " | ".join(row_data) + "\n"
            
            return content.strip()
        except Exception as e:
            self.logger.warning(f"Word document processing warning {file_path}: {str(e)}")
            return ""
    
    def _process_pptx(self, file_path: str) -> str:
        """Process a PowerPoint file"""
        try:
            prs = Presentation(file_path)
            content = ""
            
            for slide_num, slide in enumerate(prs.slides, 1):
                content += f"\n--- Slide {slide_num} ---\n"
                
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        content += shape.text + "\n"
            
            return content.strip()
        except Exception as e:
            self.logger.warning(f"PowerPoint processing warning {file_path}: {str(e)}")
            return ""
    
    def _process_xlsx(self, file_path: str) -> str:
        """Process an Excel file"""
        try:
            workbook = openpyxl.load_workbook(file_path)
            content = ""
            
            for sheet_name in workbook.sheetnames:
                worksheet = workbook[sheet_name]
                content += f"\n--- Worksheet: {sheet_name} ---\n"
                
                for row in worksheet.iter_rows(values_only=True):
                    row_data = [str(cell) if cell is not None else "" for cell in row]
                    if any(row_data):  # Skip empty rows
                        content += " | ".join(row_data) + "\n"
            
            return content.strip()
        except Exception as e:
            self.logger.warning(f"Excel processing warning {file_path}: {str(e)}")
            return ""
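
Because `supported_formats` simply maps a file extension to a handler method, new formats can be added without touching `process_document`. A minimal sketch, assuming Markdown files are acceptable to index as plain text (the `.md` entry is not part of the original module):

# Assumption: Markdown can be indexed as plain text, so we reuse _process_txt.
processor = DocumentProcessor()
processor.supported_formats['.md'] = processor._process_txt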

Smart Text Chunking

    def _split_into_chunks(self, content: str) -> List[Dict[str, Any]]:
        """
        Split long text into chunks intelligently, preserving semantic integrity.
        
        Chunking strategy:
        1. Split by paragraphs first
        2. Keep a reasonable overlap between chunks
        3. Avoid splitting in the middle of a sentence
        """
        
        if len(content) <= self.chunk_size:
            return [{
                'chunk_id': 0,
                'content': content.strip(),
                'char_start': 0,
                'char_end': len(content)
            }]
        
        chunks = []
        paragraphs = content.split('\n\n')  # Split on blank lines (double newline)
        current_chunk = ""
        current_start = 0
        chunk_id = 0
        
        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue
            
            # Check whether adding this paragraph would exceed the chunk size
            test_chunk = current_chunk + "\n\n" + paragraph if current_chunk else paragraph
            
            if len(test_chunk) <= self.chunk_size:
                current_chunk = test_chunk
            else:
                # Save the current chunk
                if current_chunk:
                    chunks.append({
                        'chunk_id': chunk_id,
                        'content': current_chunk.strip(),
                        'char_start': current_start,
                        'char_end': current_start + len(current_chunk)
                    })
                    chunk_id += 1
                
                # Carry the overlap into the next chunk
                if len(current_chunk) > self.overlap_size:
                    overlap_text = current_chunk[-self.overlap_size:]
                    current_start = current_start + len(current_chunk) - self.overlap_size
                    current_chunk = overlap_text + "\n\n" + paragraph
                else:
                    current_start = current_start + len(current_chunk)
                    current_chunk = paragraph
        
        # Save the final chunk
        if current_chunk:
            chunks.append({
                'chunk_id': chunk_id,
                'content': current_chunk.strip(),
                'char_start': current_start,
                'char_end': current_start + len(current_chunk)
            })
        
        return chunks
    
    def _generate_doc_id(self, file_path: str, content: str) -> str:
        """Generate a unique document ID"""
        # Hash the file path together with a slice of the content
        combined = f"{file_path}:{len(content)}:{content[:100]}"
        return hashlib.md5(combined.encode()).hexdigest()
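
A quick sanity check of the chunking parameters; the sample text below is made up purely for illustration:

# Illustrative check of chunk_size / overlap_size (sample text is made up).
processor = DocumentProcessor()
sample = "\n\n".join(f"Paragraph {i}: " + "content " * 40 for i in range(5))

for chunk in processor._split_into_chunks(sample):
    # Each chunk stays at or below chunk_size (500 chars), and each chunk after
    # the first starts with roughly overlap_size (50 chars) carried over.
    print(chunk['chunk_id'], chunk['char_start'], chunk['char_end'], len(chunk['content']))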

Batch Processing

    def process_directory(self, directory_path: str, 
                          file_patterns: List[str] = None) -> Dict[str, Any]:
        """
        Batch-process the documents in a directory.
        
        Args:
            directory_path: path to the directory
            file_patterns: filename pattern filters (e.g. ['*FAQ*', '*manual*'])
        """
        
        if not os.path.exists(directory_path):
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        
        processed_docs = []
        failed_files = []
        
        # Walk the directory tree
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file)[1].lower()
                
                # Skip unsupported formats
                if file_ext not in self.supported_formats:
                    continue
                
                # Apply filename pattern filters
                if file_patterns and not self._match_patterns(file, file_patterns):
                    continue
                
                try:
                    # Record the relative path as metadata
                    relative_path = os.path.relpath(file_path, directory_path)
                    metadata = {
                        'relative_path': relative_path,
                        'directory': os.path.dirname(relative_path),
                        'file_size': os.path.getsize(file_path)
                    }
                    
                    doc_data = self.process_document(file_path, metadata)
                    processed_docs.append(doc_data)
                    
                except Exception as e:
                    failed_files.append({'file': file_path, 'error': str(e)})
                    self.logger.error(f"Failed to process: {file_path} - {str(e)}")
        
        self.logger.info(f"Batch processing complete: {len(processed_docs)} files succeeded, {len(failed_files)} failed")
        
        return {
            'processed_docs': processed_docs,
            'failed_files': failed_files,
            'summary': {
                'total_processed': len(processed_docs),
                'total_failed': len(failed_files),
                'total_chunks': sum(len(doc['chunks']) for doc in processed_docs)
            }
        }
    
    def _match_patterns(self, filename: str, patterns: List[str]) -> bool:
        """Check whether the filename matches any of the patterns"""
        import fnmatch
        return any(fnmatch.fnmatch(filename.lower(), pattern.lower()) for pattern in patterns)
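
Running the preprocessor over a whole folder might then look like this (the `./company_docs` path and patterns are placeholders, and the printed summary is illustrative):

# Batch run over a hypothetical folder of company documents.
processor = DocumentProcessor()
result = processor.process_directory("./company_docs", file_patterns=["*FAQ*", "*manual*"])

print(result['summary'])              # e.g. {'total_processed': 3, 'total_failed': 0, 'total_chunks': 42}
for failure in result['failed_files']:
    print("failed:", failure['file'], failure['error'])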

Module 2: FAQ Management Module (faq_manager.py)

The FAQ management module is responsible for building the document index, running semantic search, and keeping the knowledge base up to date.

Core Architecture

import sqlite3
import json
import time
import numpy as np
from datetime import datetime
from typing import List, Dict, Any, Tuple
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import jieba

from document_processor import DocumentProcessor
from config import MemoryConfig

class FAQManager:
    """FAQ document manager - the brain of the knowledge base"""
    
    def __init__(self, db_path: str = "faq_knowledge.db"):
        self.db_path = db_path
        self.document_processor = DocumentProcessor()
        
        # Initialize the TF-IDF vectorizer
        self.vectorizer = TfidfVectorizer(
            max_features=MemoryConfig.TFIDF_CONFIG['max_features'],
            ngram_range=MemoryConfig.TFIDF_CONFIG['ngram_range'],
            stop_words=MemoryConfig.TFIDF_CONFIG['stop_words']
        )
        
        # Cache for vector data
        self._vector_cache = {}
        self._last_vector_update = None
        
        # Initialize the database
        self._init_database()
    
    def _init_database(self):
        """Initialize the FAQ database schema"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Documents table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS documents (
            doc_id TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            file_path TEXT,
            content TEXT,
            metadata TEXT,
            processed_at TEXT,
            last_updated TEXT
        )
        ''')
        
        # Document chunks table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS document_chunks (
            chunk_id TEXT PRIMARY KEY,
            doc_id TEXT,
            chunk_index INTEGER,
            content TEXT,
            char_start INTEGER,
            char_end INTEGER,
            vector_data TEXT,
            FOREIGN KEY (doc_id) REFERENCES documents (doc_id)
        )
        ''')
        
        # Search logs table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS search_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            query TEXT,
            results_count INTEGER,
            search_time REAL,
            timestamp TEXT
        )
        ''')
        
        conn.commit()
        conn.close()
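
The `document_chunks` table points back to `documents` through `doc_id`, so a quick join shows how many chunks each document contributed. This inspection snippet is not part of the module, just a way to peek at the schema:

import sqlite3

# Inspection only: count chunks per document in the FAQ database.
conn = sqlite3.connect("faq_knowledge.db")
rows = conn.execute('''
    SELECT d.title, COUNT(c.chunk_id) AS chunk_count
    FROM documents d
    LEFT JOIN document_chunks c ON c.doc_id = d.doc_id
    GROUP BY d.doc_id
''').fetchall()
conn.close()

for title, chunk_count in rows:
    print(title, chunk_count)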

Building the Document Index

    def add_document(self, file_path: str, metadata: Dict[str, Any] = None) -> bool:
        """Add a single document to the knowledge base"""
        try:
            # Process the document
            doc_data = self.document_processor.process_document(file_path, metadata)
            
            # Save it to the database
            self._save_document_to_db(doc_data)
            
            # Update the vector index
            self._update_vector_index()
            
            return True
            
        except Exception as e:
            print(f"Failed to add document {file_path}: {str(e)}")
            return False
    
    def add_documents_from_directory(self, directory_path: str, 
                                     file_patterns: List[str] = None) -> Dict[str, Any]:
        """Batch-add the documents in a directory"""
        result = self.document_processor.process_directory(directory_path, file_patterns)
        
        successful_docs = 0
        failed_docs = []
        
        for doc_data in result['processed_docs']:
            try:
                self._save_document_to_db(doc_data)
                successful_docs += 1
            except Exception as e:
                failed_docs.append({
                    'file': doc_data['file_path'],
                    'error': str(e)
                })
        
        # Update the vector index
        if successful_docs > 0:
            self._update_vector_index()
        
        return {
            'total_processed': len(result['processed_docs']),
            'successful': successful_docs,
            'failed': len(failed_docs),
            'failed_docs': failed_docs,
            'total_chunks': result['summary']['total_chunks']
        }
    
    def _save_document_to_db(self, doc_data: Dict[str, Any]):
        """Save document data to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # Save the main document record
            cursor.execute('''
            INSERT OR REPLACE INTO documents 
            (doc_id, title, file_path, content, metadata, processed_at, last_updated)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                doc_data['doc_id'],
                doc_data['title'],
                doc_data['file_path'],
                doc_data['content'],
                json.dumps(doc_data['metadata'], ensure_ascii=False),
                doc_data['processed_at'],
                datetime.now().isoformat()
            ))
            
            # Remove any stale chunk rows for this document
            cursor.execute('DELETE FROM document_chunks WHERE doc_id = ?', (doc_data['doc_id'],))
            
            # Save the document chunks
            for chunk in doc_data['chunks']:
                chunk_id = f"{doc_data['doc_id']}_chunk_{chunk['chunk_id']}"
                cursor.execute('''
                INSERT INTO document_chunks 
                (chunk_id, doc_id, chunk_index, content, char_start, char_end)
                VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    chunk_id,
                    doc_data['doc_id'],
                    chunk['chunk_id'],
                    chunk['content'],
                    chunk['char_start'],
                    chunk['char_end']
                ))
            
            conn.commit()
            
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
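
Populating the knowledge base then takes a call or two (paths and metadata are placeholders; the printed summary is illustrative):

# Load documents into the knowledge base (paths are placeholders).
faq = FAQManager(db_path="faq_knowledge.db")

faq.add_document("./docs/return_policy.pdf", metadata={"department": "customer_service"})
summary = faq.add_documents_from_directory("./docs", file_patterns=["*FAQ*", "*manual*"])

print(summary)   # e.g. {'total_processed': 12, 'successful': 12, 'failed': 0, ...}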

Smart Search Engine

    def search(self, query: str, max_results: int = 5, 
               similarity_threshold: float = 0.1) -> List[Dict[str, Any]]:
        """
        Smart search - combines keyword and semantic search.
        
        Search strategy:
        1. TF-IDF vector similarity search
        2. Boost for exact keyword matches
        3. Rank results by relevance
        """
        start_time = time.time()
        
        try:
            # Preprocess the query
            processed_query = self._preprocess_query(query)
            
            # Run the vector search
            vector_results = self._vector_search(processed_query, max_results * 2)
            
            # Run the keyword search
            keyword_results = self._keyword_search(query, max_results * 2)
            
            # Merge and rank the results
            final_results = self._merge_and_rank_results(
                vector_results, keyword_results, query, similarity_threshold
            )
            
            # Log the search
            search_time = time.time() - start_time
            self._log_search(query, len(final_results), search_time)
            
            return final_results[:max_results]
            
        except Exception as e:
            print(f"Search failed: {str(e)}")
            return []
    
    def _preprocess_query(self, query: str) -> str:
        """Preprocess the query text"""
        # Tokenize with jieba
        words = jieba.cut(query)
        return " ".join(words)
    
    def _vector_search(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Vector similarity search"""
        if not self._vector_cache:
            self._update_vector_index()
        
        if not self._vector_cache:
            return []
        
        try:
            # Convert the query into a vector
            query_vector = self.vectorizer.transform([query])
            
            # Compute similarities
            chunk_vectors = self._vector_cache['vectors']
            similarities = cosine_similarity(query_vector, chunk_vectors)[0]
            
            # Pick the most similar results
            top_indices = np.argsort(similarities)[::-1][:max_results]
            
            results = []
            for idx in top_indices:
                chunk_data = self._vector_cache['chunks'][idx]
                similarity = similarities[idx]
                
                if similarity > 0:  # Skip completely irrelevant results
                    results.append({
                        'chunk_id': chunk_data['chunk_id'],
                        'doc_id': chunk_data['doc_id'],
                        'content': chunk_data['content'],
                        'similarity': float(similarity),
                        'search_type': 'vector'
                    })
            
            return results
            
        except Exception as e:
            print(f"Vector search failed: {str(e)}")
            return []
    
    def _keyword_search(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Exact keyword search"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # Run a LIKE-based full-text search
            cursor.execute('''
            SELECT chunk_id, doc_id, content,
                   (LENGTH(content) - LENGTH(REPLACE(LOWER(content), LOWER(?), ''))) / LENGTH(?) as keyword_score
            FROM document_chunks 
            WHERE LOWER(content) LIKE LOWER(?)
            ORDER BY keyword_score DESC
            LIMIT ?
            ''', (query, query, f'%{query}%', max_results))
            
            results = []
            for row in cursor.fetchall():
                results.append({
                    'chunk_id': row[0],
                    'doc_id': row[1],
                    'content': row[2],
                    'similarity': min(row[3], 1.0),  # Normalize the score
                    'search_type': 'keyword'
                })
            
            return results
            
        except Exception as e:
            print(f"Keyword search failed: {str(e)}")
            return []
        finally:
            conn.close()
    
    def _merge_and_rank_results(self, vector_results: List[Dict], 
                                keyword_results: List[Dict], 
                                query: str, threshold: float) -> List[Dict]:
        """Merge and rank the search results"""
        
        # Merge results, avoiding duplicates
        merged_results = {}
        
        # Add the vector search results
        for result in vector_results:
            chunk_id = result['chunk_id']
            result['combined_score'] = result['similarity'] * 0.7  # Vector search weight: 70%
            merged_results[chunk_id] = result
        
        # Add the keyword search results
        for result in keyword_results:
            chunk_id = result['chunk_id']
            keyword_score = result['similarity'] * 0.3  # Keyword search weight: 30%
            
            if chunk_id in merged_results:
                # Combine the scores
                merged_results[chunk_id]['combined_score'] += keyword_score
                merged_results[chunk_id]['search_type'] = 'hybrid'
            else:
                result['combined_score'] = keyword_score
                merged_results[chunk_id] = result
        
        # Filter out low-relevance results and sort
        final_results = [
            result for result in merged_results.values() 
            if result['combined_score'] >= threshold
        ]
        
        final_results.sort(key=lambda x: x['combined_score'], reverse=True)
        
        # Attach document metadata
        self._enrich_results_with_metadata(final_results)
        
        return final_results
    
    def _enrich_results_with_metadata(self, results: List[Dict]):
        """Attach document metadata to the search results"""
        if not results:
            return
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            doc_ids = list(set(result['doc_id'] for result in results))
            placeholders = ','.join('?' * len(doc_ids))
            
            cursor.execute(f'''
            SELECT doc_id, title, file_path, metadata
            FROM documents 
            WHERE doc_id IN ({placeholders})
            ''', doc_ids)
            
            doc_metadata = {row[0]: {
                'title': row[1],
                'file_path': row[2],
                'metadata': json.loads(row[3]) if row[3] else {}
            } for row in cursor.fetchall()}
            
            # Merge the metadata into the results
            for result in results:
                doc_id = result['doc_id']
                if doc_id in doc_metadata:
                    result['document'] = doc_metadata[doc_id]
            
        except Exception as e:
            print(f"Failed to attach metadata: {str(e)}")
        finally:
            conn.close()
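
Continuing the example above, a query against the populated knowledge base might look like this (the question and printed fields are illustrative):

# Example query (continuing the `faq` instance from the earlier snippet).
results = faq.search("How do I return a product?", max_results=3)

for hit in results:
    print(f"{hit['combined_score']:.2f}",
          hit['search_type'],                # 'vector', 'keyword', or 'hybrid'
          hit['document']['title'],          # attached by _enrich_results_with_metadata
          hit['content'][:80])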

Vector Index Management

    def _update_vector_index(self):
        """Rebuild the TF-IDF vector index"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # Fetch all document chunks
            cursor.execute('SELECT chunk_id, doc_id, content FROM document_chunks')
            chunks = cursor.fetchall()
            
            if not chunks:
                return
            
            # Prepare the text data
            chunk_texts = [chunk[2] for chunk in chunks]
            chunk_info = [{'chunk_id': chunk[0], 'doc_id': chunk[1], 'content': chunk[2]} 
                         for chunk in chunks]
            
            # Build the TF-IDF vectors
            vectors = self.vectorizer.fit_transform(chunk_texts)
            
            # Refresh the cache
            self._vector_cache = {
                'vectors': vectors,
                'chunks': chunk_info,
                'last_updated': datetime.now()
            }
            
            self._last_vector_update = datetime.now()
            print(f"Vector index updated: {len(chunks)} document chunks")
            
        except Exception as e:
            print(f"Failed to update vector index: {str(e)}")
        finally:
            conn.close()
    
    def _log_search(self, query: str, results_count: int, search_time: float):
        """Log the search history"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            cursor.execute('''
            INSERT INTO search_logs (query, results_count, search_time, timestamp)
            VALUES (?, ?, ?, ?)
            ''', (query, results_count, search_time, datetime.now().isoformat()))
            
            conn.commit()
        except Exception as e:
            print(f"Failed to log search: {str(e)}")
        finally:
            conn.close()
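
Since every query is written to `search_logs`, you can later review what users actually ask and how long searches take. A small inspection snippet, not part of the module:

import sqlite3

# Inspection only: show the ten most recent searches.
conn = sqlite3.connect("faq_knowledge.db")
for query, count, seconds, ts in conn.execute('''
    SELECT query, results_count, search_time, timestamp
    FROM search_logs
    ORDER BY timestamp DESC
    LIMIT 10
'''):
    print(f"{ts} | {query!r} -> {count} results in {seconds:.3f}s")
conn.close()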
