前篇我們畫好了系統藍圖,這篇開始寫程式碼。我們先讓最小可用版本跑起來,再逐步加功能。
目標:30 分鐘內讓一個能對話的 AI 助手在本地跑起來,再花 30 分鐘部署到 GCP。
ai-assistant/
├── services/
│ ├── chat/ # chat-service 主服務
│ │ ├── app/
│ │ │ ├── __init__.py
│ │ │ ├── main.py # FastAPI 應用入口
│ │ │ ├── models.py # 資料模型
│ │ │ ├── handlers.py # 業務邏輯
│ │ │ └── config.py # 配置管理
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ ├── memory/ # mem-service 記憶服務
│ │ ├── app/
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ │ ├── models.py
│ │ │ └── database.py # 資料庫操作
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ └── gateway/ # api-gateway(下一篇詳述)
├── shared/ # 共用程式庫
│ ├── __init__.py
│ ├── auth.py # JWT 驗證
│ ├── events.py # Pub/Sub 事件
│ └── utils.py # 工具函數
├── docker-compose.yml # 本地開發環境
├── cloudbuild.yaml # GCP 部署配置
└── README.md
services/chat/app/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
class MessageRole(str, Enum):
    """Author of a chat message; the str mixin makes values serialize as plain JSON strings."""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"
class ProcessingMode(str, Enum):
    """How a chat request is handled end-to-end."""
    SYNC = "sync"      # synchronous processing (fast, complete reply in one response)
    ASYNC = "async"    # asynchronous processing (complex task, full answer delivered later)
    HYBRID = "hybrid"  # hybrid (immediate ACK first, complete answer follows)
class ChatMessage(BaseModel):
    """A single message within a conversation."""
    role: MessageRole
    content: str
    # NOTE(review): naive local time, not UTC-aware — confirm consumers expect this.
    timestamp: datetime = Field(default_factory=datetime.now)
    metadata: Dict[str, Any] = Field(default_factory=dict)
class ChatRequest(BaseModel):
    """Incoming payload for POST /chat."""
    message: str
    # Omitted on the first turn; the handler generates a UUID when this is None.
    chat_id: Optional[str] = None
    user_id: str
    processing_mode: ProcessingMode = ProcessingMode.HYBRID
    context: Dict[str, Any] = Field(default_factory=dict)
class ChatResponse(BaseModel):
    """Payload returned by POST /chat."""
    message: str
    chat_id: str
    processing_mode: ProcessingMode
    is_complete: bool = False        # True when this is the final answer
    requires_followup: bool = False  # True when a fuller answer will arrive later
    metadata: Dict[str, Any] = Field(default_factory=dict)
class TaskEvent(BaseModel):
    """Envelope published to Pub/Sub for asynchronous task processing."""
    task_id: str
    chat_id: str
    user_id: str
    message: str
    context: Dict[str, Any]
    # NOTE(review): naive local timestamp — consider timezone-aware UTC; verify consumers.
    created_at: datetime = Field(default_factory=datetime.now)
services/chat/app/config.py
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Runtime configuration for chat-service.

    pydantic-settings resolves each field from (in precedence order) process
    environment variables, the .env file, then the default below, matching
    field names case-insensitively. The os.getenv() calls the original used
    as defaults were therefore redundant double-reads of the environment and
    have been removed; resolution behavior is unchanged.
    """
    # Service basics
    service_name: str = "chat-service"
    environment: str = "development"   # env: ENVIRONMENT
    debug: bool = False                # env: DEBUG — pydantic parses "true"/"false"

    # Google Cloud
    gcp_project_id: str = ""           # env: GCP_PROJECT_ID
    vertex_location: str = "asia-east1"  # env: VERTEX_LOCATION

    # Pub/Sub
    pubsub_topic_chat_tasks: str = "chat-tasks"  # env: PUBSUB_TOPIC_CHAT_TASKS

    # Downstream service URLs
    memory_service_url: str = "http://memory-service:8080"  # env: MEMORY_SERVICE_URL
    rag_service_url: str = "http://rag-service:8080"        # env: RAG_SERVICE_URL

    # API credentials
    gemini_api_key: str = ""               # env: GEMINI_API_KEY
    jwt_secret: str = "your-secret-key"    # env: JWT_SECRET — must be overridden in production

    # Performance tuning
    response_timeout: int = 2          # seconds; env: RESPONSE_TIMEOUT
    max_context_length: int = 4000     # env: MAX_CONTEXT_LENGTH

    class Config:
        env_file = ".env"
        case_sensitive = False


# Module-level singleton shared by the whole service.
settings = Settings()
services/chat/app/handlers.py
import uuid
import asyncio
import httpx
from datetime import datetime
from typing import Optional, Dict, Any, List
from google.cloud import pubsub_v1
import google.generativeai as genai
from .models import ChatRequest, ChatResponse, ChatMessage, ProcessingMode, TaskEvent
from .config import settings
class ChatHandler:
    """Orchestrates chat processing: mode selection, Gemini calls, memory I/O
    and Pub/Sub task publishing.

    Fixes vs. the original: the Gemini SDK's generate_content() and the
    Pub/Sub publish future's result() are blocking calls; running them
    directly inside coroutines stalled the event loop and made the
    asyncio.wait_for() timeout in _handle_sync ineffective. Both are now
    pushed to worker threads via asyncio.to_thread.
    """

    def __init__(self):
        # Configure the Gemini client once per handler instance.
        genai.configure(api_key=settings.gemini_api_key)
        self.model = genai.GenerativeModel('gemini-pro')
        # A real Pub/Sub publisher only outside local development.
        if settings.environment != "development":
            self.publisher = pubsub_v1.PublisherClient()
            self.topic_path = self.publisher.topic_path(
                settings.gcp_project_id,
                settings.pubsub_topic_chat_tasks
            )
        else:
            self.publisher = None
        # Shared async HTTP client for service-to-service calls.
        self.http_client = httpx.AsyncClient(timeout=5.0)

    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Main entry point: route the request through sync/async/hybrid handling."""
        # 1. Reuse the caller's chat_id or mint a new conversation id.
        chat_id = request.chat_id or str(uuid.uuid4())
        # 2. Load recent messages / preferences from the memory service.
        context = await self._load_user_context(request.user_id, chat_id)
        # 3. Pick a processing mode from the message content.
        processing_mode = self._determine_processing_mode(request.message, context)
        # 4. Dispatch by mode.
        if processing_mode == ProcessingMode.SYNC:
            return await self._handle_sync(request, chat_id, context)
        if processing_mode == ProcessingMode.ASYNC:
            return await self._handle_async(request, chat_id, context)
        return await self._handle_hybrid(request, chat_id, context)

    def _determine_processing_mode(self, message: str, context: Dict) -> ProcessingMode:
        """Pick a processing mode with simple keyword rules.

        (Production could replace this with an ML classifier; `context` is
        accepted for that future use and currently unused.)
        """
        message_lower = message.lower()
        # Greetings / pleasantries → answer synchronously.
        sync_keywords = ["你好", "hello", "謝謝", "thank", "再見", "bye"]
        if any(keyword in message_lower for keyword in sync_keywords):
            return ProcessingMode.SYNC
        # Heavy analytical requests → fully asynchronous.
        async_keywords = ["分析", "研究", "報告", "詳細", "複雜", "計算"]
        if any(keyword in message_lower for keyword in async_keywords):
            return ProcessingMode.ASYNC
        # Everything else → hybrid (quick ACK + follow-up).
        return ProcessingMode.HYBRID

    async def _handle_sync(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Synchronous path: answer quickly with a short Gemini completion."""
        try:
            prompt = self._build_simple_prompt(request.message, context)
            # Short completion under a hard deadline; the Gemini call runs in
            # a worker thread so this timeout actually fires (see _call_gemini).
            response = await asyncio.wait_for(
                self._call_gemini(prompt, max_tokens=150),
                timeout=settings.response_timeout
            )
            # Persist the exchange via the memory service (best-effort).
            await self._save_conversation(chat_id, request.user_id, request.message, response)
            return ChatResponse(
                message=response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False
            )
        except asyncio.TimeoutError:
            # Too slow for sync — fall back to the hybrid path.
            return await self._handle_hybrid(request, chat_id, context)
        except Exception as e:
            # Degrade gracefully; surface the error only in metadata.
            return ChatResponse(
                message="抱歉,我現在無法回應。請稍後再試。",
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False,
                metadata={"error": str(e)}
            )

    async def _handle_async(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Asynchronous path: enqueue the task and return an ACK immediately."""
        await self._publish_task_event(request, chat_id, context)
        return ChatResponse(
            message="我收到你的問題,正在仔細思考中。請稍等片刻,我會盡快回覆完整答案。",
            chat_id=chat_id,
            processing_mode=ProcessingMode.ASYNC,
            is_complete=False,
            requires_followup=True,
            metadata={"estimated_time": "10-30秒"}
        )

    async def _handle_hybrid(self, request: ChatRequest, chat_id: str, context: Dict) -> ChatResponse:
        """Hybrid path: canned quick reply now, full answer via the task queue."""
        quick_response = self._generate_quick_response(request.message)
        await self._publish_task_event(request, chat_id, context)
        return ChatResponse(
            message=quick_response,
            chat_id=chat_id,
            processing_mode=ProcessingMode.HYBRID,
            is_complete=False,
            requires_followup=True,
            metadata={"status": "initial_response"}
        )

    def _generate_quick_response(self, message: str) -> str:
        """Return a canned acknowledgement (no model inference involved)."""
        quick_responses = {
            "分析": "我正在分析相關資料...",
            "解釋": "讓我為你詳細解釋...",
            "幫助": "我正在查找相關資訊來幫助你...",
            "報告": "我正在準備詳細報告...",
        }
        for keyword, response in quick_responses.items():
            if keyword in message:
                return response
        return "我正在處理你的問題,請稍等片刻..."

    async def _load_user_context(self, user_id: str, chat_id: str) -> Dict[str, Any]:
        """Fetch recent messages and preferences from the memory service.

        Best-effort: any failure degrades to an empty context rather than
        failing the chat request.
        """
        try:
            response = await self.http_client.get(
                f"{settings.memory_service_url}/memory/{user_id}",
                params={"chat_id": chat_id, "limit": 5}
            )
            if response.status_code == 200:
                return response.json()
            return {"recent_messages": [], "user_preferences": {}}
        except Exception as e:
            print(f"載入用戶上下文失敗: {e}")
            return {"recent_messages": [], "user_preferences": {}}

    async def _save_conversation(self, chat_id: str, user_id: str, user_message: str, assistant_message: str):
        """Persist one user/assistant exchange to the memory service (best-effort)."""
        try:
            payload = {
                "chat_id": chat_id,
                "user_id": user_id,
                "messages": [
                    {"role": "user", "content": user_message, "timestamp": datetime.now().isoformat()},
                    {"role": "assistant", "content": assistant_message, "timestamp": datetime.now().isoformat()}
                ]
            }
            await self.http_client.post(
                f"{settings.memory_service_url}/memory/save",
                json=payload
            )
        except Exception as e:
            print(f"儲存對話失敗: {e}")

    async def _call_gemini(self, prompt: str, max_tokens: int = 1000) -> str:
        """Call Gemini off the event loop and return the generated text.

        generate_content() is a blocking SDK call, so it runs in a worker
        thread; a timeout cancels the awaiting coroutine (the thread itself
        finishes in the background).
        """
        def _generate():
            return self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    max_output_tokens=max_tokens,
                    temperature=0.7,
                )
            )
        try:
            response = await asyncio.to_thread(_generate)
            return response.text
        except Exception as e:
            raise Exception(f"Gemini API 呼叫失敗: {e}")

    def _build_simple_prompt(self, message: str, context: Dict) -> str:
        """Build the short prompt used by the synchronous path."""
        recent_messages = context.get("recent_messages", [])
        prompt = "你是一個友善的 AI 助手。請簡潔回應用戶的問題。\n\n"
        # Include only the last three turns to keep the prompt small.
        if recent_messages:
            prompt += "最近對話:\n"
            for msg in recent_messages[-3:]:
                prompt += f"{msg['role']}: {msg['content']}\n"
            prompt += "\n"
        prompt += f"用戶問題: {message}\n回答:"
        return prompt

    async def _publish_task_event(self, request: ChatRequest, chat_id: str, context: Dict):
        """Publish a TaskEvent to Pub/Sub (logged no-op in development)."""
        if not self.publisher:
            print(f"開發環境:模擬發送任務事件 - {request.message}")
            return
        try:
            event = TaskEvent(
                task_id=str(uuid.uuid4()),
                chat_id=chat_id,
                user_id=request.user_id,
                message=request.message,
                context=context
            )
            message_data = event.model_dump_json().encode('utf-8')
            publish_future = self.publisher.publish(self.topic_path, message_data)
            # future.result() blocks; resolve it in a worker thread to keep the loop free.
            message_id = await asyncio.to_thread(publish_future.result)
            print(f"任務事件已發送: {message_id}")
        except Exception as e:
            print(f"發送任務事件失敗: {e}")
services/chat/app/main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import uvicorn
from .models import ChatRequest, ChatResponse
from .handlers import ChatHandler
from .config import settings
# 應用生命週期管理
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle: build the chat handler on startup, release its
    HTTP client on shutdown."""
    app.state.chat_handler = ChatHandler()
    print(f"🚀 {settings.service_name} 服務啟動完成")
    yield
    # Close the shared httpx client if the handler created one.
    client = getattr(app.state.chat_handler, "http_client", None)
    if client is not None:
        await client.aclose()
    print(f"🛑 {settings.service_name} 服務關閉完成")
# Create the FastAPI application (lifespan wires up the ChatHandler)
app = FastAPI(
    title="AI Chat Service",
    description="智慧對話服務 - 支援同步/非同步處理",
    version="1.0.0",
    lifespan=lifespan
)

# CORS: wide open in debug, locked to the production origin otherwise.
# NOTE(review): allow_credentials=True combined with "*" origins is rejected
# by browsers per the CORS spec — confirm the debug policy is intentional.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if settings.debug else ["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency provider: returns the handler created in lifespan().
def get_chat_handler() -> ChatHandler:
    return app.state.chat_handler
@app.get("/health")
async def health_check():
    """Liveness endpoint: reports service identity plus the current UTC time.

    The original returned a hard-coded fake timestamp ("2024-01-01..."),
    which misleads anyone reading health-probe logs.
    """
    # Local import: this module does not otherwise use datetime.
    from datetime import datetime, timezone

    return {
        "status": "healthy",
        "service": settings.service_name,
        "environment": settings.environment,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
    request: ChatRequest,
    handler: ChatHandler = Depends(get_chat_handler)
):
    """Primary chat endpoint: delegate to ChatHandler, map failures to HTTP 500."""
    try:
        return await handler.process_chat(request)
    except Exception as e:
        print(f"對話處理錯誤: {e}")
        raise HTTPException(
            status_code=500,
            detail="服務暫時無法使用,請稍後再試"
        )
@app.get("/chat/{chat_id}/status")
async def get_chat_status(chat_id: str):
    """Placeholder status endpoint.

    NOTE(review): the status and last_activity values are hard-coded; real
    conversation-state lookup is still TODO.
    """
    # TODO: implement real conversation-status lookup
    return {
        "chat_id": chat_id,
        "status": "active",
        "last_activity": "2024-01-01T00:00:00Z"
    }
if __name__ == "__main__":
    # Local development entry point; containers start the service via
    # `python -m uvicorn app.main:app` instead (see Dockerfile).
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8080,
        reload=settings.debug,  # hot-reload only when DEBUG=true
        log_level="info"
    )
services/memory/app/models.py
from sqlalchemy import Column, String, Text, DateTime, Integer, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime
Base = declarative_base()
class ConversationHistory(Base):
    """One stored chat message (user/assistant/system), keyed by chat and user."""
    __tablename__ = "conversation_history"
    id = Column(Integer, primary_key=True, autoincrement=True)
    chat_id = Column(String(36), nullable=False, index=True)
    user_id = Column(String(36), nullable=False, index=True)
    role = Column(String(20), nullable=False)  # "user", "assistant" or "system"
    content = Column(Text, nullable=False)
    # 'metadata' is a reserved attribute on SQLAlchemy declarative models (it
    # would shadow Base.metadata and raise InvalidRequestError at class
    # definition), so the Python attribute is renamed while the database
    # column keeps the name "metadata". default=dict (a callable) avoids
    # sharing one mutable {} across rows.
    extra_metadata = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
class UserMemory(Base):
    """Per-user memory: rolling short-term summary plus long-term facts and preferences."""
    __tablename__ = "user_memory"
    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String(36), nullable=False, unique=True, index=True)
    short_term_summary = Column(Text)  # summary of recent conversation
    # default=dict: a callable default so every row gets its own fresh dict
    # instead of all rows sharing a single module-level {} instance.
    long_term_memory = Column(JSON, default=dict)  # user preferences / important facts
    preferences = Column(JSON, default=dict)       # personalization settings
    updated_at = Column(DateTime(timezone=True), onupdate=func.now(), server_default=func.now())
# Pydantic 模型(API 交換格式)
class MessageInput(BaseModel):
    """One message in a save request (API exchange format)."""
    role: str
    content: str
    timestamp: Optional[datetime] = None
    # Pydantic v2 deep-copies mutable defaults per instance, so `= {}` is safe here.
    metadata: Dict[str, Any] = {}
class SaveConversationRequest(BaseModel):
    """Request body for POST /memory/save: a batch of messages for one chat."""
    chat_id: str
    user_id: str
    messages: List[MessageInput]
class MemoryResponse(BaseModel):
    """Response body for GET /memory/{user_id}."""
    recent_messages: List[Dict[str, Any]]
    user_preferences: Dict[str, Any]
    summary: Optional[str] = None  # short-term conversation summary, if any
services/memory/app/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from typing import List, Dict, Any, Optional
from .models import Base, ConversationHistory, UserMemory
class MemoryDatabase:
    """Synchronous SQLAlchemy wrapper behind the memory-service endpoints.

    NOTE(review): methods are declared async for interface symmetry but run
    blocking DB calls; acceptable for SQLite/dev, consider a thread pool or
    an async driver under real load.
    """

    def __init__(self):
        # SQLite locally; anything else (e.g. PostgreSQL) is treated as production.
        database_url = os.getenv("DATABASE_URL", "sqlite:///./memory.db")
        if database_url.startswith("sqlite"):
            self.engine = create_engine(database_url, echo=False)
        else:
            # pool_pre_ping discards stale pooled connections before use.
            self.engine = create_engine(database_url, echo=False, pool_pre_ping=True)
        self.SessionLocal = sessionmaker(bind=self.engine)
        # Create tables on startup (idempotent).
        Base.metadata.create_all(bind=self.engine)

    def get_session(self) -> Session:
        """Open a new ORM session (caller is responsible for closing it)."""
        return self.SessionLocal()

    async def save_conversation(self, chat_id: str, user_id: str, messages: List[Dict[str, Any]]):
        """Append messages to conversation_history, then refresh the user summary.

        Raises whatever the session raised after rolling back.
        """
        with self.get_session() as session:
            try:
                for msg in messages:
                    session.add(ConversationHistory(
                        chat_id=chat_id,
                        user_id=user_id,
                        role=msg["role"],
                        content=msg["content"],
                        # ORM attribute is extra_metadata ('metadata' is reserved
                        # on declarative models); the DB column is still "metadata"
                        # and the API key stays "metadata" for compatibility.
                        extra_metadata=msg.get("metadata", {})
                    ))
                # Single commit for the whole batch.
                session.commit()
                await self._update_user_memory(session, user_id, messages)
            except Exception as e:
                session.rollback()
                raise e

    async def get_user_context(self, user_id: str, chat_id: Optional[str] = None, limit: int = 10) -> Dict[str, Any]:
        """Return the user's recent messages (oldest first), preferences and summary.

        Best-effort: any failure degrades to an empty context.
        """
        with self.get_session() as session:
            try:
                # Most recent messages for the user (optionally scoped to one chat).
                query = session.query(ConversationHistory).filter(
                    ConversationHistory.user_id == user_id
                )
                if chat_id:
                    query = query.filter(ConversationHistory.chat_id == chat_id)
                recent_messages = query.order_by(
                    ConversationHistory.created_at.desc()
                ).limit(limit).all()
                # The user's long-lived memory row, if any.
                user_memory = session.query(UserMemory).filter(
                    UserMemory.user_id == user_id
                ).first()
                return {
                    "recent_messages": [
                        {
                            "role": msg.role,
                            "content": msg.content,
                            "timestamp": msg.created_at.isoformat(),
                            # Public key stays "metadata"; only the ORM attribute is renamed.
                            "metadata": msg.extra_metadata
                        }
                        # reversed(): newest-first query order → chronological order.
                        for msg in reversed(recent_messages)
                    ],
                    "user_preferences": user_memory.preferences if user_memory else {},
                    "summary": user_memory.short_term_summary if user_memory else None
                }
            except Exception as e:
                print(f"取得用戶上下文失敗: {e}")
                return {"recent_messages": [], "user_preferences": {}, "summary": None}

    async def _update_user_memory(self, session: Session, user_id: str, new_messages: List[Dict[str, Any]]):
        """Refresh the user's short-term summary from the latest messages.

        Best-effort: failures are logged, never propagated.
        """
        try:
            user_memory = session.query(UserMemory).filter(
                UserMemory.user_id == user_id
            ).first()
            if not user_memory:
                user_memory = UserMemory(
                    user_id=user_id,
                    short_term_summary="",
                    long_term_memory={},
                    preferences={}
                )
                session.add(user_memory)
            # Naive concatenation summary — production should use an LLM here.
            latest_content = " ".join([msg["content"] for msg in new_messages[-3:]])
            user_memory.short_term_summary = f"最近討論: {latest_content[:200]}..."
            session.commit()
        except Exception as e:
            print(f"更新用戶記憶失敗: {e}")
services/memory/app/main.py
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from .models import SaveConversationRequest, MemoryResponse
from .database import MemoryDatabase
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook: build the database wrapper once (this also
    creates the tables); nothing to tear down on shutdown."""
    app.state.db = MemoryDatabase()
    print("🧠 Memory Service 啟動完成")
    yield
    print("🧠 Memory Service 關閉完成")
# Create the FastAPI application (lifespan wires up MemoryDatabase)
app = FastAPI(
    title="Memory Service",
    description="用戶記憶與對話歷史管理",
    version="1.0.0",
    lifespan=lifespan
)

# Dependency provider: returns the database wrapper created in lifespan().
def get_database() -> MemoryDatabase:
    return app.state.db
@app.get("/health")
async def health_check():
    """Liveness probe used by the orchestrator / load balancer."""
    return {"status": "healthy", "service": "memory-service"}
@app.post("/memory/save")
async def save_conversation(
    request: SaveConversationRequest,
    db: MemoryDatabase = Depends(get_database)
):
    """Persist a batch of messages for one chat; any failure becomes HTTP 500."""
    try:
        payload = [m.model_dump() for m in request.messages]
        await db.save_conversation(request.chat_id, request.user_id, payload)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "success"}
@app.get("/memory/{user_id}", response_model=MemoryResponse)
async def get_user_memory(
    user_id: str,
    # Was `chat_id: str = None`, which contradicts its own annotation;
    # `str | None` makes the optionality explicit (requires Python 3.10+,
    # matching the 3.11 base image used by the chat service).
    chat_id: str | None = None,
    limit: int = 10,
    db: MemoryDatabase = Depends(get_database)
):
    """Return the user's recent messages, preferences and summary.

    Query params: chat_id (optional scope), limit (max messages).
    """
    try:
        context = await db.get_user_context(user_id, chat_id, limit)
        return MemoryResponse(**context)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
docker-compose.yml
version: '3.8'

services:
  chat-service:
    build:
      context: .
      dockerfile: services/chat/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - DEBUG=true
      # Service-to-service traffic stays on the compose network, so we target
      # the CONTAINER port (8080). The original pointed at 8081, which is only
      # the host-side mapping and is unreachable from inside the network.
      - MEMORY_SERVICE_URL=http://memory-service:8080
      - GEMINI_API_KEY=${GEMINI_API_KEY}
      - JWT_SECRET=dev-secret-key
    depends_on:
      - memory-service
    volumes:
      - ./services/chat:/app
      - ./shared:/app/shared

  memory-service:
    build:
      context: .
      dockerfile: services/memory/Dockerfile
    ports:
      - "8081:8080"   # host 8081 -> container 8080
    environment:
      - DATABASE_URL=sqlite:///./data/memory.db
    volumes:
      - ./services/memory:/app
      - ./data:/app/data

  # Optional local database for exercising the PostgreSQL code path
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_assistant
      - POSTGRES_USER=dev
      - POSTGRES_PASSWORD=devpassword
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
services/chat/Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System packages needed to build wheels with C extensions
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Install dependencies first so this layer caches across code-only changes
COPY services/chat/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY services/chat/app ./app
COPY shared ./shared

ENV PYTHONPATH=/app
ENV PORT=8080

EXPOSE 8080

# Shell form so ${PORT} expands at runtime: Cloud Run injects PORT, but the
# previous exec-form CMD hard-coded 8080 and ignored it. The ENV above keeps
# 8080 as the default for local runs.
CMD exec python -m uvicorn app.main:app --host 0.0.0.0 --port ${PORT}
services/chat/requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
httpx==0.25.2
google-generativeai==0.3.2
google-cloud-pubsub==2.18.4
python-multipart==0.0.6
python-jose[cryptography]==3.3.0
# 1. Set the required environment variable (used by docker-compose)
export GEMINI_API_KEY="your-gemini-api-key"
# 2. Build and start both services
docker-compose up --build
# 3. Health checks (chat on 8080, memory on 8081)
curl http://localhost:8080/health
curl http://localhost:8081/health
# Synchronous chat test (greeting keywords trigger the sync path)
curl -X POST "http://localhost:8080/chat" \
-H "Content-Type: application/json" \
-d '{
"message": "你好",
"user_id": "test-user-123",
"processing_mode": "sync"
}'
# Hybrid-mode test (quick ACK now, full answer published as a task)
curl -X POST "http://localhost:8080/chat" \
-H "Content-Type: application/json" \
-d '{
"message": "幫我分析一下人工智慧的發展趨勢",
"user_id": "test-user-123",
"processing_mode": "hybrid"
}'
import requests
import json
def test_chat_service():
    """Smoke test: post one sync and one async request against a local chat-service.

    Adds a request timeout and connection-error handling — the original hung
    forever if the service was wedged and crashed if it was down.
    """
    url = "http://localhost:8080/chat"
    # One case per processing mode we want to exercise.
    test_cases = [
        {
            "message": "你好",
            "user_id": "test-user-123",
            "processing_mode": "sync"
        },
        {
            "message": "幫我寫一份關於 AI 的詳細報告",
            "user_id": "test-user-123",
            "processing_mode": "async"
        }
    ]
    for i, test_data in enumerate(test_cases):
        print(f"\n測試案例 {i+1}: {test_data['message']}")
        try:
            # timeout keeps the script from hanging if the service never answers
            response = requests.post(url, json=test_data, timeout=10)
        except requests.RequestException as e:
            print(f"❌ 請求失敗: {e}")
            continue
        if response.status_code == 200:
            result = response.json()
            print(f"✅ 回應: {result['message']}")
            print(f"📋 模式: {result['processing_mode']}")
            print(f"🔄 需要後續: {result['requires_followup']}")
        else:
            print(f"❌ 錯誤: {response.status_code} - {response.text}")


if __name__ == "__main__":
    test_chat_service()
cloudbuild.yaml
steps:
# Build the chat-service image (tagged with the commit SHA)
- name: 'gcr.io/cloud-builders/docker'
  args: [
    'build',
    '-t', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA',
    '-f', 'services/chat/Dockerfile',
    '.'
  ]
# Build the memory-service image
- name: 'gcr.io/cloud-builders/docker'
  args: [
    'build',
    '-t', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA',
    '-f', 'services/memory/Dockerfile',
    '.'
  ]
# Push both images to Artifact Registry
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA']
- name: 'gcr.io/cloud-builders/docker'
  args: ['push', 'asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA']
# Deploy memory-service first: chat-service depends on its URL
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
  entrypoint: 'gcloud'
  args:
    - 'run'
    - 'deploy'
    - 'memory-service'
    - '--image=asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/memory-service:$SHORT_SHA'
    - '--region=asia-east1'
    - '--service-account=memory-sa@$PROJECT_ID.iam.gserviceaccount.com'
    - '--set-secrets=DATABASE_URL=database-url:latest'
    - '--min-instances=1'   # keep one instance warm to avoid cold starts
    - '--max-instances=5'
    - '--cpu=1'
    - '--memory=512Mi'
# Deploy chat-service
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
  entrypoint: 'gcloud'
  args:
    - 'run'
    - 'deploy'
    - 'chat-service'
    - '--image=asia-east1-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service:$SHORT_SHA'
    - '--region=asia-east1'
    - '--service-account=chat-sa@$PROJECT_ID.iam.gserviceaccount.com'
    - '--set-secrets=GEMINI_API_KEY=gemini-api-key:latest,JWT_SECRET=jwt-secret:latest'
    # NOTE(review): "memory-service-xxx.run.app" is a placeholder — substitute
    # the real memory-service URL after its first deploy, or it will 404.
    - '--set-env-vars=MEMORY_SERVICE_URL=https://memory-service-xxx.run.app,ENVIRONMENT=production'
    - '--min-instances=1'
    - '--max-instances=10'
    - '--cpu=2'
    - '--memory=1Gi'
options:
  logging: CLOUD_LOGGING_ONLY
deploy.sh
#!/bin/bash
# One-time GCP bootstrap + deploy for the AI Assistant services.
# Note: several create commands fail harmlessly on re-runs (repo/SA/secret
# already exists), so the script deliberately does NOT use `set -e`.

# Configuration — replace with your real project id before running
PROJECT_ID="your-gcp-project"
REGION="asia-east1"

echo "🚀 開始部署 AI Assistant 服務..."

# 1. Select the GCP project
gcloud config set project $PROJECT_ID

# 2. Enable the required APIs
gcloud services enable run.googleapis.com
gcloud services enable cloudbuild.googleapis.com
gcloud services enable secretmanager.googleapis.com
gcloud services enable pubsub.googleapis.com

# 3. Create the Artifact Registry repository
gcloud artifacts repositories create ai-assistant \
  --repository-format=docker \
  --location=$REGION

# 4. Create per-service accounts
gcloud iam service-accounts create chat-sa --display-name="Chat Service Account"
gcloud iam service-accounts create memory-sa --display-name="Memory Service Account"

# 5. Create secrets (replace the placeholder values first!)
echo -n "your-gemini-api-key" | gcloud secrets create gemini-api-key --data-file=-
echo -n "your-jwt-secret" | gcloud secrets create jwt-secret --data-file=-
echo -n "postgresql://user:pass@host:5432/db" | gcloud secrets create database-url --data-file=-

# 6. Grant secret access to the service accounts
gcloud secrets add-iam-policy-binding gemini-api-key \
  --member="serviceAccount:chat-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# chat-service also mounts JWT_SECRET from the jwt-secret secret (see
# cloudbuild.yaml); without this binding its Cloud Run revision fails to
# start. This grant was missing in the original script.
gcloud secrets add-iam-policy-binding jwt-secret \
  --member="serviceAccount:chat-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

gcloud secrets add-iam-policy-binding database-url \
  --member="serviceAccount:memory-sa@$PROJECT_ID.iam.gserviceaccount.com" \
  --role="roles/secretmanager.secretAccessor"

# 7. Create the Pub/Sub topic for async chat tasks
gcloud pubsub topics create chat-tasks

# 8. Build and deploy everything via Cloud Build
gcloud builds submit --config cloudbuild.yaml

echo "✅ 部署完成!"
echo "🔗 Chat Service: https://chat-service-xxx.run.app"
echo "🔗 Memory Service: https://memory-service-xxx.run.app"
import json
import logging
from datetime import datetime
class StructuredLogger:
    """Emits one JSON object per log line, tagged with the originating service."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)

    def log(self, level: str, message: str, **kwargs):
        """Serialize the event (plus any extra fields) and route it by level."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "service": self.service_name,
            "level": level,
            "message": message,
        }
        entry.update(kwargs)
        payload = json.dumps(entry)
        normalized = level.upper()
        if normalized == "ERROR":
            self.logger.error(payload)
        elif normalized == "WARNING":
            self.logger.warning(payload)
        else:
            # Anything else (INFO, DEBUG, unknown) goes out at INFO.
            self.logger.info(payload)
# Usage example
logger = StructuredLogger("chat-service")
# In handlers.py (illustrative snippet — assumes `request`, `chat_id` and
# `processing_mode` are in scope at the call site):
logger.log("INFO", "處理對話請求",
user_id=request.user_id,
chat_id=chat_id,
processing_mode=processing_mode.value)
import time
from functools import wraps
def monitor_performance(func):
    """Decorator for async functions: log duration and outcome via `logger`.

    Uses time.perf_counter() instead of time.time(): perf_counter is
    monotonic and high-resolution, so measured durations cannot go negative
    or jump when the wall clock is adjusted (NTP, DST).
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            duration = time.perf_counter() - start_time
            logger.log("INFO", "函數執行完成",
                       function=func.__name__,
                       duration_ms=round(duration * 1000, 2),
                       status="success")
            return result
        except Exception as e:
            duration = time.perf_counter() - start_time
            logger.log("ERROR", "函數執行失敗",
                       function=func.__name__,
                       duration_ms=round(duration * 1000, 2),
                       error=str(e),
                       status="error")
            raise
    return wrapper
# Apply the decorator to performance-critical coroutines (illustrative stub):
@monitor_performance
async def process_chat(self, request: ChatRequest) -> ChatResponse:
    # ... original logic unchanged ...
這個基礎版本已經能處理基本對話,接下來可以在此之上逐步擴充檢索與非同步處理等進階功能。
下一篇我們來實作 RAG 檢索服務 和 Worker 非同步處理。