iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
AI & Data

來都來了,那就做一個GCP從0到100的AI助理系列 第 24

基礎服務實作:從 chat-service 開始動手

  • 分享至 

  • xImage
  •  

前篇我們畫好了系統藍圖,這篇開始寫程式碼。我們先讓最小可用版本跑起來,再逐步加功能。

目標:30 分鐘內讓一個能對話的 AI 助手在本地跑起來,再花 30 分鐘部署到 GCP

1) 專案結構(先把骨架搭好)

ai-assistant/
├── services/
│   ├── chat/                 # chat-service 主服務
│   │   ├── app/
│   │   │   ├── __init__.py
│   │   │   ├── main.py       # FastAPI 應用入口
│   │   │   ├── models.py     # 資料模型
│   │   │   ├── handlers.py   # 業務邏輯
│   │   │   └── config.py     # 配置管理
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   ├── memory/               # mem-service 記憶服務
│   │   ├── app/
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   │   ├── models.py
│   │   │   └── database.py   # 資料庫操作
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   └── gateway/              # api-gateway(下一篇詳述)
├── shared/                   # 共用程式庫
│   ├── __init__.py
│   ├── auth.py              # JWT 驗證
│   ├── events.py            # Pub/Sub 事件
│   └── utils.py             # 工具函數
├── docker-compose.yml       # 本地開發環境
├── cloudbuild.yaml          # GCP 部署配置
└── README.md

2) chat-service 核心實作

資料模型(services/chat/app/models.py)

from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum

class MessageRole(str, Enum):
    """Speaker of a chat message (user / assistant / system)."""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"

class ProcessingMode(str, Enum):
    """How a chat request is executed end-to-end."""
    SYNC = "sync"           # synchronous (fast reply)
    ASYNC = "async"         # asynchronous (complex tasks)
    HYBRID = "hybrid"       # hybrid (ACK first, full answer later)

class ChatMessage(BaseModel):
    """A single message within a conversation."""
    role: MessageRole
    content: str
    # NOTE(review): naive local time; consider a timezone-aware factory — confirm
    timestamp: datetime = Field(default_factory=datetime.now)
    metadata: Dict[str, Any] = Field(default_factory=dict)

class ChatRequest(BaseModel):
    """Inbound payload for POST /chat."""
    message: str
    # Omitted on the first turn; the server then generates a new chat_id
    chat_id: Optional[str] = None
    user_id: str
    processing_mode: ProcessingMode = ProcessingMode.HYBRID
    context: Dict[str, Any] = Field(default_factory=dict)

class ChatResponse(BaseModel):
    """Outbound payload for POST /chat."""
    message: str
    chat_id: str
    processing_mode: ProcessingMode
    is_complete: bool = False    # whether this is the final answer
    requires_followup: bool = False  # whether further processing will follow
    metadata: Dict[str, Any] = Field(default_factory=dict)

class TaskEvent(BaseModel):
    """Pub/Sub event payload for deferred (async/hybrid) chat tasks."""
    task_id: str
    chat_id: str
    user_id: str
    message: str
    context: Dict[str, Any]
    created_at: datetime = Field(default_factory=datetime.now)

配置管理(services/chat/app/config.py)

import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    """Runtime configuration for chat-service, sourced from env vars / .env.

    NOTE(review): the explicit os.getenv defaults duplicate what
    pydantic-settings already does from field names — confirm whether the
    redundancy is intentional.
    """
    # Service basics
    service_name: str = "chat-service"
    environment: str = os.getenv("ENVIRONMENT", "development")
    debug: bool = os.getenv("DEBUG", "false").lower() == "true"

    # Google Cloud
    gcp_project_id: str = os.getenv("GCP_PROJECT_ID", "")
    vertex_location: str = os.getenv("VERTEX_LOCATION", "asia-east1")

    # Pub/Sub
    pubsub_topic_chat_tasks: str = os.getenv("PUBSUB_TOPIC_CHAT_TASKS", "chat-tasks")

    # Downstream service URLs
    memory_service_url: str = os.getenv("MEMORY_SERVICE_URL", "http://memory-service:8080")
    rag_service_url: str = os.getenv("RAG_SERVICE_URL", "http://rag-service:8080")

    # API credentials
    gemini_api_key: str = os.getenv("GEMINI_API_KEY", "")
    jwt_secret: str = os.getenv("JWT_SECRET", "your-secret-key")

    # Performance knobs
    response_timeout: int = int(os.getenv("RESPONSE_TIMEOUT", "2"))  # seconds
    max_context_length: int = int(os.getenv("MAX_CONTEXT_LENGTH", "4000"))

    class Config:
        env_file = ".env"
        case_sensitive = False

# Module-level singleton imported by the rest of the service
settings = Settings()

業務邏輯核心(services/chat/app/handlers.py)

import uuid
import asyncio
import httpx
from datetime import datetime
from typing import Optional, Dict, Any, List

from google.cloud import pubsub_v1
import google.generativeai as genai

from .models import ChatRequest, ChatResponse, ChatMessage, ProcessingMode, TaskEvent
from .config import settings

class ChatHandler:
    """Orchestrates chat processing across sync, async and hybrid modes.

    A single instance is created in the FastAPI lifespan and shared by all
    requests. It owns the Gemini model handle, an optional Pub/Sub
    publisher (production only) and an httpx client used to talk to the
    memory service.
    """

    def __init__(self):
        # Gemini API client
        genai.configure(api_key=settings.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro')

        # Pub/Sub publisher exists only outside development; dev just logs
        if settings.environment != "development":
            self.publisher = pubsub_v1.PublisherClient()
            self.topic_path = self.publisher.topic_path(
                settings.gcp_project_id,
                settings.pubsub_topic_chat_tasks
            )
        else:
            self.publisher = None

        # Shared HTTP client for service-to-service calls
        self.http_client = httpx.AsyncClient(timeout=5.0)

    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Main entry point: load context, choose a mode, dispatch.

        NOTE(review): request.processing_mode is currently ignored — the
        mode is always derived from keywords; confirm whether clients
        should be able to force a mode.
        """

        # 1. Reuse the caller's chat_id or start a new conversation
        chat_id = request.chat_id or str(uuid.uuid4())

        # 2. Load user memory and recent context
        context = await self._load_user_context(request.user_id, chat_id)

        # 3. Decide how to process this message
        processing_mode = self._determine_processing_mode(request.message, context)

        # 4. Dispatch by mode
        if processing_mode == ProcessingMode.SYNC:
            return await self._handle_sync(request, chat_id, context)
        elif processing_mode == ProcessingMode.ASYNC:
            return await self._handle_async(request, chat_id, context)
        else:  # HYBRID
            return await self._handle_hybrid(request, chat_id, context)

    def _determine_processing_mode(self, message: str, context: Dict) -> ProcessingMode:
        """Pick a processing mode from simple keyword rules.

        `context` is accepted for a future ML-based classifier but is not
        consulted yet.
        """
        message_lower = message.lower()

        # Greetings / pleasantries: answer synchronously
        sync_keywords = ["你好", "hello", "謝謝", "thank", "再見", "bye"]
        if any(keyword in message_lower for keyword in sync_keywords):
            return ProcessingMode.SYNC

        # Heavy analytical requests: push to the async pipeline
        async_keywords = ["分析", "研究", "報告", "詳細", "複雜", "計算"]
        if any(keyword in message_lower for keyword in async_keywords):
            return ProcessingMode.ASYNC

        # Everything else: hybrid (quick ACK + full answer later)
        return ProcessingMode.HYBRID

    async def _handle_sync(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Synchronous path: answer within settings.response_timeout seconds."""
        try:
            # Build a compact prompt
            prompt = self._build_simple_prompt(request.message, context)

            # Short-token Gemini call; _call_gemini runs the blocking SDK in
            # a worker thread so this timeout can actually fire.
            response = await asyncio.wait_for(
                self._call_gemini(prompt, max_tokens=150),
                timeout=settings.response_timeout
            )

            # Persist the exchange in the memory service (best effort)
            await self._save_conversation(chat_id, request.user_id, request.message, response)

            return ChatResponse(
                message=response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False
            )

        except asyncio.TimeoutError:
            # Too slow for sync — fall back to the hybrid flow
            return await self._handle_hybrid(request, chat_id, context)
        except Exception as e:
            return ChatResponse(
                message="抱歉,我現在無法回應。請稍後再試。",
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False,
                metadata={"error": str(e)}
            )

    async def _handle_async(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Asynchronous path: enqueue the task and ACK immediately."""
        await self._publish_task_event(request, chat_id, context)

        return ChatResponse(
            message="我收到你的問題,正在仔細思考中。請稍等片刻,我會盡快回覆完整答案。",
            chat_id=chat_id,
            processing_mode=ProcessingMode.ASYNC,
            is_complete=False,
            requires_followup=True,
            metadata={"estimated_time": "10-30秒"}
        )

    async def _handle_hybrid(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Hybrid path: canned quick reply now, full answer via Pub/Sub later."""
        quick_response = self._generate_quick_response(request.message)

        # Queue the full processing task
        await self._publish_task_event(request, chat_id, context)

        return ChatResponse(
            message=quick_response,
            chat_id=chat_id,
            processing_mode=ProcessingMode.HYBRID,
            is_complete=False,
            requires_followup=True,
            metadata={"status": "initial_response"}
        )

    def _generate_quick_response(self, message: str) -> str:
        """Return a canned acknowledgement (no AI inference needed)."""
        quick_responses = {
            "分析": "我正在分析相關資料...",
            "解釋": "讓我為你詳細解釋...",
            "幫助": "我正在查找相關資訊來幫助你...",
            "報告": "我正在準備詳細報告...",
        }

        for keyword, response in quick_responses.items():
            if keyword in message:
                return response

        return "我正在處理你的問題,請稍等片刻..."

    async def _load_user_context(self, user_id: str, chat_id: str) -> Dict[str, Any]:
        """Fetch recent messages and preferences from the memory service.

        Degrades to an empty context on any failure so chat keeps working
        even when the memory service is down.
        """
        try:
            response = await self.http_client.get(
                f"{settings.memory_service_url}/memory/{user_id}",
                params={"chat_id": chat_id, "limit": 5}
            )

            if response.status_code == 200:
                return response.json()
            return {"recent_messages": [], "user_preferences": {}}

        except Exception as e:
            print(f"載入用戶上下文失敗: {e}")
            return {"recent_messages": [], "user_preferences": {}}

    async def _save_conversation(self, chat_id: str, user_id: str, user_message: str, assistant_message: str):
        """Best-effort persistence of one user/assistant exchange."""
        try:
            payload = {
                "chat_id": chat_id,
                "user_id": user_id,
                "messages": [
                    {"role": "user", "content": user_message, "timestamp": datetime.now().isoformat()},
                    {"role": "assistant", "content": assistant_message, "timestamp": datetime.now().isoformat()}
                ]
            }

            await self.http_client.post(
                f"{settings.memory_service_url}/memory/save",
                json=payload
            )
        except Exception as e:
            print(f"儲存對話失敗: {e}")

    async def _call_gemini(self, prompt: str, max_tokens: int = 1000) -> str:
        """Call Gemini off the event loop.

        The google-generativeai SDK is synchronous. Awaiting it directly
        blocked the event loop AND made the asyncio.wait_for timeout in
        _handle_sync ineffective (a coroutine that never yields cannot be
        cancelled). Running the call via asyncio.to_thread fixes both.
        """
        def _generate() -> str:
            result = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    max_output_tokens=max_tokens,
                    temperature=0.7,
                )
            )
            return result.text

        try:
            return await asyncio.to_thread(_generate)
        except Exception as e:
            raise Exception(f"Gemini API 呼叫失敗: {e}") from e

    def _build_simple_prompt(self, message: str, context: Dict) -> str:
        """Build a compact prompt with up to three recent messages."""
        recent_messages = context.get("recent_messages", [])

        prompt = "你是一個友善的 AI 助手。請簡潔回應用戶的問題。\n\n"

        # Include a little recent history for continuity
        if recent_messages:
            prompt += "最近對話:\n"
            for msg in recent_messages[-3:]:  # last 3 turns only
                prompt += f"{msg['role']}: {msg['content']}\n"
            prompt += "\n"

        prompt += f"用戶問題: {message}\n回答:"
        return prompt

    async def _publish_task_event(self, request: ChatRequest, chat_id: str, context: Dict):
        """Publish a TaskEvent to Pub/Sub (or just log it in development)."""
        if not self.publisher:
            print(f"開發環境:模擬發送任務事件 - {request.message}")
            return

        try:
            event = TaskEvent(
                task_id=str(uuid.uuid4()),
                chat_id=chat_id,
                user_id=request.user_id,
                message=request.message,
                context=context
            )

            message_data = event.model_dump_json().encode('utf-8')
            future = self.publisher.publish(self.topic_path, message_data)

            # NOTE(review): future.result() blocks until the broker ACKs,
            # briefly stalling the event loop — consider to_thread here too.
            print(f"任務事件已發送: {future.result()}")

        except Exception as e:
            print(f"發送任務事件失敗: {e}")

FastAPI 應用主程式(services/chat/app/main.py)

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import uvicorn

from .models import ChatRequest, ChatResponse
from .handlers import ChatHandler
from .config import settings

# 應用生命週期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared resources at startup; release them at shutdown."""
    # Startup: one ChatHandler shared by every request
    app.state.chat_handler = ChatHandler()
    print(f"🚀 {settings.service_name} 服務啟動完成")

    yield

    # Shutdown: close the outbound HTTP client if the handler created one
    client = getattr(app.state.chat_handler, "http_client", None)
    if client is not None:
        await client.aclose()
    print(f"🛑 {settings.service_name} 服務關閉完成")

# Create the FastAPI application
app = FastAPI(
    title="AI Chat Service",
    description="智慧對話服務 - 支援同步/非同步處理",
    version="1.0.0",
    lifespan=lifespan
)

# CORS: wide open in debug, locked to the public domain otherwise
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.debug else ["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency injection: hand out the handler created in lifespan()
def get_chat_handler() -> ChatHandler:
    return app.state.chat_handler

@app.get("/health")
async def health_check():
    """Health-check endpoint for load balancers / Cloud Run probes."""
    # Local import: this module does not otherwise import datetime
    from datetime import datetime, timezone

    return {
        "status": "healthy",
        "service": settings.service_name,
        "environment": settings.environment,
        # Real UTC timestamp instead of the previous hard-coded placeholder
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    }

@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    handler: ChatHandler = Depends(get_chat_handler)
):
    """Primary chat endpoint: delegates to the shared ChatHandler."""
    try:
        return await handler.process_chat(request)

    except Exception as e:
        # Log for operators, hide internals from the client
        print(f"對話處理錯誤: {e}")
        raise HTTPException(
            status_code=500,
            detail="服務暫時無法使用,請稍後再試"
        )

@app.get("/chat/{chat_id}/status")
async def get_chat_status(chat_id: str):
    """Query conversation status (stub)."""
    # Local import: this module does not otherwise import datetime
    from datetime import datetime, timezone

    # TODO: look up the real status / last activity from the memory service
    return {
        "chat_id": chat_id,
        "status": "active",
        # Real UTC timestamp instead of the previous hard-coded placeholder
        "last_activity": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    }

if __name__ == "__main__":
    # Local development entry point (containers launch uvicorn directly).
    # NOTE(review): "main:app" assumes running from inside app/; the
    # Dockerfile uses "app.main:app" — confirm the intended working dir.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8080,
        reload=settings.debug,
        log_level="info"
    )

3) memory-service 基礎實作

資料庫模型(services/memory/app/models.py)

from sqlalchemy import Column, String, Text, DateTime, Integer, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime

Base = declarative_base()

class ConversationHistory(Base):
    """One row per chat message.

    The attribute is named ``message_metadata`` because ``metadata`` is a
    reserved attribute on SQLAlchemy's declarative base — using it raises
    InvalidRequestError at class-definition time. The underlying database
    column keeps the name "metadata".
    """
    __tablename__ = "conversation_history"

    id = Column(Integer, primary_key=True, autoincrement=True)
    chat_id = Column(String(36), nullable=False, index=True)
    user_id = Column(String(36), nullable=False, index=True)
    role = Column(String(20), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    # Callable default avoids sharing one mutable {} across rows
    message_metadata = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

class UserMemory(Base):
    """One row per user: rolling summary, long-term memory and preferences."""
    __tablename__ = "user_memory"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(36), nullable=False, unique=True, index=True)
    short_term_summary = Column(Text)  # summary of recent conversations
    long_term_memory = Column(JSON, default={})  # user preferences, key facts
    preferences = Column(JSON, default={})  # personalisation settings
    updated_at = Column(DateTime(timezone=True), onupdate=func.now(), server_default=func.now())

# Pydantic models (API exchange format)
class MessageInput(BaseModel):
    """One message in a save-conversation request."""
    role: str
    content: str
    timestamp: Optional[datetime] = None
    metadata: Dict[str, Any] = {}

class SaveConversationRequest(BaseModel):
    """Payload for POST /memory/save."""
    chat_id: str
    user_id: str
    messages: List[MessageInput]

class MemoryResponse(BaseModel):
    """Response for GET /memory/{user_id}."""
    recent_messages: List[Dict[str, Any]]
    user_preferences: Dict[str, Any]
    summary: Optional[str] = None

資料庫操作(services/memory/app/database.py)

import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from typing import List, Dict, Any, Optional

from .models import Base, ConversationHistory, UserMemory

class MemoryDatabase:
    """Persistence layer for conversation history and per-user memory.

    NOTE(review): methods are declared ``async`` but use the synchronous
    SQLAlchemy engine, so DB calls block the event loop — acceptable for a
    demo; move to the async engine for production.
    """

    def __init__(self):
        # Database connection: SQLite for local dev, PostgreSQL in production
        database_url = os.getenv("DATABASE_URL", "sqlite:///./memory.db")

        if database_url.startswith("sqlite"):
            self.engine = create_engine(database_url, echo=False)
        else:
            # pool_pre_ping revalidates pooled connections before each use
            self.engine = create_engine(database_url, echo=False, pool_pre_ping=True)
        self.SessionLocal = sessionmaker(bind=self.engine)

        # Create tables on startup (idempotent)
        Base.metadata.create_all(bind=self.engine)

    def get_session(self) -> Session:
        """Open a fresh ORM session; close it via `with` or explicitly."""
        return self.SessionLocal()

    async def save_conversation(self, chat_id: str, user_id: str, messages: List[Dict[str, Any]]):
        """Append messages to conversation_history and refresh the summary."""
        with self.get_session() as session:
            try:
                for msg in messages:
                    session.add(ConversationHistory(
                        chat_id=chat_id,
                        user_id=user_id,
                        role=msg["role"],
                        content=msg["content"],
                        # attribute renamed: "metadata" is reserved by
                        # SQLAlchemy's declarative base (column name unchanged)
                        message_metadata=msg.get("metadata", {})
                    ))

                session.commit()

                # Refresh the user's rolling short-term summary
                await self._update_user_memory(session, user_id, messages)

            except Exception:
                session.rollback()
                raise  # re-raise without resetting the traceback

    async def get_user_context(self, user_id: str, chat_id: Optional[str] = None, limit: int = 10) -> Dict[str, Any]:
        """Return recent messages (chronological), preferences and summary."""
        with self.get_session() as session:
            try:
                # Most recent messages, optionally scoped to one chat
                query = session.query(ConversationHistory).filter(
                    ConversationHistory.user_id == user_id
                )

                if chat_id:
                    query = query.filter(ConversationHistory.chat_id == chat_id)

                recent_messages = query.order_by(
                    ConversationHistory.created_at.desc()
                ).limit(limit).all()

                # Per-user memory row (may not exist yet)
                user_memory = session.query(UserMemory).filter(
                    UserMemory.user_id == user_id
                ).first()

                return {
                    "recent_messages": [
                        {
                            "role": msg.role,
                            "content": msg.content,
                            "timestamp": msg.created_at.isoformat(),
                            "metadata": msg.message_metadata
                        } for msg in reversed(recent_messages)  # oldest first
                    ],
                    "user_preferences": user_memory.preferences if user_memory else {},
                    "summary": user_memory.short_term_summary if user_memory else None
                }

            except Exception as e:
                print(f"取得用戶上下文失敗: {e}")
                return {"recent_messages": [], "user_preferences": {}, "summary": None}

    async def _update_user_memory(self, session: Session, user_id: str, new_messages: List[Dict[str, Any]]):
        """Upsert the user's short-term summary from the latest messages."""
        try:
            user_memory = session.query(UserMemory).filter(
                UserMemory.user_id == user_id
            ).first()

            if not user_memory:
                user_memory = UserMemory(
                    user_id=user_id,
                    short_term_summary="",
                    long_term_memory={},
                    preferences={}
                )
                session.add(user_memory)

            # Naive summary; production should generate one with an LLM
            latest_content = " ".join([msg["content"] for msg in new_messages[-3:]])
            user_memory.short_term_summary = f"最近討論: {latest_content[:200]}..."

            session.commit()

        except Exception as e:
            print(f"更新用戶記憶失敗: {e}")

Memory Service 主程式(services/memory/app/main.py)

from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager

from .models import SaveConversationRequest, MemoryResponse
from .database import MemoryDatabase

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the database at startup; nothing needs releasing on shutdown."""
    # Initialise the database (creates tables if missing)
    app.state.db = MemoryDatabase()
    print("🧠 Memory Service 啟動完成")
    yield
    print("🧠 Memory Service 關閉完成")

app = FastAPI(
    title="Memory Service",
    description="用戶記憶與對話歷史管理",
    version="1.0.0",
    lifespan=lifespan
)

# Dependency provider for the database created in lifespan()
def get_database() -> MemoryDatabase:
    return app.state.db

@app.get("/health")
async def health_check():
    """Liveness probe for the memory service."""
    return dict(status="healthy", service="memory-service")

@app.post("/memory/save")
async def save_conversation(
    request: SaveConversationRequest,
    db: MemoryDatabase = Depends(get_database)
):
    """Persist a batch of chat messages for one conversation."""
    serialized = [msg.model_dump() for msg in request.messages]
    try:
        await db.save_conversation(request.chat_id, request.user_id, serialized)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success"}

@app.get("/memory/{user_id}", response_model=MemoryResponse)
async def get_user_memory(
    user_id: str,
    chat_id: str | None = None,  # fixed annotation: was `str = None`
    limit: int = 10,
    db: MemoryDatabase = Depends(get_database)
):
    """Return recent messages, preferences and summary for a user."""
    try:
        context = await db.get_user_context(user_id, chat_id, limit)
        return MemoryResponse(**context)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

4) 本地開發環境設定

Docker Compose(docker-compose.yml)

version: '3.8'

services:
  chat-service:
    build:
      context: .
      dockerfile: services/chat/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - DEBUG=true
      # Inside the compose network, containers reach memory-service on its
      # CONTAINER port (8080); 8081 is only the host-side mapping.
      - MEMORY_SERVICE_URL=http://memory-service:8080
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - JWT_SECRET=dev-secret-key
    depends_on:
      - memory-service
    volumes:
      - ./services/chat:/app
      - ./shared:/app/shared

  memory-service:
    build:
      context: .
      dockerfile: services/memory/Dockerfile
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=sqlite:///./data/memory.db
    volumes:
      - ./services/memory:/app
      - ./data:/app/data

  # Development database (optional)
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_assistant
      - POSTGRES_USER=dev
      - POSTGRES_PASSWORD=devpassword
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

Chat Service Dockerfile(services/chat/Dockerfile)

FROM python:3.11-slim

WORKDIR /app

# System build dependencies (gcc for packages with C extensions)
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first to maximise layer caching
COPY services/chat/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY services/chat/app ./app
COPY shared ./shared

# Environment
ENV PYTHONPATH=/app
ENV PORT=8080

# Expose the service port
EXPOSE 8080

# Honour the PORT env var Cloud Run injects; falls back to 8080 locally
CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port ${PORT:-8080}"]

需求檔案(services/chat/requirements.txt)

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
httpx==0.25.2
google-generativeai==0.3.2
google-cloud-pubsub==2.18.4
python-multipart==0.0.6
python-jose[cryptography]==3.3.0

5) 快速測試

啟動服務

# 1. Set environment variables
export GEMINI_API_KEY="your-gemini-api-key"

# 2. Start the services
docker-compose up --build

# 3. Check service health
curl http://localhost:8080/health
curl http://localhost:8081/health

測試對話

# Synchronous chat test
curl -X POST "http://localhost:8080/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "message": "你好",
    "user_id": "test-user-123",
    "processing_mode": "sync"
  }'

# Hybrid-mode test
curl -X POST "http://localhost:8080/chat" \
  -H "Content-Type: application/json" \
  -d '{
    "message": "幫我分析一下人工智慧的發展趨勢",
    "user_id": "test-user-123",
    "processing_mode": "hybrid"
  }'

Python 客戶端測試

import requests
import json

def test_chat_service():
    """Smoke-test the chat endpoint with one sync and one async request."""
    url = "http://localhost:8080/chat"

    # Request payloads covering two processing modes
    payloads = [
        {
            "message": "你好",
            "user_id": "test-user-123",
            "processing_mode": "sync"
        },
        {
            "message": "幫我寫一份關於 AI 的詳細報告",
            "user_id": "test-user-123",
            "processing_mode": "async"
        }
    ]

    for case_no, payload in enumerate(payloads, start=1):
        print(f"\n測試案例 {case_no}: {payload['message']}")

        resp = requests.post(url, json=payload)

        if resp.status_code != 200:
            print(f"❌ 錯誤: {resp.status_code} - {resp.text}")
            continue

        result = resp.json()
        print(f"✅ 回應: {result['message']}")
        print(f"📋 模式: {result['processing_mode']}")
        print(f"🔄 需要後續: {result['requires_followup']}")

if __name__ == "__main__":
    # Run the smoke test against a locally running stack
    test_chat_service()

6) 部署到 GCP

Cloud Build 配置(cloudbuild.yaml)

steps:
  # Build chat-service
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA',
      '-f', 'services/chat/Dockerfile',
      '.'
    ]

  # Build memory-service
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA',
      '-f', 'services/memory/Dockerfile',
      '.'
    ]

  # Push images
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA']

  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA']

  # Deploy memory-service first (chat-service depends on its URL)
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: 'gcloud'
    args:
      - 'run'
      - 'deploy'
      - 'memory-service'
      - '--image=asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA'
      - '--region=asia-east1'
      - '--service-account=memory-sa@$PROJECT_ID.iam.gserviceaccount.com'
      - '--set-secrets=DATABASE_URL=database-url:latest'
      - '--min-instances=1'
      - '--max-instances=5'
      - '--cpu=1'
      - '--memory=512Mi'

  # Deploy chat-service
  # NOTE(review): MEMORY_SERVICE_URL is a placeholder — substitute the real
  # Cloud Run URL of the deployed memory-service.
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: 'gcloud'
    args:
      - 'run'
      - 'deploy'
      - 'chat-service'
      - '--image=asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA'
      - '--region=asia-east1'
      - '--service-account=chat-sa@$PROJECT_ID.iam.gserviceaccount.com'
      - '--set-secrets=GEMINI_API_KEY=gemini-api-key:latest,JWT_SECRET=jwt-secret:latest'
      - '--set-env-vars=MEMORY_SERVICE_URL=https://memory-service-xxx.run.app,ENVIRONMENT=production'
      - '--min-instances=1'
      - '--max-instances=10'
      - '--cpu=2'
      - '--memory=1Gi'

options:
  logging: CLOUD_LOGGING_ONLY

部署腳本(deploy.sh)

#!/bin/bash
# Stop on first failure; NOTE: create-steps below are not idempotent, so a
# re-run will abort on "already exists" — delete resources or comment out.
set -euo pipefail

# Variables
PROJECT_ID="your-gcp-project"
REGION="asia-east1"

echo "🚀 開始部署 AI Assistant 服務..."

# 1. Select the GCP project
gcloud config set project $PROJECT_ID

# 2. Enable required APIs
gcloud services enable run.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable secretmanager.googleapis.com
gcloud services enable pubsub.googleapis.com

# 3. Create the Artifact Registry repository
gcloud artifacts repositories create ai-assistant \
  --repository-format=docker \
  --location=$REGION

# 4. Create Service Accounts
gcloud iam service-accounts create chat-sa --display-name="Chat Service Account"
gcloud iam service-accounts create memory-sa --display-name="Memory Service Account"

# 5. Create Secrets
echo -n "your-gemini-api-key" | gcloud secrets create gemini-api-key --data-file=-
echo -n "your-jwt-secret" | gcloud secrets create jwt-secret --data-file=-
echo -n "postgresql://user:pass@host:5432/db" | gcloud secrets create database-url --data-file=-

# 6. Grant secret access to the service accounts
gcloud secrets add-iam-policy-binding gemini-api-key \
  --member="serviceAccount:chat-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# chat-service also mounts jwt-secret (see cloudbuild.yaml) — without this
# binding the Cloud Run deploy fails with a secret-access error.
gcloud secrets add-iam-policy-binding jwt-secret \
  --member="serviceAccount:chat-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding database-url \
  --member="serviceAccount:memory-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# 7. Create the Pub/Sub topic
gcloud pubsub topics create chat-tasks

# 8. Run Cloud Build
gcloud builds submit --config cloudbuild.yaml

echo "✅ 部署完成!"
echo "🔗 Chat Service: https://chat-service-xxx.run.app"
echo "🔗 Memory Service: https://memory-service-xxx.run.app"

7) 監控與除錯

結構化日誌

import json
import logging
from datetime import datetime

class StructuredLogger:
    """Emit one-line JSON log entries suitable for Cloud Logging ingestion."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

    def _build_entry(self, level: str, message: str, **kwargs) -> dict:
        """Assemble the structured payload for one log record."""
        return {
            "timestamp": datetime.now().isoformat(),
            "service": self.service_name,
            "level": level,
            "message": message,
            **kwargs
        }

    def log(self, level: str, message: str, **kwargs):
        """Serialize and emit a structured record at the given level."""
        # ensure_ascii=False keeps CJK log text human-readable instead of
        # \uXXXX escape sequences
        payload = json.dumps(self._build_entry(level, message, **kwargs),
                             ensure_ascii=False)

        if level.upper() == "ERROR":
            self.logger.error(payload)
        elif level.upper() == "WARNING":
            self.logger.warning(payload)
        else:
            self.logger.info(payload)

# Usage example
logger = StructuredLogger("chat-service")

# Inside handlers.py (request/chat_id/processing_mode come from the handler)
logger.log("INFO", "處理對話請求",
          user_id=request.user_id,
          chat_id=chat_id,
          processing_mode=processing_mode.value)

效能監控

import time
from functools import wraps

def monitor_performance(func):
    """Decorator: log the duration and outcome of an async function.

    Relies on a module-level ``logger`` (StructuredLogger) being defined.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic — immune to wall-clock adjustments,
        # unlike time.time(), so durations can never come out negative
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            duration = time.perf_counter() - start_time

            logger.log("INFO", "函數執行完成",
                      function=func.__name__,
                      duration_ms=round(duration * 1000, 2),
                      status="success")
            return result

        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.log("ERROR", "函數執行失敗",
                      function=func.__name__,
                      duration_ms=round(duration * 1000, 2),
                      error=str(e),
                      status="error")
            raise

    return wrapper

# 在重要函數上使用
@monitor_performance
async def process_chat(self, request: ChatRequest) -> ChatResponse:
    # ... 原有邏輯

8) 下一步擴展

這個基礎版本已經能處理基本對話,接下來可以:

  1. 加入 WebSocket 支援:即時推播完整回應
  2. 實作 RAG 服務:文件檢索與向量搜尋
  3. 加入 API Gateway:統一認證與限流
  4. Worker Service:處理 Pub/Sub 非同步任務
  5. 監控系統:Grafana + Prometheus 儀表板

下一篇我們來實作 RAG 檢索服務與 Worker 非同步處理。


上一篇
用 GCP 觀念重畫系統藍圖
系列文
來都來了,那就做一個GCP從0到100的AI助理24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言