前面我們建立了基礎的對話系統。這篇要用 GCP 既有的 AI 服務打造 RAG 檢索系統。
目標:利用 GCP 託管服務,用最少的程式碼實現 RAG 功能。
flowchart TB
subgraph "用戶層"
USER[用戶查詢] --> CHAT[chat-service]
end
subgraph "GCP 託管 AI 服務"
AGENT[Vertex AI Agent Builder<br/>統一 RAG 入口]
SEARCH[Vertex AI Search<br/>企業搜尋引擎]
CONV[Vertex AI Conversation<br/>對話管理]
DOC_AI[Document AI<br/>文件解析]
DISCOVERY[Discovery Engine<br/>知識發現]
end
subgraph "資料存儲"
GCS[Cloud Storage<br/>原始文件]
BQ[BigQuery<br/>結構化資料 + 向量]
FIRESTORE[Firestore<br/>元資料]
end
subgraph "處理流程"
WORKFLOW[Workflows<br/>文件處理流程]
FUNC[Cloud Functions<br/>觸發器]
end
USER --> AGENT
AGENT --> SEARCH
AGENT --> CONV
CHAT --> AGENT
GCS --> DOC_AI
DOC_AI --> SEARCH
SEARCH --> BQ
FUNC --> WORKFLOW
WORKFLOW --> DOC_AI
WORKFLOW --> SEARCH
層級 | GCP 服務 | 我們的角色 |
---|---|---|
文件解析 | Document AI | 配置 + 觸發 |
向量檢索 | Vertex AI Search | 配置 + 查詢 |
對話管理 | Agent Builder | 配置 + 整合 |
工作流程 | Workflows | YAML 配置 |
總計 | 80% 託管服務 | 20% 膠水程式碼 |
Vertex AI Agent Builder 是 Google 的企業級對話 AI 平台,內建企業搜尋(Vertex AI Search)、對話管理(Vertex AI Conversation)與答案生成能力。先用以下腳本建立搜尋應用、資料存放區與對話應用:
#!/bin/bash
# scripts/setup-agent-builder.sh
PROJECT_ID="your-project-id"
LOCATION="global" # 本文以 global 為例
APP_ID="ai-assistant-rag"
echo "🤖 建立 Vertex AI Agent Builder 應用..."
gcloud config set project "$PROJECT_ID"
# 1. 啟用 API
gcloud services enable discoveryengine.googleapis.com
gcloud services enable aiplatform.googleapis.com
# 2. 建立 Search App(搜尋應用)
gcloud alpha discovery-engine search-applications create $APP_ID \
--location=$LOCATION \
--display-name="AI Assistant RAG" \
--solution-type=SOLUTION_TYPE_SEARCH \
--industry-vertical=GENERIC
# 3. 建立 Data Store(資料存儲)
gcloud alpha discovery-engine data-stores create "${APP_ID}-datastore" \
--location=$LOCATION \
--display-name="AI Assistant Documents" \
--content-config=CONTENT_REQUIRED \
--solution-type=SOLUTION_TYPE_SEARCH \
--industry-vertical=GENERIC
# 4. 建立 Conversation App(對話應用)
gcloud alpha discovery-engine chat-applications create "${APP_ID}-chat" \
--location=$LOCATION \
--display-name="AI Assistant Chat" \
--search-application="projects/$PROJECT_ID/locations/$LOCATION/searchApplications/$APP_ID"
echo "✅ Agent Builder 應用建立完成"
echo "📝 應用 ID: $APP_ID"
echo "🔗 管理控制台: https://console.cloud.google.com/ai/discovery-engine"
# config/agent-builder-config.yaml
# 對話應用設定
displayName: "AI Assistant RAG"
description: "智慧文件問答助手"
# 搜尋配置
searchConfig:
searchTier: SEARCH_TIER_ENTERPRISE # 企業級搜尋
extractiveContent: true # 提取式內容
maxExtractiveAnswerCount: 3 # 最多3個答案片段
maxExtractiveSegmentCount: 5 # 最多5個內容段落
# 對話配置
conversationConfig:
agentCreation:
business: "企業知識助手"
description: "我是專業的企業知識助手,可以幫您查詢文件、政策、流程等資訊"
# 自訂指令
agentInstructions: |
你是一個專業的企業知識助手。請遵循以下指導原則:
1. 基於檢索到的文件內容回答問題
2. 如果資訊不足,請誠實說明
3. 總是提供資訊來源
4. 用友善、專業的語調回應
5. 支援中文和英文查詢
# 安全與過濾
safetySettings:
- category: HARM_CATEGORY_HATE_SPEECH
threshold: BLOCK_MEDIUM_AND_ABOVE
- category: HARM_CATEGORY_DANGEROUS_CONTENT
threshold: BLOCK_MEDIUM_AND_ABOVE
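YAML 裡的 extractiveContent、maxExtractiveAnswerCount 等設定,對應到 Search API 的 ContentSearchSpec。下面是一個最小示意(serving config 路徑以 default_collection 下的 data store 為假設,serving config 名稱也依實際建立方式而定,請換成自己的資源):

# 示意:把上面的提取式設定對應到 Search API 請求(路徑為假設值)
from google.cloud import discoveryengine_v1 as discoveryengine

def search_with_extractive_answers(project_id: str, query: str):
    client = discoveryengine.SearchServiceClient()

    # 假設 data store ID 為 ai-assistant-rag-datastore,請依實際資源調整
    serving_config = (
        f"projects/{project_id}/locations/global/collections/default_collection/"
        "dataStores/ai-assistant-rag-datastore/servingConfigs/default_search"
    )

    request = discoveryengine.SearchRequest(
        serving_config=serving_config,
        query=query,
        page_size=10,
        # 對應 extractiveContent / maxExtractiveAnswerCount / maxExtractiveSegmentCount
        content_search_spec=discoveryengine.SearchRequest.ContentSearchSpec(
            extractive_content_spec=discoveryengine.SearchRequest.ContentSearchSpec.ExtractiveContentSpec(
                max_extractive_answer_count=3,
                max_extractive_segment_count=5,
            ),
        ),
    )
    return client.search(request)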
# workflows/document-processing.yaml
main:
params: [event]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location: "us-central1"
- processor_id: "your-document-ai-processor-id"
- bucket_name: ${event.bucket}
- file_name: ${event.name}
- check_file_type:
switch:
- condition: ${text.match_regex(file_name, ".*\\.(pdf|docx|txt)$")}
next: process_document
next: end
- process_document:
call: process_with_document_ai
args:
project_id: ${project_id}
location: ${location}
processor_id: ${processor_id}
gcs_uri: ${"gs://" + bucket_name + "/" + file_name}
result: processed_content
- extract_metadata:
assign:
- document_metadata:
filename: ${file_name}
processing_time: ${time.format(sys.now())}
content_length: ${len(processed_content.text)}
page_count: ${len(processed_content.pages)}
    - index_in_discovery_engine:
        call: http.post
        args:
          # 匯入端點與 data store ID 需對應 setup-agent-builder.sh 建立的資源
          url: ${"https://discoveryengine.googleapis.com/v1/projects/" + project_id + "/locations/global/collections/default_collection/dataStores/ai-assistant-rag-datastore/branches/default_branch/documents:import"}
          auth:
            type: OAuth2
          headers:
            Content-Type: "application/json"
          body:
            inlineSource:
              documents:
                - id: ${text.replace_all_regex(file_name, "[^a-zA-Z0-9]", "_")}
                  structData:
                    title: ${file_name}
                    content: ${processed_content.text}
                    metadata: ${document_metadata}
        result: import_result
- log_completion:
call: sys.log
args:
text: ${"Document processed successfully: " + file_name}
severity: INFO
# Document AI 處理子流程
process_with_document_ai:
params: [project_id, location, processor_id, gcs_uri]
steps:
- call_document_ai:
call: http.post
args:
url: ${"https://documentai.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/processors/" + processor_id + ":process"}
auth:
type: OAuth2
headers:
Content-Type: "application/json"
          body:
            gcs_document:
              gcs_uri: ${gcs_uri}
              # 這裡沿用 PDF;若要支援 docx/txt,需依副檔名帶入對應的 MIME type
              mime_type: "application/pdf"
result: api_response
- extract_text:
return:
text: ${api_response.body.document.text}
pages: ${api_response.body.document.pages}
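process_with_document_ai 子流程做的事,等同於直接呼叫 Document AI 的 process 端點。若想先在本機驗證 processor,可以用 Python 客戶端做等價呼叫(以下為示意:先把檔案從 GCS 讀成 bytes 再以 raw_document 送出,processor ID 等參數請自行帶入):

# 示意:用 Python 客戶端驗證 Document AI processor(等同 Workflows 中的 HTTP 呼叫)
from google.cloud import documentai_v1 as documentai
from google.cloud import storage

def process_gcs_file(project_id: str, location: str, processor_id: str,
                     bucket_name: str, file_name: str) -> str:
    # 先把檔案從 GCS 下載成 bytes
    blob = storage.Client().bucket(bucket_name).blob(file_name)
    content = blob.download_as_bytes()

    client = documentai.DocumentProcessorServiceClient()
    name = client.processor_path(project_id, location, processor_id)

    request = documentai.ProcessRequest(
        name=name,
        raw_document=documentai.RawDocument(
            content=content,
            mime_type="application/pdf",  # 實務上應依副檔名決定
        ),
    )
    result = client.process_document(request=request)
    return result.document.text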
#!/bin/bash
# scripts/setup-document-triggers.sh
PROJECT_ID="your-project-id"
echo "🔧 設定文件處理觸發器..."
# 1. 部署 Workflow
gcloud workflows deploy document-processing \
--source=workflows/document-processing.yaml \
--location=asia-east1
# 2. 建立 Cloud Function 觸發器
cat > /tmp/trigger-function.py << 'EOF'
import functions_framework
from google.cloud.workflows import executions_v1
import json
import os

# 專案 ID 由部署時設定的環境變數帶入
PROJECT_ID = os.environ["GCP_PROJECT_ID"]
@functions_framework.cloud_event
def trigger_document_processing(cloud_event):
# 當檔案上傳到 GCS 時觸發
data = cloud_event.data
# 只處理特定檔案類型
if not data['name'].endswith(('.pdf', '.docx', '.txt')):
return
# 觸發 Workflow
    client = executions_v1.ExecutionsClient()
execution = {
'argument': json.dumps({
'bucket': data['bucket'],
'name': data['name'],
'time_created': data['timeCreated']
})
}
parent = f"projects/{PROJECT_ID}/locations/asia-east1/workflows/document-processing"
response = client.create_execution(
parent=parent,
execution=execution
)
print(f"Started workflow execution: {response.name}")
EOF
# 3. 準備 Cloud Function 依賴
cat > /tmp/requirements.txt << 'EOF'
functions-framework==3.*
google-cloud-workflows==1.*
EOF
# 4. 部署 Cloud Function(第二代,透過 Eventarc 接收 GCS 事件)
gcloud functions deploy trigger-document-processing \
  --gen2 \
  --runtime=python310 \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=your-documents-bucket" \
  --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
  --source=/tmp \
  --entry-point=trigger_document_processing \
  --region=asia-east1
echo "✅ 觸發器設定完成"
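觸發器部署完成後,最直接的端到端驗證是丟一個測試檔進 bucket,再到 Workflows 控制台確認有對應的執行紀錄(bucket 名稱與檔案路徑為示意):

# 示意:上傳測試檔到觸發 bucket,驗證整條文件處理流程
from google.cloud import storage

def upload_test_document(bucket_name: str, local_path: str, dest_name: str):
    bucket = storage.Client().bucket(bucket_name)
    blob = bucket.blob(dest_name)
    blob.upload_from_filename(local_path)
    print(f"已上傳 gs://{bucket_name}/{dest_name},請到 Workflows 控制台確認執行紀錄")

# upload_test_document("your-documents-bucket", "./sample.pdf", "test/sample.pdf")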
# services/rag/app/agent_builder_client.py
from google.cloud import discoveryengine_v1 as discoveryengine
from typing import Dict, Any, List, Optional
import logging
logger = logging.getLogger(__name__)
class AgentBuilderClient:
"""Vertex AI Agent Builder 客戶端"""
def __init__(self, project_id: str, location: str = "global"):
self.project_id = project_id
self.location = location
# 初始化客戶端
self.search_client = discoveryengine.SearchServiceClient()
self.conversation_client = discoveryengine.ConversationalSearchServiceClient()
# 設定路徑
self.search_app_path = f"projects/{project_id}/locations/{location}/searchApplications/ai-assistant-rag"
self.conversation_app_path = f"projects/{project_id}/locations/{location}/conversationApps/ai-assistant-rag-chat"
async def search_documents(
self,
query: str,
page_size: int = 10,
filters: Optional[str] = None
) -> Dict[str, Any]:
"""搜尋文件"""
try:
# 建立搜尋請求
request = discoveryengine.SearchRequest(
serving_config=f"{self.search_app_path}/servingConfigs/default_config",
query=query,
page_size=page_size,
query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO
),
spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
)
)
# 加入過濾器
if filters:
request.filter = filters
# 執行搜尋
response = self.search_client.search(request)
# 處理結果
results = []
for result in response.results:
document = result.document
results.append({
"id": document.id,
"title": document.struct_data.get("title", ""),
"content": self._extract_content(document),
"metadata": dict(document.struct_data.get("metadata", {})),
"score": getattr(result, 'relevance_score', 0.0)
})
return {
"query": query,
"results": results,
"total_results": len(results),
"search_mode": "agent_builder"
}
except Exception as e:
logger.error(f"Agent Builder 搜尋失敗: {e}")
raise
async def conversational_search(
self,
query: str,
conversation_id: Optional[str] = None,
user_pseudo_id: Optional[str] = None
) -> Dict[str, Any]:
"""對話式搜尋"""
try:
# 建立對話請求
request = discoveryengine.ConverseConversationRequest(
name=f"{self.conversation_app_path}/conversations/{conversation_id or '-'}",
query=discoveryengine.TextInput(input=query),
serving_config=f"{self.conversation_app_path}/servingConfigs/default_config"
)
if user_pseudo_id:
request.user_pseudo_id = user_pseudo_id
# 執行對話搜尋
response = self.conversation_client.converse_conversation(request)
# 處理回應
return {
"conversation_id": response.conversation.name.split('/')[-1],
"reply": response.reply.summary.summary_text,
"search_results": [
{
"title": result.title,
"content": result.document.struct_data.get("content", ""),
"uri": result.uri
}
for result in response.search_results
],
"conversation_state": response.conversation.state.name
}
except Exception as e:
logger.error(f"對話式搜尋失敗: {e}")
raise
def _extract_content(self, document) -> str:
"""提取文件內容"""
if hasattr(document, 'struct_data') and 'content' in document.struct_data:
return document.struct_data['content']
elif hasattr(document, 'json_data'):
return document.json_data
else:
return str(document)
class DocumentUploadService:
"""文件上傳服務"""
def __init__(self, project_id: str, bucket_name: str):
self.project_id = project_id
self.bucket_name = bucket_name
from google.cloud import storage
self.storage_client = storage.Client()
self.bucket = self.storage_client.bucket(bucket_name)
async def create_upload_url(self, filename: str, user_id: str) -> Dict[str, str]:
"""建立預簽名上傳 URL"""
try:
# 生成唯一檔名
import uuid
unique_filename = f"{user_id}/{uuid.uuid4()}_{filename}"
# 建立 blob
blob = self.bucket.blob(unique_filename)
            # 生成預簽名 URL(有效期 1 小時)
            from datetime import timedelta
            upload_url = blob.generate_signed_url(
                version="v4",
                expiration=timedelta(hours=1),
                method="PUT",
                content_type="application/octet-stream"
            )
return {
"upload_url": upload_url,
"file_path": unique_filename,
"expires_in": 3600
}
except Exception as e:
logger.error(f"建立上傳 URL 失敗: {e}")
raise
async def list_documents(self, user_id: str) -> List[Dict[str, Any]]:
"""列出用戶文件"""
try:
blobs = self.bucket.list_blobs(prefix=f"{user_id}/")
documents = []
for blob in blobs:
documents.append({
"name": blob.name.split('/')[-1],
"size": blob.size,
"uploaded_at": blob.time_created.isoformat(),
"content_type": blob.content_type
})
return documents
except Exception as e:
logger.error(f"列出文件失敗: {e}")
raise
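客戶端拿到 create_upload_url 回傳的預簽名 URL 後,直接對它發 PUT 即可,不需要任何 GCP 憑證;注意 Content-Type 必須和簽名時指定的一致。以下是一個簡單示意:

# 示意:用預簽名 URL 上傳檔案(Content-Type 需與簽名時一致)
import requests

def upload_with_signed_url(upload_url: str, local_path: str):
    with open(local_path, "rb") as f:
        resp = requests.put(
            upload_url,
            data=f,
            headers={"Content-Type": "application/octet-stream"},
        )
    resp.raise_for_status()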
# services/rag/app/main.py
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging
from .agent_builder_client import AgentBuilderClient, DocumentUploadService
from .models import SearchRequest, SearchResponse
from .config import settings
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 初始化服務
app.state.agent_builder = AgentBuilderClient(
project_id=settings.gcp_project_id,
location="global"
)
app.state.upload_service = DocumentUploadService(
project_id=settings.gcp_project_id,
bucket_name=settings.documents_bucket
)
logger.info("✅ RAG Service (Agent Builder) 啟動完成")
yield
logger.info("🛑 RAG Service 關閉完成")
app = FastAPI(
title="RAG Service (Agent Builder)",
description="基於 Vertex AI Agent Builder 的智慧檢索服務",
version="2.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 依賴注入
def get_agent_builder() -> AgentBuilderClient:
return app.state.agent_builder
def get_upload_service() -> DocumentUploadService:
return app.state.upload_service
@app.get("/health")
async def health_check():
"""健康檢查"""
return {
"status": "healthy",
"service": "rag-service-agent-builder",
"version": "2.0.0"
}
@app.post("/search", response_model=SearchResponse)
async def search_documents(
request: SearchRequest,
agent_builder: AgentBuilderClient = Depends(get_agent_builder)
):
"""文件檢索(使用 Agent Builder)"""
try:
# 建立過濾器
filters = None
if request.document_filters:
filter_parts = []
for key, value in request.document_filters.items():
filter_parts.append(f'{key}: "{value}"')
filters = " AND ".join(filter_parts)
# 執行搜尋
result = await agent_builder.search_documents(
query=request.query,
page_size=request.top_k,
filters=filters
)
# 轉換為標準格式
search_results = [
{
"chunk_id": r["id"],
"document_id": r["id"],
"content": r["content"][:1000], # 限制長度
"score": r["score"],
"document_metadata": r["metadata"],
"chunk_metadata": {}
}
for r in result["results"]
]
return SearchResponse(
query=request.query,
results=search_results,
total_results=len(search_results),
search_time_ms=0.0, # Agent Builder 不提供時間資訊
search_mode="agent_builder"
)
except Exception as e:
logger.error(f"搜尋失敗: {e}")
raise HTTPException(status_code=500, detail="搜尋失敗")
@app.post("/chat")
async def conversational_search(
query: str,
conversation_id: str = None,
user_id: str = None,
agent_builder: AgentBuilderClient = Depends(get_agent_builder)
):
"""對話式搜尋"""
try:
result = await agent_builder.conversational_search(
query=query,
conversation_id=conversation_id,
user_pseudo_id=user_id
)
return result
except Exception as e:
logger.error(f"對話式搜尋失敗: {e}")
raise HTTPException(status_code=500, detail="對話式搜尋失敗")
@app.post("/documents/upload")
async def create_upload_url(
filename: str,
user_id: str,
upload_service: DocumentUploadService = Depends(get_upload_service)
):
"""建立文件上傳 URL"""
try:
result = await upload_service.create_upload_url(filename, user_id)
return result
except Exception as e:
logger.error(f"建立上傳 URL 失敗: {e}")
raise HTTPException(status_code=500, detail="建立上傳 URL 失敗")
@app.get("/documents")
async def list_documents(
user_id: str,
upload_service: DocumentUploadService = Depends(get_upload_service)
):
"""列出用戶文件"""
try:
documents = await upload_service.list_documents(user_id)
return {"documents": documents}
except Exception as e:
logger.error(f"列出文件失敗: {e}")
raise HTTPException(status_code=500, detail="列出文件失敗")
if __name__ == "__main__":
import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8080, reload=True)
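服務部署後,可以用幾行腳本打 /search 與 /chat 驗證(服務 URL 與查詢內容為示意;/chat 的參數走 query string,/search 的欄位依 models.py 的 SearchRequest 而定):

# 示意:呼叫 RAG 服務的 /search 與 /chat(URL 為佔位)
import requests

BASE_URL = "https://rag-service-v2-xxx.run.app"  # 換成實際的 Cloud Run URL

search_resp = requests.post(
    f"{BASE_URL}/search",
    json={"query": "請假流程是什麼?", "top_k": 5},
)
print(search_resp.json())

chat_resp = requests.post(
    f"{BASE_URL}/chat",
    params={"query": "請假流程是什麼?", "user_id": "demo-user"},
)
print(chat_resp.json())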
-- BigQuery 向量搜尋設定
-- scripts/setup-bigquery-vectors.sql
-- 建立資料集
CREATE SCHEMA IF NOT EXISTS `ai_assistant.rag_data`;
-- 建立文件表
CREATE OR REPLACE TABLE `ai_assistant.rag_data.documents` (
document_id STRING NOT NULL,
filename STRING NOT NULL,
  content STRING NOT NULL,
embedding ARRAY<FLOAT64>, -- 向量欄位
metadata JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
-- 建立向量搜尋函數
CREATE OR REPLACE FUNCTION `ai_assistant.rag_data.cosine_similarity`(
vec1 ARRAY<FLOAT64>,
vec2 ARRAY<FLOAT64>
) AS (
-- 計算餘弦相似度
(
SELECT
SAFE_DIVIDE(
dot_product,
SQRT(norm1 * norm2)
)
FROM (
SELECT
SUM(v1 * v2) as dot_product,
SUM(v1 * v1) as norm1,
SUM(v2 * v2) as norm2
FROM UNNEST(vec1) AS v1 WITH OFFSET pos1
JOIN UNNEST(vec2) AS v2 WITH OFFSET pos2
ON pos1 = pos2
)
)
);
-- 向量搜尋預存程序
CREATE OR REPLACE PROCEDURE `ai_assistant.rag_data.vector_search`(
query_embedding ARRAY<FLOAT64>,
top_k INT64
)
BEGIN
SELECT
document_id,
filename,
content,
metadata,
`ai_assistant.rag_data.cosine_similarity`(embedding, query_embedding) as similarity_score
FROM `ai_assistant.rag_data.documents`
WHERE embedding IS NOT NULL
ORDER BY similarity_score DESC
LIMIT top_k;
END;
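這條 BigQuery 備援路線需要自己產生向量。可以用 Vertex AI 的文字嵌入模型把文件內容轉成向量再寫入 documents 表,以下為示意(模型名稱以專案實際可用版本為準):

# 示意:用 Vertex AI 文字嵌入模型產生向量(模型名稱為假設)
import vertexai
from vertexai.language_models import TextEmbeddingModel

def embed_text(project_id: str, location: str, text: str) -> list[float]:
    vertexai.init(project=project_id, location=location)
    model = TextEmbeddingModel.from_pretrained("text-embedding-004")
    embeddings = model.get_embeddings([text])
    return embeddings[0].values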
# services/rag/app/bigquery_client.py
from google.cloud import bigquery
from typing import List, Dict, Any
import json
import logging
logger = logging.getLogger(__name__)
class BigQueryVectorClient:
"""BigQuery 向量檢索客戶端"""
def __init__(self, project_id: str, dataset_id: str = "rag_data"):
self.project_id = project_id
self.dataset_id = dataset_id
self.client = bigquery.Client(project=project_id)
async def search_vectors(
self,
query_embedding: List[float],
top_k: int = 10
) -> List[Dict[str, Any]]:
"""向量檢索"""
try:
# 建立查詢
query = f"""
CALL `{self.project_id}.{self.dataset_id}.vector_search`(
{query_embedding},
{top_k}
);
"""
# 執行查詢
query_job = self.client.query(query)
results = query_job.result()
# 處理結果
search_results = []
for row in results:
search_results.append({
"document_id": row.document_id,
"filename": row.filename,
"content": row.content,
"metadata": dict(row.metadata) if row.metadata else {},
"score": float(row.similarity_score)
})
return search_results
except Exception as e:
logger.error(f"BigQuery 向量檢索失敗: {e}")
raise
async def insert_document(
self,
document_id: str,
filename: str,
content: str,
embedding: List[float],
metadata: Dict[str, Any] = None
):
"""插入文件和向量"""
try:
            # 以完整路徑取得資料表
            table = self.client.get_table(
                f"{self.project_id}.{self.dataset_id}.documents"
            )
            rows = [{
                "document_id": document_id,
                "filename": filename,
                "content": content,
                "embedding": embedding,
                # JSON 欄位以序列化後的字串寫入
                "metadata": json.dumps(metadata or {}, ensure_ascii=False)
            }]
errors = self.client.insert_rows_json(table, rows)
if errors:
raise Exception(f"插入失敗: {errors}")
except Exception as e:
logger.error(f"插入文件失敗: {e}")
raise
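把嵌入函數和 BigQueryVectorClient 串起來的用法大致如下(embed_text 為前面的嵌入示意函數,資料內容僅供示範):

# 示意:寫入一份文件並用向量檢索
import asyncio

async def demo(project_id: str):
    client = BigQueryVectorClient(project_id=project_id)

    content = "員工請假需先在系統提出申請,由直屬主管核准。"
    embedding = embed_text(project_id, "us-central1", content)

    await client.insert_document(
        document_id="doc-001",
        filename="leave-policy.txt",
        content=content,
        embedding=embedding,
        metadata={"source": "hr"},
    )

    query_embedding = embed_text(project_id, "us-central1", "請假怎麼申請?")
    results = await client.search_vectors(query_embedding, top_k=3)
    print(results)

# asyncio.run(demo("your-project-id"))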
# services/rag/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 只需要基本依賴
COPY services/rag/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY services/rag/app ./app
COPY shared ./shared
ENV PYTHONPATH=/app
ENV PORT=8080
EXPOSE 8080
CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
# services/rag/requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
# GCP 服務
google-cloud-discoveryengine==0.11.0
google-cloud-storage==2.10.0
google-cloud-bigquery==3.11.4
# 基本工具
aiofiles==23.2.1
python-multipart==0.0.6
#!/bin/bash
# scripts/deploy-agent-builder-rag.sh
PROJECT_ID="your-project-id"
REGION="asia-east1"
echo "🚀 部署 Agent Builder RAG 服務..."
# 1. 設定 Agent Builder
./scripts/setup-agent-builder.sh
# 2. 設定文件處理流程
./scripts/setup-document-triggers.sh
# 3. 部署 RAG 服務
gcloud run deploy rag-service-v2 \
--image=gcr.io/$PROJECT_ID/rag-service:latest \
--region=$REGION \
--set-env-vars="GCP_PROJECT_ID=$PROJECT_ID,DOCUMENTS_BUCKET=${PROJECT_ID}-documents" \
--service-account=rag-service-sa@$PROJECT_ID.iam.gserviceaccount.com \
--min-instances=1 \
--max-instances=10 \
--memory=1Gi \
--cpu=1 \
--timeout=300s \
--allow-unauthenticated
echo "✅ 部署完成!"
echo "📊 Agent Builder 控制台: https://console.cloud.google.com/ai/discovery-engine"
echo "🔗 RAG API: https://rag-service-v2-xxx.run.app"
項目 | 傳統自建 RAG | Agent Builder RAG |
---|---|---|
開發時間 | 2-4 週 | 2-4 天 |
程式碼量 | 2000+ 行 | < 500 行 |
維護成本 | 高(自己維護向量DB、索引) | 低(GCP 託管) |
擴展性 | 需自己處理 | 自動擴展 |
準確率 | 取決於實作品質 | Google 調優過的模型 |
多語言支援 | 需額外開發 | 內建支援 |
監控除錯 | 需自建 | GCP 控制台 |
組件 | 自建成本(每月) | Agent Builder 成本(每月) |
---|---|---|
運算資源 | Cloud Run: $50 | Cloud Run: $20 |
向量資料庫 | Vector Search: $200 | 包含在 Agent Builder |
文件處理 | Document AI: $100 | 包含在 Agent Builder |
檢索 API | 自建維護: $100/月 | $0.002/查詢 |
總計 | ~$450/月 | ~$150/月(1萬查詢) |
# services/chat/app/handlers.py (Agent Builder 版本)
from google.cloud import discoveryengine_v1 as discoveryengine

class ChatHandler:
    def __init__(self):
        # ... 原有代碼 ...
        # Agent Builder 客戶端(discoveryengine 於模組層級匯入,供後續方法使用)
        self.conversation_client = discoveryengine.ConversationalSearchServiceClient()
        self.conversation_app_path = f"projects/{settings.gcp_project_id}/locations/global/conversationApps/ai-assistant-rag-chat"
async def _agent_builder_response(self, message: str, user_id: str, conversation_id: str = None) -> str:
"""使用 Agent Builder 進行對話"""
try:
# 直接呼叫 Agent Builder 對話 API
request = discoveryengine.ConverseConversationRequest(
name=f"{self.conversation_app_path}/conversations/{conversation_id or '-'}",
query=discoveryengine.TextInput(input=message),
serving_config=f"{self.conversation_app_path}/servingConfigs/default_config",
user_pseudo_id=user_id
)
response = self.conversation_client.converse_conversation(request)
# Agent Builder 已經整合了檢索 + 生成
return response.reply.summary.summary_text
except Exception as e:
logger.error(f"Agent Builder 對話失敗: {e}")
# 降級到一般 Gemini
return await self._call_gemini(message)