Yesterday we completed the smart memory manager, which lets the AI remember and recall conversation history. In real-world applications, however, an AI assistant also needs to query external knowledge bases: company FAQ documents, product manuals, policy handbooks, and so on.
Picture a scenario where a user asks about something that has never come up in conversation but is written down in a company document; memory alone cannot answer it.
Today we will add document knowledge integration to the memory system, so the AI not only remembers things but can also "look them up in the books"!
New Module Architecture
📁 ai_memory_system/
├── config.py               # 🔧 System configuration module
├── utils.py                # 🛠 Text processing utilities
├── memory_manager.py       # 🧠 Core memory management module
├── chatbot.py              # 🤖 AI conversation integration module
├── advanced_features.py    # ⚡ Advanced features module
├── document_processor.py   # 📄 Document preprocessing module (new)
├── faq_manager.py          # 📚 FAQ management module (new)
├── knowledge_integrator.py # 🔗 Knowledge integration module (new)
└── main.py                 # 🚀 Main program module
Overall Data Flow
User question → memory search + FAQ search → knowledge integration → response generation
      ↓              ↓                ↓                  ↓                     ↓
     NLP      personal memory   document store      smart fusion      personalized answer
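As a rough sketch of how these stages will fit together in code (the method names on memory_manager, knowledge_integrator, and chatbot are illustrative placeholders for the modules built on earlier days, not final APIs; only faq_manager.search matches the code in this post):

def answer_question(user_question, memory_manager, faq_manager, knowledge_integrator, chatbot):
    """Illustrative pipeline only; the real signatures live in the individual modules."""
    memories = memory_manager.search_memories(user_question)      # personal memory
    faq_hits = faq_manager.search(user_question, max_results=5)   # document knowledge base
    context = knowledge_integrator.merge(memories, faq_hits)      # smart fusion
    return chatbot.generate_response(user_question, context)      # personalized answer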
Module 1: Document Preprocessing Module (document_processor.py)
Document preprocessing is the foundation of knowledge integration: we need to convert documents in various formats into searchable, structured data.
Core Design Philosophy
import os
import re
import hashlib
from typing import List, Dict, Any, Tuple
from datetime import datetime
import logging
# Install the required packages first:
# pip install python-docx PyPDF2 python-pptx openpyxl
import docx
import PyPDF2
from pptx import Presentation
import openpyxl

class DocumentProcessor:
    """Document preprocessor - converts documents of various formats into a searchable structure"""
    def __init__(self):
        self.supported_formats = {
            '.txt': self._process_txt,
            '.pdf': self._process_pdf,
            '.docx': self._process_docx,
            '.pptx': self._process_pptx,
            '.xlsx': self._process_xlsx
        }
        # Text chunking parameters
        self.chunk_size = 500    # maximum characters per chunk
        self.overlap_size = 50   # characters of overlap between chunks
        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    def process_document(self, file_path: str, metadata: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Process a single document and return structured data.
        Returns:
            {
                'doc_id': unique document ID,
                'title': document title,
                'content': full text content,
                'chunks': list of text chunks,
                'metadata': metadata,
                'processed_at': processing timestamp
            }
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        # Get the file extension
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {file_ext}")
        try:
            # Extract the document content
            content = self.supported_formats[file_ext](file_path)
            # Generate the document ID
            doc_id = self._generate_doc_id(file_path, content)
            # Split into chunks
            chunks = self._split_into_chunks(content)
            # Assemble the result
            result = {
                'doc_id': doc_id,
                'title': os.path.basename(file_path),
                'file_path': file_path,
                'content': content,
                'chunks': chunks,
                'metadata': metadata or {},
                'processed_at': datetime.now().isoformat()
            }
            self.logger.info(f"Processed {file_path}: generated {len(chunks)} text chunks")
            return result
        except Exception as e:
            self.logger.error(f"Failed to process {file_path}: {str(e)}")
            raise
Handlers for Each Format
    def _process_txt(self, file_path: str) -> str:
        """Process a plain-text file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except UnicodeDecodeError:
            # Fall back to another encoding
            with open(file_path, 'r', encoding='big5') as file:
                return file.read()

    def _process_pdf(self, file_path: str) -> str:
        """Process a PDF file"""
        content = ""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num in range(len(pdf_reader.pages)):
                    page = pdf_reader.pages[page_num]
                    content += page.extract_text() + "\n"
            return content.strip()
        except Exception as e:
            self.logger.warning(f"PDF processing warning {file_path}: {str(e)}")
            return ""

    def _process_docx(self, file_path: str) -> str:
        """Process a Word document"""
        try:
            doc = docx.Document(file_path)
            content = ""
            for paragraph in doc.paragraphs:
                content += paragraph.text + "\n"
            # Handle tables
            for table in doc.tables:
                for row in table.rows:
                    row_data = [cell.text.strip() for cell in row.cells]
                    content += " | ".join(row_data) + "\n"
            return content.strip()
        except Exception as e:
            self.logger.warning(f"Word processing warning {file_path}: {str(e)}")
            return ""

    def _process_pptx(self, file_path: str) -> str:
        """Process a PowerPoint file"""
        try:
            prs = Presentation(file_path)
            content = ""
            for slide_num, slide in enumerate(prs.slides, 1):
                content += f"\n--- Slide {slide_num} ---\n"
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        content += shape.text + "\n"
            return content.strip()
        except Exception as e:
            self.logger.warning(f"PowerPoint processing warning {file_path}: {str(e)}")
            return ""

    def _process_xlsx(self, file_path: str) -> str:
        """Process an Excel workbook"""
        try:
            workbook = openpyxl.load_workbook(file_path)
            content = ""
            for sheet_name in workbook.sheetnames:
                worksheet = workbook[sheet_name]
                content += f"\n--- Worksheet: {sheet_name} ---\n"
                for row in worksheet.iter_rows(values_only=True):
                    row_data = [str(cell) if cell is not None else "" for cell in row]
                    if any(row_data):  # skip empty rows
                        content += " | ".join(row_data) + "\n"
            return content.strip()
        except Exception as e:
            self.logger.warning(f"Excel processing warning {file_path}: {str(e)}")
            return ""
Smart Text Chunking
    def _split_into_chunks(self, content: str) -> List[Dict[str, Any]]:
        """
        Split long text into chunks while preserving semantic integrity.
        Chunking strategy:
        1. Split on paragraph boundaries first
        2. Keep a suitable overlap between consecutive chunks
        3. Avoid splitting in the middle of a sentence
        """
        if len(content) <= self.chunk_size:
            return [{
                'chunk_id': 0,
                'content': content.strip(),
                'char_start': 0,
                'char_end': len(content)
            }]
        chunks = []
        paragraphs = content.split('\n\n')  # split on blank lines
        current_chunk = ""
        current_start = 0
        chunk_id = 0
        for paragraph in paragraphs:
            paragraph = paragraph.strip()
            if not paragraph:
                continue
            # Check whether adding this paragraph would exceed the length limit
            test_chunk = current_chunk + "\n\n" + paragraph if current_chunk else paragraph
            if len(test_chunk) <= self.chunk_size:
                current_chunk = test_chunk
            else:
                # Save the current chunk
                if current_chunk:
                    chunks.append({
                        'chunk_id': chunk_id,
                        'content': current_chunk.strip(),
                        'char_start': current_start,
                        'char_end': current_start + len(current_chunk)
                    })
                    chunk_id += 1
                # Carry an overlap into the next chunk
                if len(current_chunk) > self.overlap_size:
                    overlap_text = current_chunk[-self.overlap_size:]
                    current_start = current_start + len(current_chunk) - self.overlap_size
                    current_chunk = overlap_text + "\n\n" + paragraph
                else:
                    current_start = current_start + len(current_chunk)
                    current_chunk = paragraph
        # Save the final chunk
        if current_chunk:
            chunks.append({
                'chunk_id': chunk_id,
                'content': current_chunk.strip(),
                'char_start': current_start,
                'char_end': current_start + len(current_chunk)
            })
        return chunks

    def _generate_doc_id(self, file_path: str, content: str) -> str:
        """Generate a unique ID for a document"""
        # Hash the file path together with part of the content
        combined = f"{file_path}:{len(content)}:{content[:100]}"
        return hashlib.md5(combined.encode()).hexdigest()
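To see the chunking strategy in action, you can call _split_into_chunks directly on a long string. This is a minimal sketch assuming the default chunk_size=500 and overlap_size=50; the sample text is made up:

from document_processor import DocumentProcessor

processor = DocumentProcessor()
# Ten ~130-character paragraphs, well over one chunk_size in total (sample data only).
sample_text = "\n\n".join(f"Paragraph {i}: " + "x" * 120 for i in range(10))
for chunk in processor._split_into_chunks(sample_text):
    print(chunk['chunk_id'], chunk['char_start'], chunk['char_end'], len(chunk['content']))
# In this example every chunk stays under 500 characters, chunks break on
# paragraph boundaries, and consecutive chunks share roughly a 50-character overlap.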
Batch Processing
    def process_directory(self, directory_path: str,
                          file_patterns: List[str] = None) -> Dict[str, Any]:
        """
        Batch-process every supported document in a directory.
        Args:
            directory_path: directory to scan
            file_patterns: file-name pattern filters (e.g. ['*FAQ*', '*manual*'])
        """
        if not os.path.exists(directory_path):
            raise FileNotFoundError(f"Directory not found: {directory_path}")
        processed_docs = []
        failed_files = []
        # Walk the directory tree
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file)[1].lower()
                # Skip unsupported formats
                if file_ext not in self.supported_formats:
                    continue
                # Apply the file-name pattern filter
                if file_patterns and not self._match_patterns(file, file_patterns):
                    continue
                try:
                    # Record the relative path as metadata
                    relative_path = os.path.relpath(file_path, directory_path)
                    metadata = {
                        'relative_path': relative_path,
                        'directory': os.path.dirname(relative_path),
                        'file_size': os.path.getsize(file_path)
                    }
                    doc_data = self.process_document(file_path, metadata)
                    processed_docs.append(doc_data)
                except Exception as e:
                    failed_files.append({'file': file_path, 'error': str(e)})
                    self.logger.error(f"Failed to process {file_path}: {str(e)}")
        self.logger.info(f"Batch processing finished: {len(processed_docs)} files succeeded, {len(failed_files)} failed")
        return {
            'processed_docs': processed_docs,
            'failed_files': failed_files,
            'summary': {
                'total_processed': len(processed_docs),
                'total_failed': len(failed_files),
                'total_chunks': sum(len(doc['chunks']) for doc in processed_docs)
            }
        }

    def _match_patterns(self, filename: str, patterns: List[str]) -> bool:
        """Check whether a file name matches any of the patterns"""
        import fnmatch
        return any(fnmatch.fnmatch(filename.lower(), pattern.lower()) for pattern in patterns)
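To close out this module, here is a short usage sketch; the file and directory names are illustrative, not part of the project:

from document_processor import DocumentProcessor

processor = DocumentProcessor()

# Process one file, attaching some custom metadata (path is illustrative).
doc = processor.process_document("docs/employee_handbook.docx", metadata={"department": "HR"})
print(doc['title'], len(doc['chunks']))

# Batch-process a directory, keeping only FAQ-like file names.
result = processor.process_directory("docs", file_patterns=["*FAQ*"])
print(result['summary'])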
Module 2: FAQ Management Module (faq_manager.py)
The FAQ management module is responsible for building the document index, running semantic search, and keeping the knowledge base up to date.
Core Architecture
import sqlite3
import json
import time  # used for timing in search()
import numpy as np
from datetime import datetime
from typing import List, Dict, Any, Tuple
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import jieba
from document_processor import DocumentProcessor
from config import MemoryConfig

class FAQManager:
    """FAQ document manager - the brain of the knowledge base"""
    def __init__(self, db_path: str = "faq_knowledge.db"):
        self.db_path = db_path
        self.document_processor = DocumentProcessor()
        # Initialize the TF-IDF vectorizer
        self.vectorizer = TfidfVectorizer(
            max_features=MemoryConfig.TFIDF_CONFIG['max_features'],
            ngram_range=MemoryConfig.TFIDF_CONFIG['ngram_range'],
            stop_words=MemoryConfig.TFIDF_CONFIG['stop_words']
        )
        # Cache for vector data
        self._vector_cache = {}
        self._last_vector_update = None
        # Initialize the database
        self._init_database()
    def _init_database(self):
        """Initialize the FAQ database schema"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Documents table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                doc_id TEXT PRIMARY KEY,
                title TEXT NOT NULL,
                file_path TEXT,
                content TEXT,
                metadata TEXT,
                processed_at TEXT,
                last_updated TEXT
            )
        ''')
        # Document chunks table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS document_chunks (
                chunk_id TEXT PRIMARY KEY,
                doc_id TEXT,
                chunk_index INTEGER,
                content TEXT,
                char_start INTEGER,
                char_end INTEGER,
                vector_data TEXT,
                FOREIGN KEY (doc_id) REFERENCES documents (doc_id)
            )
        ''')
        # Search log table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS search_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                query TEXT,
                results_count INTEGER,
                search_time REAL,
                timestamp TEXT
            )
        ''')
        conn.commit()
        conn.close()
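If you want to confirm that the three tables exist after instantiating FAQManager, a quick query against sqlite_master works; the file name below matches the default db_path above:

import sqlite3

# Sanity check of the schema created by _init_database().
conn = sqlite3.connect("faq_knowledge.db")
tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
print(tables)  # should include documents, document_chunks and search_logs
conn.close()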
Building the Document Index
    def add_document(self, file_path: str, metadata: Dict[str, Any] = None) -> bool:
        """Add a single document to the knowledge base"""
        try:
            # Process the document
            doc_data = self.document_processor.process_document(file_path, metadata)
            # Persist it to the database
            self._save_document_to_db(doc_data)
            # Refresh the vector index
            self._update_vector_index()
            return True
        except Exception as e:
            print(f"Failed to add document {file_path}: {str(e)}")
            return False

    def add_documents_from_directory(self, directory_path: str,
                                     file_patterns: List[str] = None) -> Dict[str, Any]:
        """Batch-add every supported document in a directory"""
        result = self.document_processor.process_directory(directory_path, file_patterns)
        successful_docs = 0
        failed_docs = []
        for doc_data in result['processed_docs']:
            try:
                self._save_document_to_db(doc_data)
                successful_docs += 1
            except Exception as e:
                failed_docs.append({
                    'file': doc_data['file_path'],
                    'error': str(e)
                })
        # Refresh the vector index once at the end
        if successful_docs > 0:
            self._update_vector_index()
        return {
            'total_processed': len(result['processed_docs']),
            'successful': successful_docs,
            'failed': len(failed_docs),
            'failed_docs': failed_docs,
            'total_chunks': result['summary']['total_chunks']
        }

    def _save_document_to_db(self, doc_data: Dict[str, Any]):
        """Persist a processed document to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            # Save the main document record
            cursor.execute('''
                INSERT OR REPLACE INTO documents
                (doc_id, title, file_path, content, metadata, processed_at, last_updated)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                doc_data['doc_id'],
                doc_data['title'],
                doc_data['file_path'],
                doc_data['content'],
                json.dumps(doc_data['metadata'], ensure_ascii=False),
                doc_data['processed_at'],
                datetime.now().isoformat()
            ))
            # Remove any stale chunks for this document
            cursor.execute('DELETE FROM document_chunks WHERE doc_id = ?', (doc_data['doc_id'],))
            # Save the document chunks
            for chunk in doc_data['chunks']:
                chunk_id = f"{doc_data['doc_id']}_chunk_{chunk['chunk_id']}"
                cursor.execute('''
                    INSERT INTO document_chunks
                    (chunk_id, doc_id, chunk_index, content, char_start, char_end)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    chunk_id,
                    doc_data['doc_id'],
                    chunk['chunk_id'],
                    chunk['content'],
                    chunk['char_start'],
                    chunk['char_end']
                ))
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()
Smart Search Engine
    def search(self, query: str, max_results: int = 5,
               similarity_threshold: float = 0.1) -> List[Dict[str, Any]]:
        """
        Smart search - combines keyword and semantic matching.
        Search strategy:
        1. TF-IDF vector similarity search
        2. Exact keyword matches add extra weight
        3. Results are ranked by combined relevance
        """
        start_time = time.time()
        try:
            # Preprocess the query
            processed_query = self._preprocess_query(query)
            # Run the vector search
            vector_results = self._vector_search(processed_query, max_results * 2)
            # Run the keyword search
            keyword_results = self._keyword_search(query, max_results * 2)
            # Merge and rank the results
            final_results = self._merge_and_rank_results(
                vector_results, keyword_results, query, similarity_threshold
            )
            # Log the search
            search_time = time.time() - start_time
            self._log_search(query, len(final_results), search_time)
            return final_results[:max_results]
        except Exception as e:
            print(f"Search failed: {str(e)}")
            return []

    def _preprocess_query(self, query: str) -> str:
        """Preprocess the query text"""
        # Tokenize with jieba (Chinese word segmentation)
        words = jieba.cut(query)
        return " ".join(words)
    def _vector_search(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Vector similarity search"""
        if not self._vector_cache:
            self._update_vector_index()
        if not self._vector_cache:
            return []
        try:
            # Turn the query into a TF-IDF vector
            query_vector = self.vectorizer.transform([query])
            # Compute cosine similarity against all chunk vectors
            chunk_vectors = self._vector_cache['vectors']
            similarities = cosine_similarity(query_vector, chunk_vectors)[0]
            # Take the most similar chunks
            top_indices = np.argsort(similarities)[::-1][:max_results]
            results = []
            for idx in top_indices:
                chunk_data = self._vector_cache['chunks'][idx]
                similarity = similarities[idx]
                if similarity > 0:  # drop completely unrelated chunks
                    results.append({
                        'chunk_id': chunk_data['chunk_id'],
                        'doc_id': chunk_data['doc_id'],
                        'content': chunk_data['content'],
                        'similarity': float(similarity),
                        'search_type': 'vector'
                    })
            return results
        except Exception as e:
            print(f"Vector search failed: {str(e)}")
            return []

    def _keyword_search(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Exact keyword search"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            # LIKE-based full-text search; keyword_score counts occurrences of the query
            cursor.execute('''
                SELECT chunk_id, doc_id, content,
                       (LENGTH(content) - LENGTH(REPLACE(LOWER(content), LOWER(?), ''))) / LENGTH(?) as keyword_score
                FROM document_chunks
                WHERE LOWER(content) LIKE LOWER(?)
                ORDER BY keyword_score DESC
                LIMIT ?
            ''', (query, query, f'%{query}%', max_results))
            results = []
            for row in cursor.fetchall():
                results.append({
                    'chunk_id': row[0],
                    'doc_id': row[1],
                    'content': row[2],
                    'similarity': min(row[3], 1.0),  # normalize the score
                    'search_type': 'keyword'
                })
            return results
        except Exception as e:
            print(f"Keyword search failed: {str(e)}")
            return []
        finally:
            conn.close()
    def _merge_and_rank_results(self, vector_results: List[Dict],
                                keyword_results: List[Dict],
                                query: str, threshold: float) -> List[Dict]:
        """Merge and rank the search results"""
        # Merge results, avoiding duplicates
        merged_results = {}
        # Add the vector search results
        for result in vector_results:
            chunk_id = result['chunk_id']
            result['combined_score'] = result['similarity'] * 0.7  # vector search weight: 70%
            merged_results[chunk_id] = result
        # Add the keyword search results
        for result in keyword_results:
            chunk_id = result['chunk_id']
            keyword_score = result['similarity'] * 0.3  # keyword search weight: 30%
            if chunk_id in merged_results:
                # Combine the scores
                merged_results[chunk_id]['combined_score'] += keyword_score
                merged_results[chunk_id]['search_type'] = 'hybrid'
            else:
                result['combined_score'] = keyword_score
                merged_results[chunk_id] = result
        # Filter out low-relevance results and sort
        final_results = [
            result for result in merged_results.values()
            if result['combined_score'] >= threshold
        ]
        final_results.sort(key=lambda x: x['combined_score'], reverse=True)
        # Attach document metadata
        self._enrich_results_with_metadata(final_results)
        return final_results

    def _enrich_results_with_metadata(self, results: List[Dict]):
        """Attach document metadata to the search results"""
        if not results:
            return
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            doc_ids = list(set(result['doc_id'] for result in results))
            placeholders = ','.join('?' * len(doc_ids))
            cursor.execute(f'''
                SELECT doc_id, title, file_path, metadata
                FROM documents
                WHERE doc_id IN ({placeholders})
            ''', doc_ids)
            doc_metadata = {row[0]: {
                'title': row[1],
                'file_path': row[2],
                'metadata': json.loads(row[3]) if row[3] else {}
            } for row in cursor.fetchall()}
            # Attach the metadata to each result
            for result in results:
                doc_id = result['doc_id']
                if doc_id in doc_metadata:
                    result['document'] = doc_metadata[doc_id]
        except Exception as e:
            print(f"Failed to attach metadata: {str(e)}")
        finally:
            conn.close()
Vector Index Management
    def _update_vector_index(self):
        """Rebuild the TF-IDF vector index"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            # Fetch all document chunks
            cursor.execute('SELECT chunk_id, doc_id, content FROM document_chunks')
            chunks = cursor.fetchall()
            if not chunks:
                return
            # Prepare the text data
            chunk_texts = [chunk[2] for chunk in chunks]
            chunk_info = [{'chunk_id': chunk[0], 'doc_id': chunk[1], 'content': chunk[2]}
                          for chunk in chunks]
            # Build the TF-IDF vectors
            vectors = self.vectorizer.fit_transform(chunk_texts)
            # Refresh the cache
            self._vector_cache = {
                'vectors': vectors,
                'chunks': chunk_info,
                'last_updated': datetime.now()
            }
            self._last_vector_update = datetime.now()
            print(f"Vector index updated: {len(chunks)} document chunks")
        except Exception as e:
            print(f"Failed to update vector index: {str(e)}")
        finally:
            conn.close()

    def _log_search(self, query: str, results_count: int, search_time: float):
        """Record the search in the log table"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute('''
                INSERT INTO search_logs (query, results_count, search_time, timestamp)
                VALUES (?, ?, ?, ?)
            ''', (query, results_count, search_time, datetime.now().isoformat()))
            conn.commit()
        except Exception as e:
            print(f"Failed to log search: {str(e)}")
        finally:
            conn.close()
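Finally, a minimal end-to-end sketch of FAQManager: the docs/ directory, file patterns, and sample query are illustrative, and the printed fields follow the result dictionaries produced by search() above.

from faq_manager import FAQManager

faq = FAQManager(db_path="faq_knowledge.db")

# Ingest a directory of FAQ-style documents and build the index (paths are illustrative).
summary = faq.add_documents_from_directory("docs", file_patterns=["*FAQ*", "*manual*"])
print(f"Indexed {summary['successful']} documents, {summary['total_chunks']} chunks")

# Hybrid search: 70% TF-IDF similarity + 30% keyword matching.
for hit in faq.search("如何申請請假?", max_results=3):  # sample Chinese query
    title = hit.get('document', {}).get('title', 'unknown')
    print(f"[{hit['combined_score']:.2f}] {title}: {hit['content'][:80]}...")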