iT邦幫忙

2025 iThome 鐵人賽

DAY 9
0

Day 9: 用LangGraph實作添加記憶功能

昨天我們學習了上下文管理的理論基礎,今天讓我們用 LangGraph 實作一個具備智能記憶功能的 AI 助理!我們將建立一個能夠記住使用者偏好、學習模式,並提供個性化服務的記憶系統。

🧠 記憶功能架構設計

我們的記憶系統將包含三個層次:

  • 即時記憶:當前對話的上下文
  • 短期記憶:單次會話的重要資訊
  • 長期記憶:使用者偏好、習慣和歷史互動模式

🏗 專案結構

memory_assistant/
├── main.py                      # 主程式入口
├── models/
│   ├── __init__.py
│   ├── memory_state.py          # 記憶狀態定義
│   └── user_profile.py          # 使用者檔案模型
├── memory/
│   ├── __init__.py
│   ├── memory_manager.py        # 記憶管理核心
│   ├── immediate_memory.py      # 即時記憶處理
│   ├── short_term_memory.py     # 短期記憶處理
│   └── long_term_memory.py      # 長期記憶處理
├── workflows/
│   ├── __init__.py
│   └── memory_workflow.py       # LangGraph 記憶工作流程
├── utils/
│   ├── __init__.py
│   └── memory_serializer.py     # 記憶序列化工具
└── config.py                    # 設定檔

🔧 核心程式碼實作

1. 記憶狀態定義 (models/memory_state.py)

from typing import TypedDict, List, Dict, Optional, Any
from datetime import datetime
from dataclasses import dataclass, field

@dataclass
class MemoryItem:
    content: str
    timestamp: datetime
    memory_type: str  # immediate, short_term, long_term
    importance: float = 0.5
    access_count: int = 0
    tags: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def decay_importance(self, decay_rate: float = 0.95):
        """重要性隨時間衰減"""
        days_old = (datetime.now() - self.timestamp).days
        self.importance *= (decay_rate ** days_old)

class MemoryWorkflowState(TypedDict):
    # 輸入資訊
    user_input: str
    user_id: str
    session_id: str
    
    # 記憶查詢結果
    relevant_memories: List[MemoryItem]
    memory_context: str
    
    # 處理結果
    response: str
    new_memories: List[MemoryItem]
    memory_updates: List[Dict]
    
    # 流程控制
    current_node: str
    processing_mode: str  # remember, recall, update
    confidence_score: float

2. 記憶管理核心 (memory/memory_manager.py)

from typing import List, Dict, Optional
from models.memory_state import MemoryItem
from datetime import datetime, timedelta
import json
import os

class InMemoryStore:
    """記憶體內儲存系統(不使用資料庫)"""
    
    def __init__(self):
        self.immediate_memory: Dict[str, List[MemoryItem]] = {}  # session_id -> memories
        self.short_term_memory: Dict[str, List[MemoryItem]] = {}  # user_id -> memories  
        self.long_term_memory: Dict[str, List[MemoryItem]] = {}   # user_id -> memories
        self.user_profiles: Dict[str, Dict] = {}
    
    def add_immediate_memory(self, session_id: str, memory: MemoryItem):
        """添加即時記憶"""
        if session_id not in self.immediate_memory:
            self.immediate_memory[session_id] = []
        self.immediate_memory[session_id].append(memory)
        
        # 限制即時記憶數量(最多保留20條)
        if len(self.immediate_memory[session_id]) > 20:
            self.immediate_memory[session_id] = self.immediate_memory[session_id][-20:]
    
    def add_short_term_memory(self, user_id: str, memory: MemoryItem):
        """添加短期記憶"""
        if user_id not in self.short_term_memory:
            self.short_term_memory[user_id] = []
        self.short_term_memory[user_id].append(memory)
    
    def add_long_term_memory(self, user_id: str, memory: MemoryItem):
        """添加長期記憶"""
        if user_id not in self.long_term_memory:
            self.long_term_memory[user_id] = []
        self.long_term_memory[user_id].append(memory)
    
    def get_relevant_memories(self, user_id: str, session_id: str, 
                            query: str, limit: int = 5) -> List[MemoryItem]:
        """獲取相關記憶"""
        all_memories = []
        
        # 收集所有相關記憶
        if session_id in self.immediate_memory:
            all_memories.extend(self.immediate_memory[session_id])
        
        if user_id in self.short_term_memory:
            all_memories.extend(self.short_term_memory[user_id])
            
        if user_id in self.long_term_memory:
            all_memories.extend(self.long_term_memory[user_id])
        
        # 簡單的相關性評分(基於關鍵詞匹配)
        query_words = set(query.lower().split())
        scored_memories = []
        
        for memory in all_memories:
            content_words = set(memory.content.lower().split())
            # 計算交集比例作為相關性分數
            if query_words and content_words:
                relevance = len(query_words.intersection(content_words)) / len(query_words)
                scored_memories.append((memory, relevance + memory.importance))
        
        # 按分數排序並返回前N個
        scored_memories.sort(key=lambda x: x[1], reverse=True)
        return [memory for memory, score in scored_memories[:limit]]

class MemoryManager:
    """記憶管理器"""
    
    def __init__(self):
        self.store = InMemoryStore()
    
    def process_new_information(self, user_id: str, session_id: str, 
                              content: str, context: Dict = None) -> List[MemoryItem]:
        """處理新資訊並決定記憶策略"""
        new_memories = []
        
        # 分析資訊重要性
        importance = self._calculate_importance(content, context or {})
        
        # 建立記憶項目
        memory_item = MemoryItem(
            content=content,
            timestamp=datetime.now(),
            memory_type="immediate",
            importance=importance,
            tags=self._extract_tags(content),
            metadata=context or {}
        )
        
        # 決定記憶類型
        if importance > 0.8:
            # 高重要性:直接存入長期記憶
            memory_item.memory_type = "long_term"
            self.store.add_long_term_memory(user_id, memory_item)
        elif importance > 0.5:
            # 中等重要性:存入短期記憶
            memory_item.memory_type = "short_term" 
            self.store.add_short_term_memory(user_id, memory_item)
        else:
            # 低重要性:即時記憶
            self.store.add_immediate_memory(session_id, memory_item)
        
        new_memories.append(memory_item)
        return new_memories
    
    def retrieve_memories(self, user_id: str, session_id: str, query: str) -> List[MemoryItem]:
        """檢索相關記憶"""
        memories = self.store.get_relevant_memories(user_id, session_id, query)
        
        # 更新存取計數
        for memory in memories:
            memory.access_count += 1
            # 被頻繁存取的記憶重要性提升
            if memory.access_count > 3:
                memory.importance = min(1.0, memory.importance + 0.1)
        
        return memories
    
    def _calculate_importance(self, content: str, context: Dict) -> float:
        """計算資訊重要性"""
        importance = 0.3  # 基礎重要性
        
        # 長度加權
        if len(content) > 50:
            importance += 0.2
        
        # 關鍵詞加權
        important_keywords = ['重要', '記住', '下次', '偏好', '不喜歡', '經常', '習慣']
        for keyword in important_keywords:
            if keyword in content:
                importance += 0.3
                break
        
        # 使用者明確要求記住
        if any(phrase in content for phrase in ['請記住', '記錄一下', '以後要用']):
            importance += 0.4
        
        return min(1.0, importance)
    
    def _extract_tags(self, content: str) -> List[str]:
        """提取內容標籤"""
        tags = []
        
        # 簡單的關鍵詞提取
        tag_patterns = {
            'preference': ['喜歡', '偏好', '習慣'],
            'skill': ['擅長', '會', '學過'],  
            'goal': ['想要', '希望', '計劃'],
            'experience': ['經驗', '做過', '用過']
        }
        
        content_lower = content.lower()
        for tag, keywords in tag_patterns.items():
            if any(keyword in content_lower for keyword in keywords):
                tags.append(tag)
        
        return tags

3. LangGraph 記憶工作流程 (workflows/memory_workflow.py)

from langgraph.graph import StateGraph, END
from models.memory_state import MemoryWorkflowState
from memory.memory_manager import MemoryManager
from typing import Literal
import google.generativeai as genai
import os

# 初始化記憶管理器和 Gemini
memory_manager = MemoryManager()
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
model = genai.GenerativeModel('gemini-pro')

def analyze_memory_needs(state: MemoryWorkflowState) -> MemoryWorkflowState:
    """分析記憶需求節點"""
    user_input = state["user_input"]
    
    # 使用關鍵詞判斷處理模式
    if any(keyword in user_input.lower() for keyword in ['記住', '記錄', '保存']):
        processing_mode = "remember"
    elif any(keyword in user_input.lower() for keyword in ['之前', '記得', '剛才', '上次']):
        processing_mode = "recall"  
    else:
        processing_mode = "update"
    
    return {
        **state,
        "processing_mode": processing_mode,
        "current_node": "memory_analysis"
    }

def remember_information(state: MemoryWorkflowState) -> MemoryWorkflowState:
    """記住資訊節點"""
    user_input = state["user_input"]
    user_id = state["user_id"]
    session_id = state["session_id"]
    
    # 處理新資訊
    new_memories = memory_manager.process_new_information(
        user_id, session_id, user_input, {"source": "user_request"}
    )
    
    response = f"✅ 我已經記住了:{user_input}\n這個資訊已被標記為重要並妥善保存。"
    
    return {
        **state,
        "response": response,
        "new_memories": new_memories,
        "current_node": "remember_complete",
        "confidence_score": 0.9
    }

def recall_memories(state: MemoryWorkflowState) -> MemoryWorkflowState:
    """回憶記憶節點"""
    user_input = state["user_input"]
    user_id = state["user_id"] 
    session_id = state["session_id"]
    
    # 檢索相關記憶
    relevant_memories = memory_manager.retrieve_memories(user_id, session_id, user_input)
    
    if relevant_memories:
        # 構建記憶上下文
        memory_context = "\n".join([
            f"[{mem.memory_type}] {mem.content} (重要性: {mem.importance:.2f})"
            for mem in relevant_memories
        ])
        
        # 使用 Gemini 基於記憶生成回應
        prompt = f"""
        基於以下記憶資訊回答使用者問題:
        
        使用者問題:{user_input}
        
        相關記憶:
        {memory_context}
        
        請自然地整合這些記憶資訊來回答問題。
        """
        
        try:
            gemini_response = model.generate_content(prompt)
            response = f"🧠 根據我的記憶:\n{gemini_response.text}"
        except:
            response = f"🧠 我記得相關的內容:\n{memory_context}"
        
        confidence = 0.8
    else:
        response = "🤔 抱歉,我沒有找到相關的記憶。可能我們之前沒有討論過這個話題。"
        memory_context = ""
        confidence = 0.3
    
    return {
        **state,
        "response": response,
        "memory_context": memory_context,
        "relevant_memories": relevant_memories,
        "current_node": "recall_complete",
        "confidence_score": confidence
    }

def update_memory_context(state: MemoryWorkflowState) -> MemoryWorkflowState:
    """更新記憶上下文節點"""
    user_input = state["user_input"]
    user_id = state["user_id"]
    session_id = state["session_id"]
    
    # 自動分析並記錄有價值的資訊
    new_memories = memory_manager.process_new_information(
        user_id, session_id, user_input, {"source": "conversation"}
    )
    
    # 檢索可能相關的記憶
    relevant_memories = memory_manager.retrieve_memories(user_id, session_id, user_input)
    
    # 使用 Gemini 生成回應
    if relevant_memories:
        memory_context = "\n".join([mem.content for mem in relevant_memories[:3]])
        prompt = f"""
        考慮以下背景資訊回答問題:
        
        背景資訊:{memory_context}
        使用者問題:{user_input}
        
        請提供有幫助的回應,適當時可以參考背景資訊。
        """
    else:
        prompt = user_input
    
    try:
        gemini_response = model.generate_content(prompt)
        response = gemini_response.text
        confidence = 0.7
    except:
        response = "我理解了您的問題,讓我為您提供協助。"
        confidence = 0.5
    
    return {
        **state,
        "response": response,
        "new_memories": new_memories,
        "relevant_memories": relevant_memories,
        "current_node": "update_complete", 
        "confidence_score": confidence
    }

def route_memory_processing(state: MemoryWorkflowState) -> Literal["remember", "recall", "update"]:
    """路由記憶處理模式"""
    return state["processing_mode"]

def create_memory_workflow():
    """建立記憶工作流程"""
    workflow = StateGraph(MemoryWorkflowState)
    
    # 添加節點
    workflow.add_node("analyze", analyze_memory_needs)
    workflow.add_node("remember", remember_information)
    workflow.add_node("recall", recall_memories)
    workflow.add_node("update", update_memory_context)
    
    # 設定流程
    workflow.set_entry_point("analyze")
    
    # 條件路由
    workflow.add_conditional_edges(
        "analyze",
        route_memory_processing,
        {
            "remember": "remember",
            "recall": "recall", 
            "update": "update"
        }
    )
    
    # 結束節點
    workflow.add_edge("remember", END)
    workflow.add_edge("recall", END)
    workflow.add_edge("update", END)
    
    return workflow.compile()

4. 主程式 (main.py)

from workflows.memory_workflow import create_memory_workflow
import uuid
from dotenv import load_dotenv

load_dotenv()

def main():
    """記憶功能示例程式"""
    print("🧠 智能記憶助理")
    print("💾 我能記住我們的對話並在適當時候回憶起來")
    print("📝 嘗試說:'記住我喜歡喝咖啡'、'我之前說過什麼偏好?'")
    print("=" * 60)
    
    # 建立工作流程
    app = create_memory_workflow()
    
    # 生成使用者和會話 ID
    user_id = input("請輸入您的名字(或按Enter使用預設):").strip() or "user_001"
    session_id = str(uuid.uuid4())[:8]
    
    print(f"👤 使用者ID:{user_id}")
    print(f"🆔 會話ID:{session_id}")
    print("💡 輸入 'quit' 結束對話\n")
    
    while True:
        try:
            user_input = input(f"💬 {user_id}:").strip()
            
            if not user_input:
                continue
                
            if user_input.lower() in ['quit', 'exit', '退出']:
                print("👋 再見!您的記憶已被保存。")
                break
            
            # 執行記憶工作流程
            initial_state = {
                "user_input": user_input,
                "user_id": user_id,
                "session_id": session_id,
                "relevant_memories": [],
                "memory_context": "",
                "response": "",
                "new_memories": [],
                "memory_updates": [],
                "current_node": "",
                "processing_mode": "",
                "confidence_score": 0.0
            }
            
            result = app.invoke(initial_state)
            
            print(f"🤖 助理:{result['response']}")
            
            # 顯示處理資訊
            if result.get('confidence_score', 0) > 0:
                print(f"🎯 信心度:{result['confidence_score']:.2f}")
            
            if result.get('new_memories'):
                print(f"💾 新增記憶:{len(result['new_memories'])} 項")
                
            print("-" * 40)
            
        except KeyboardInterrupt:
            print("\n👋 再見!您的記憶已被保存。")
            break
        except Exception as e:
            print(f"❌ 發生錯誤:{e}")
            continue

if __name__ == "__main__":
    main()

🎮 使用示範

python main.py

互動範例:

💬 user_001:記住我喜歡早上喝黑咖啡
🤖 助理:✅ 我已經記住了:記住我喜歡早上喝黑咖啡
這個資訊已被標記為重要並妥善保存。

💬 user_001:我之前說過什麼偏好?
🤖 助理:🧠 根據我的記憶:
您之前提到喜歡在早上喝黑咖啡,這是您的飲食偏好之一。

💬 user_001:推薦一些咖啡店
🤖 助理:基於您喜歡黑咖啡的偏好,我推薦以下咖啡店...

🚀 記憶系統特色

三層記憶架構:即時、短期、長期記憶分層管理
智能重要性評估:自動判斷資訊價值
上下文感知:基於記憶提供個性化回應
LangGraph 整合:圖形化工作流程清晰易懂
記憶體儲存:無需資料庫,輕量化部署

🎯 今日總結

今天我們用 LangGraph 建立了一個完整的記憶功能系統!透過三層記憶架構和智能工作流程,AI 助理現在能夠記住重要資訊並在適當時候回憶,提供更個性化的服務體驗。

明天我們將學習如何整合外部 API 服務,讓 AI 助理具備更豐富的功能和資料來源!


上一篇
Day 8: 多輪對話與上下文管理
下一篇
Day 10: 整合外部 API 服務
系列文
30 天從零到 AI 助理:Gemini CLI 與 LangGraph 輕鬆上手10
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言