iT邦幫忙

2025 iThome Ironman Contest

DAY 28
AI & Data

Since We're Already Here, Let's Build a GCP AI Assistant from 0 to 100 (series), Part 28

Vertex AI Agent Builder Deep Integration: Building an AI Assistant That Uses Tools


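In the previous posts we built a basic conversation system, RAG retrieval, and real-time push updates. So far, though, the AI assistant can only "chat" and "look things up". This post uses Vertex AI Agent Builder plus Gemini Function Calling to make it more capable: it can use tools, carry out multi-step tasks, and plan its own workflow.

Goal: build an AI assistant that can write code, send email, analyze data, and schedule meetings.

1) System architecture: upgrading the AI brain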

flowchart TB
    subgraph "User interaction layer"
        USER[User request]
        FIREBASE[Firebase real-time push]
    end

    subgraph "AI dual-core architecture"
        GEMINI[Gemini Function Calling<br/>Tool-calling decision center]
        DISCOVERY[Discovery Engine<br/>Knowledge retrieval engine]
    end

    subgraph "Tool ecosystem"
        WORKSPACE[Google Workspace<br/>Gmail, Calendar]
        ANALYSIS[Data analysis tools<br/>BigQuery, Sheets]
        CODE[Code execution environment<br/>Secure sandbox]
        EXTERNAL[External systems<br/>CRM, ERP, APIs]
    end

    subgraph "Existing system integration"
        CHAT[chat-service]
        MEMORY[Memory system]
    end

    USER --> CHAT
    CHAT --> GEMINI
    GEMINI --> WORKSPACE
    GEMINI --> ANALYSIS
    GEMINI --> CODE
    GEMINI --> EXTERNAL

    GEMINI <--> DISCOVERY
    GEMINI <--> MEMORY
    GEMINI --> FIREBASE

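Core design idea: two tracks working together

Service | Responsibility | Implementation
Gemini Function Calling | Tool calling, task planning, logical reasoning | Vertex AI GenerativeModel + FunctionDeclaration
Discovery Engine | Knowledge retrieval, document Q&A | Full DataStore resource path + ConversationalSearch
Tool executor | Carrying out the actual operations | Cloud Run Jobs + secure sandbox
Firebase push | Real-time progress updates | Reuses the integration from the previous post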

2) Configuring Discovery Engine correctly

Creating the data store

#!/bin/bash
# scripts/setup-discovery-engine.sh

PROJECT_ID="your-project-id"
LOCATION="global"
DATASTORE_ID="ai-assistant-knowledge"

echo "🔍 Creating the Discovery Engine data store..."

# 1. Enable the API
gcloud services enable discoveryengine.googleapis.com

# 2. Create the data store (through the Console or the API, not a gcloud command)
echo "📝 Go to the Discovery Engine Console and create the data store:"
echo "1. Open: https://console.cloud.google.com/ai/discovery-engine"
echo "2. Create a new search app"
echo "3. Choose the 'Generic' vertical"
echo "4. Data store ID: $DATASTORE_ID"
echo "5. Content config: content required"
echo "6. Enable the 'Enterprise' search tier"

# 3. Grant the service account access
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:chat-service-sa@${PROJECT_ID}.iam.gserviceaccount.com" \
    --role="roles/discoveryengine.viewer"

echo "✅ Discovery Engine setup complete"
echo "📋 Note the data store ID: $DATASTORE_ID"

Discovery Engine client implementation

# services/chat/app/discovery_client.py
import asyncio
import logging
from typing import Any, Dict

from google.cloud import discoveryengine_v1 as discoveryengine

logger = logging.getLogger(__name__)

class DiscoveryEngineClient:
    """Discovery Engine client that builds the full resource paths"""

    def __init__(self, project_id: str, location: str = "global", datastore_id: str = "ai-assistant-knowledge"):
        self.project_id = project_id
        self.location = location
        self.datastore_id = datastore_id

        # Full DataStore resource path
        self.datastore_path = f"projects/{project_id}/locations/{location}/collections/default_collection/dataStores/{datastore_id}"

        # Initialize the (synchronous) clients
        self.search_client = discoveryengine.SearchServiceClient()
        self.conversation_client = discoveryengine.ConversationalSearchServiceClient()

    async def search_documents(self, query: str, page_size: int = 5) -> Dict[str, Any]:
        """Search enterprise documents"""
        try:
            # serving_config path under the data store
            serving_config = f"{self.datastore_path}/servingConfigs/default_config"

            request = discoveryengine.SearchRequest(
                serving_config=serving_config,
                query=query,
                page_size=page_size,
                query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
                    condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO
                ),
                spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
                    mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
                )
            )

            # The client is synchronous; run it in a worker thread so the event loop is not blocked
            response = await asyncio.to_thread(self.search_client.search, request)

            results = []
            for result in response.results:
                document = result.document
                results.append({
                    "id": document.id,
                    "title": document.struct_data.get("title", ""),
                    "content": self._extract_content(document),
                    "uri": getattr(document, 'uri', ''),
                    "score": getattr(result, 'relevance_score', 0.0)
                })

            return {
                "results": results,
                "total_results": len(results)
            }

        except Exception as e:
            logger.error(f"Discovery Engine search failed: {e}")
            return {"results": [], "total_results": 0}

    async def conversational_search(
        self,
        query: str,
        conversation_id: str,
        user_pseudo_id: str
    ) -> Dict[str, Any]:
        """Conversational search"""
        try:
            # Resource paths under the data store
            conversation_name = f"{self.datastore_path}/conversations/{conversation_id}"
            serving_config = f"{self.datastore_path}/servingConfigs/default_config"

            request = discoveryengine.ConverseConversationRequest(
                name=conversation_name,
                query=discoveryengine.TextInput(input=query),
                serving_config=serving_config,
                # user_pseudo_id is a field of Conversation, not of the request itself;
                # the conversation field is only used in auto-session mode (conversation ID "-")
                conversation=discoveryengine.Conversation(user_pseudo_id=user_pseudo_id)
            )

            response = await asyncio.to_thread(
                self.conversation_client.converse_conversation, request
            )

            return {
                "conversation_id": response.conversation.name.split('/')[-1],
                "reply": response.reply.summary.summary_text if response.reply.summary else "",
                "search_results": [
                    {
                        # A search result exposes only id and document, so title and link are
                        # read from derived_struct_data (assumes the usual "title"/"link" keys)
                        "title": result.document.derived_struct_data.get("title", ""),
                        "content": result.document.struct_data.get("content", ""),
                        "uri": result.document.derived_struct_data.get("link", "")
                    }
                    for result in response.search_results
                ]
            }

        except Exception as e:
            logger.error(f"Conversational search failed: {e}")
            return {
                "conversation_id": conversation_id,
                "reply": "Sorry, the search service is temporarily unavailable.",
                "search_results": []
            }

    def _extract_content(self, document) -> str:
        """Extract the document content"""
        if 'content' in document.struct_data:
            return str(document.struct_data['content'])
        if document.json_data:
            return document.json_data
        return str(document)
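
Most "not found" errors with Discovery Engine come from a mistyped resource path, so it can help to build the paths in one place and unit-test them. The following is a minimal sketch of the path format used in the client above; the helper names `datastore_path` and `serving_config_path` are made up for this example and are not part of the Google client library.

```python
def datastore_path(project_id: str, datastore_id: str,
                   location: str = "global",
                   collection: str = "default_collection") -> str:
    """Build the full DataStore resource name (hypothetical helper)."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/collections/{collection}/dataStores/{datastore_id}"
    )


def serving_config_path(project_id: str, datastore_id: str,
                        location: str = "global",
                        serving_config: str = "default_config") -> str:
    """Append the serving config segment to the DataStore path."""
    base = datastore_path(project_id, datastore_id, location)
    return f"{base}/servingConfigs/{serving_config}"


path = serving_config_path("demo-project", "ai-assistant-knowledge")
print(path)
```

Keeping the path logic in plain functions like these means it can be checked without any network access or credentials.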

3) Gemini Function Calling core implementation

Tool declarations and model initialization

# services/chat/app/gemini_function_handler.py
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional
from vertexai.generative_models import GenerativeModel, FunctionDeclaration, Tool, Part
import vertexai

from shared.firebase_client import get_firebase_client
from .discovery_client import DiscoveryEngineClient

logger = logging.getLogger(__name__)

class GeminiFunctionHandler:
    """Gemini Function Calling handler"""

    def __init__(self, project_id: str, location: str = "asia-east1"):
        self.project_id = project_id
        self.location = location

        # Initialize Vertex AI
        vertexai.init(project=project_id, location=location)

        # Initialize services
        self.firebase = get_firebase_client()
        self.discovery = DiscoveryEngineClient(project_id)

        # Define the tool declarations
        self.tools = self._create_tools()

        # Initialize the model
        self.model = GenerativeModel(
            "gemini-1.5-pro",
            tools=self.tools,
            system_instruction=self._get_system_instruction()
        )

    def _create_tools(self) -> List[Tool]:
        """Create the function declarations"""

        # Data analysis tool
        analyze_data_fn = FunctionDeclaration(
            name="analyze_data",
            description="Analyze data and generate a report",
            parameters={
                "type": "object",
                "properties": {
                    "data_source": {
                        "type": "string",
                        "description": "Data source (BigQuery table name, Sheets URL, etc.)"
                    },
                    "analysis_type": {
                        "type": "string",
                        "enum": ["descriptive", "predictive", "comparative"],
                        "description": "Analysis type: descriptive, predictive, or comparative"
                    },
                    "output_format": {
                        "type": "string",
                        "enum": ["summary", "chart", "table"],
                        "description": "Output format: summary, chart, or table"
                    }
                },
                "required": ["data_source", "analysis_type"]
            }
        )

        # Email tool
        send_email_fn = FunctionDeclaration(
            name="send_email",
            description="Send an email to the given recipients",
            parameters={
                "type": "object",
                "properties": {
                    "recipients": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "List of recipient email addresses"
                    },
                    "subject": {
                        "type": "string",
                        "description": "Email subject"
                    },
                    "body": {
                        "type": "string",
                        "description": "Email body"
                    },
                    "body_type": {
                        "type": "string",
                        "enum": ["text", "html"],
                        "description": "Email body format",
                        "default": "html"
                    }
                },
                "required": ["recipients", "subject", "body"]
            }
        )

        # Meeting scheduling tool
        schedule_meeting_fn = FunctionDeclaration(
            name="schedule_meeting",
            description="Schedule a meeting and invite attendees",
            parameters={
                "type": "object",
                "properties": {
                    "title": {
                        "type": "string",
                        "description": "Meeting title"
                    },
                    "attendees": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Attendee email addresses"
                    },
                    "start_time": {
                        "type": "string",
                        "description": "Start time (local time, format: YYYY-MM-DDTHH:MM:SS)"
                    },
                    "duration_minutes": {
                        "type": "integer",
                        "description": "Meeting length in minutes",
                        "default": 60
                    },
                    "description": {
                        "type": "string",
                        "description": "Meeting description"
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Time zone",
                        "default": "Asia/Taipei"
                    }
                },
                "required": ["title", "attendees", "start_time"]
            }
        )

        # Code execution tool
        execute_code_fn = FunctionDeclaration(
            name="execute_code",
            description="Run Python code in a sandboxed environment",
            parameters={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python code to run"
                    },
                    "libraries": {
                        "type": "array",
                        "items": {
                            "type": "string",
                            "enum": ["pandas", "numpy", "matplotlib", "seaborn", "scipy"]
                        },
                        "description": "Python libraries the code needs"
                    },
                    "description": {
                        "type": "string",
                        "description": "What the code does"
                    }
                },
                "required": ["code"]
            }
        )

        # Knowledge retrieval tool
        search_knowledge_fn = FunctionDeclaration(
            name="search_knowledge",
            description="Search the enterprise knowledge base",
            parameters={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "max_results": {
                        "type": "integer",
                        "description": "Maximum number of results",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        )

        return [Tool(function_declarations=[
            analyze_data_fn,
            send_email_fn,
            schedule_meeting_fn,
            execute_code_fn,
            search_knowledge_fn
        ])]

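    def _get_system_instruction(self) -> str:
        """System instruction"""
        return """
You are an enterprise-grade intelligent assistant with the following core capabilities:

## 🧠 Analysis and decision-making
- Analyze complex tasks and break them into executable steps
- Choose the most suitable combination of tools for the situation
- Apply logical reasoning and problem-solving

## 🛠️ Tool usage principles
1. Before any operation, confirm the user's request and permissions
2. Choose the most efficient combination of tools to complete the task
3. Explain the plan before executing and report progress while executing
4. When something fails, offer alternatives and suggested fixes
5. Ask for explicit confirmation before sensitive operations

## 📊 Professional skills
- Data analysis: Excel, CSV, and BigQuery data processing
- Office automation: sending email, scheduling meetings, managing documents
- Software development: writing and running Python code
- Knowledge management: enterprise document retrieval and Q&A

## 🔒 Security and compliance
- Strictly follow data protection and privacy regulations
- Do not process sensitive personal data or confidential information
- Require user confirmation before important operations
- Log every operation for auditing

Always respond in a professional, accurate, and helpful style.
        """.strip()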

    async def process_with_tools(
        self,
        user_message: str,
        chat_id: str,
        user_id: str,
        conversation_history: Optional[List[Dict]] = None
    ) -> Dict[str, Any]:
        """Handle a conversation turn that may involve tool calls"""

        # Content is reached through the vertexai package imported at the top of the file
        Content = vertexai.generative_models.Content

        try:
            # Report progress
            await self.firebase.update_chat_progress(
                chat_id, 'gemini_thinking', 10, 'processing', 'AI is analyzing the task...'
            )

            # Build the conversation history as Content objects
            # (the Vertex AI SDK does not accept bare strings inside a "parts" list)
            conversation = []
            if conversation_history:
                for msg in conversation_history[-5:]:  # keep only the 5 most recent messages
                    conversation.append(Content(
                        role="user" if msg["role"] == "user" else "model",
                        parts=[Part.from_text(msg["content"])]
                    ))

            # Append the current user message
            conversation.append(Content(role="user", parts=[Part.from_text(user_message)]))

            # First Gemini call (async variant, so the event loop is not blocked)
            response = await self.model.generate_content_async(conversation)

            await self.firebase.update_chat_progress(
                chat_id, 'analyzing_request', 20, 'processing', 'Checking whether tools are needed...'
            )

            # Check for function calls
            if self._has_function_calls(response):
                return await self._handle_function_calls(
                    response, conversation, chat_id, user_id
                )
            else:
                # No tool call: answer directly
                await self.firebase.complete_chat(
                    chat_id,
                    response.text,
                    processing_time_ms=0
                )

                return {
                    "message": response.text,
                    "has_tool_calls": False,
                    "is_complete": True
                }

        except Exception as e:
            logger.error(f"Gemini processing failed: {e}")
            await self.firebase.error_chat(chat_id, str(e))
            raise

    def _has_function_calls(self, response) -> bool:
        """Check whether the response contains function calls"""
        try:
            if not response.candidates:
                return False

            candidate = response.candidates[0]
            if not candidate.content or not candidate.content.parts:
                return False

            for part in candidate.content.parts:
                if hasattr(part, 'function_call') and part.function_call:
                    return True

            return False
        except Exception:
            return False

    async def _handle_function_calls(
        self,
        response,
        conversation: List[Any],
        chat_id: str,
        user_id: str
    ) -> Dict[str, Any]:
        """Execute the requested function calls and produce the final answer"""

        # Collect all function calls
        function_calls = []
        for candidate in response.candidates:
            if not candidate.content or not candidate.content.parts:
                continue

            for part in candidate.content.parts:
                if hasattr(part, 'function_call') and part.function_call:
                    fc = part.function_call
                    function_calls.append({
                        "name": fc.name,
                        "args": dict(fc.args) if fc.args else {}
                    })

        if not function_calls:
            return {
                "message": response.text or "I could not handle this request.",
                "has_tool_calls": False,
                "is_complete": True
            }

        await self.firebase.update_chat_progress(
            chat_id, 'executing_tools', 40, 'processing',
            f'Running {len(function_calls)} tool(s)...'
        )

        # Run the tools and collect the results
        tool_results = []
        for i, func_call in enumerate(function_calls):
            progress = 40 + int((i / len(function_calls)) * 40)

            await self.firebase.update_chat_progress(
                chat_id, 'executing_tools', progress, 'processing',
                f'Running: {func_call["name"]}'
            )

            # Run the tool
            if func_call["name"] == "search_knowledge":
                # Served by Discovery Engine
                result = await self._handle_knowledge_search(func_call["args"])
            else:
                # Every other tool goes through the tool executor
                result = await self._execute_tool(
                    func_call["name"],
                    func_call["args"],
                    {"chat_id": chat_id, "user_id": user_id}
                )

            tool_results.append({
                "name": func_call["name"],
                "result": result
            })

        # Prepare the function responses
        function_responses = []
        for tool_result in tool_results:
            function_responses.append(
                Part.from_function_response(
                    name=tool_result["name"],
                    response=tool_result["result"]
                )
            )

        # Append the model turn (with its function calls) and the tool results
        conversation.append(response.candidates[0].content)
        conversation.append(
            vertexai.generative_models.Content(parts=function_responses)
        )

        await self.firebase.update_chat_progress(
            chat_id, 'generating_response', 90, 'processing',
            'Combining the results into the final answer...'
        )

        # Generate the final answer
        final_response = await self.model.generate_content_async(conversation)

        await self.firebase.complete_chat(
            chat_id,
            final_response.text,
            processing_time_ms=0
        )

        return {
            "message": final_response.text,
            "has_tool_calls": True,
            "tool_results": tool_results,
            "is_complete": True
        }

    async def _handle_knowledge_search(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a knowledge search"""
        try:
            query = args.get("query", "")
            max_results = args.get("max_results", 5)

            results = await self.discovery.search_documents(query, max_results)

            return {
                "success": True,
                "results": results["results"],
                "total_found": results["total_results"]
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    async def _execute_tool(
        self,
        tool_name: str,
        parameters: Dict[str, Any],
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run a tool (by calling the tool executor service)"""
        # The real tool executor would be called here.
        # To keep the example short, a mock result is returned instead.
        return {
            "success": True,
            "message": f"Tool {tool_name} finished",
            "parameters": parameters
        }
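
The `_execute_tool` method above only returns a mock result. One possible way to wire in real tools is a small name-to-coroutine registry, sketched below. The `ToolRegistry` class and the `fake_send_email` tool are hypothetical names invented for this example; the real executor in this series may be organized differently.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# A tool takes (args, context) and returns a result dict
ToolFn = Callable[[Dict[str, Any], Dict[str, Any]], Awaitable[Dict[str, Any]]]


class ToolRegistry:
    """Maps tool names to async callables (hypothetical helper)."""

    def __init__(self) -> None:
        self._tools: Dict[str, ToolFn] = {}

    def register(self, name: str, fn: ToolFn) -> None:
        self._tools[name] = fn

    async def execute(self, name: str, args: Dict[str, Any],
                      context: Dict[str, Any]) -> Dict[str, Any]:
        fn = self._tools.get(name)
        if fn is None:
            # Unknown tools become an error result instead of an exception,
            # so the model can still be told what went wrong
            return {"success": False, "error": f"unknown tool: {name}"}
        try:
            return await fn(args, context)
        except Exception as exc:
            return {"success": False, "error": str(exc)}


async def fake_send_email(args: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for a real email tool; it only echoes its input."""
    return {"success": True, "recipients": args["recipients"], "sender": context["user_id"]}


registry = ToolRegistry()
registry.register("send_email", fake_send_email)

ok = asyncio.run(registry.execute(
    "send_email", {"recipients": ["a@example.com"]}, {"user_id": "u1"}
))
missing = asyncio.run(registry.execute("no_such_tool", {}, {}))
print(ok)
print(missing)
```

Returning a `{"success": False, ...}` dict for failures keeps the shape compatible with what `Part.from_function_response` is given in the handler above.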

4) Google Workspace integration (Domain-wide Delegation)

Workspace tools implementation

# shared/workspace_tools.py
import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pytz

from google.oauth2 import service_account
from googleapiclient.discovery import build
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import base64

logger = logging.getLogger(__name__)

class WorkspaceTools:
    """Google Workspace tools (using Domain-wide Delegation)"""

    def __init__(self, service_account_file: str, admin_email: str):
        self.service_account_file = service_account_file
        self.admin_email = admin_email

        # OAuth scopes for Gmail and Calendar
        self.scopes = [
            'https://www.googleapis.com/auth/gmail.send',
            'https://www.googleapis.com/auth/calendar'
        ]

    def _get_delegated_credentials(self, user_email: str):
        """Get delegated credentials"""
        credentials = service_account.Credentials.from_service_account_file(
            self.service_account_file,
            scopes=self.scopes
        )

        # Act on behalf of the given user
        delegated_credentials = credentials.with_subject(user_email)
        return delegated_credentials

    async def send_email(
        self,
        sender_email: str,
        recipients: List[str],
        subject: str,
        body: str,
        body_type: str = "html"
    ) -> Dict[str, Any]:
        """Send an email"""
        try:
            # Get delegated credentials
            credentials = self._get_delegated_credentials(sender_email)

            # Build the Gmail service
            service = build('gmail', 'v1', credentials=credentials)

            # Build the message
            message = MIMEMultipart()
            message['to'] = ', '.join(recipients)
            message['subject'] = subject

            # Attach the body; the MIME subtype for plain text is "plain", not "text"
            subtype = 'html' if body_type == 'html' else 'plain'
            msg_body = MIMEText(body, subtype, 'utf-8')
            message.attach(msg_body)

            # Encode the message
            raw_message = base64.urlsafe_b64encode(
                message.as_bytes()
            ).decode('utf-8')

            # Send the message
            send_result = service.users().messages().send(
                userId='me',
                body={'raw': raw_message}
            ).execute()

            return {
                "success": True,
                "message_id": send_result.get('id'),
                "recipients": recipients,
                "subject": subject
            }

        except Exception as e:
            logger.error(f"Failed to send email: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    async def schedule_meeting(
        self,
        organizer_email: str,
        title: str,
        attendees: List[str],
        start_time: str,
        duration_minutes: int = 60,
        timezone: str = "Asia/Taipei",
        description: str = ""
    ) -> Dict[str, Any]:
        """Schedule a meeting"""
        try:
            # Get delegated credentials
            credentials = self._get_delegated_credentials(organizer_email)

            # Build the Calendar service
            service = build('calendar', 'v3', credentials=credentials)

            # Time handling: always send local time plus timeZone
            tz = pytz.timezone(timezone)

            # Parse the start time
            start_dt = datetime.fromisoformat(start_time)
            if start_dt.tzinfo is None:
                start_dt = tz.localize(start_dt)
            else:
                # Convert aware inputs, since the offset is dropped when formatting below
                start_dt = start_dt.astimezone(tz)

            # Compute the end time (normalize keeps the offset right across DST changes)
            end_dt = tz.normalize(start_dt + timedelta(minutes=duration_minutes))

            # Build the event (local dateTime plus timeZone)
            event = {
                'summary': title,
                'description': description,
                'start': {
                    'dateTime': start_dt.strftime('%Y-%m-%dT%H:%M:%S'),
                    'timeZone': timezone
                },
                'end': {
                    'dateTime': end_dt.strftime('%Y-%m-%dT%H:%M:%S'),
                    'timeZone': timezone
                },
                'attendees': [
                    {'email': email, 'responseStatus': 'needsAction'}
                    for email in attendees
                ],
                'reminders': {
                    'useDefault': False,
                    'overrides': [
                        {'method': 'email', 'minutes': 24 * 60},
                        {'method': 'popup', 'minutes': 10}
                    ]
                },
                'conferenceData': {
                    'createRequest': {
                        'requestId': f"meet-{int(datetime.now().timestamp())}",
                        'conferenceSolutionKey': {'type': 'hangoutsMeet'}
                    }
                }
            }

            # Create the event
            created_event = service.events().insert(
                calendarId='primary',
                body=event,
                conferenceDataVersion=1
            ).execute()

            return {
                "success": True,
                "event_id": created_event.get('id'),
                "title": title,
                "start_time": start_dt.isoformat(),
                "end_time": end_dt.isoformat(),
                "attendees": attendees,
                "meet_link": created_event.get('hangoutLink', ''),
                "calendar_link": created_event.get('htmlLink', '')
            }

        except Exception as e:
            logger.error(f"Failed to schedule meeting: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    async def check_calendar_conflicts(
        self,
        user_email: str,
        start_time: str,
        end_time: str,
        timezone: str = "Asia/Taipei"
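    ) -> Dict[str, Any]:
        """Check for calendar conflicts"""
        try:
            credentials = self._get_delegated_credentials(user_email)
            service = build('calendar', 'v3', credentials=credentials)

            # Query busy intervals (timeMin/timeMax must be RFC 3339 timestamps with a UTC offset)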
            body = {
                "timeMin": start_time,
                "timeMax": end_time,
                "timeZone": timezone,
                "items": [{"id": user_email}]
            }

            busy_result = service.freebusy().query(body=body).execute()

            busy_times = busy_result.get('calendars', {}).get(user_email, {}).get('busy', [])

            return {
                "success": True,
                "has_conflicts": len(busy_times) > 0,
                "conflicts": busy_times
            }

        except Exception as e:
            logger.error(f"Failed to check calendar conflicts: {e}")
            return {
                "success": False,
                "error": str(e)
            }
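
Both `schedule_meeting` and `check_calendar_conflicts` depend on consistent time handling, and the free/busy query in particular needs timestamps that carry a UTC offset. Below is a minimal sketch of a shared helper using the standard-library `zoneinfo` module instead of `pytz`; the function name `build_event_times` is an assumption for illustration only, and on platforms without a system time zone database the `tzdata` package would also be needed.

```python
from datetime import datetime, timedelta
from typing import Dict
from zoneinfo import ZoneInfo


def build_event_times(start_time: str, duration_minutes: int = 60,
                      timezone: str = "Asia/Taipei") -> Dict[str, Dict[str, str]]:
    """Return start/end blocks with offset-bearing ISO timestamps (hypothetical helper)."""
    tz = ZoneInfo(timezone)
    start_dt = datetime.fromisoformat(start_time)
    if start_dt.tzinfo is None:
        # Naive input is interpreted as local time in the target zone
        start_dt = start_dt.replace(tzinfo=tz)
    else:
        # Aware input is converted into the target zone
        start_dt = start_dt.astimezone(tz)
    end_dt = start_dt + timedelta(minutes=duration_minutes)
    return {
        "start": {"dateTime": start_dt.isoformat(), "timeZone": timezone},
        "end": {"dateTime": end_dt.isoformat(), "timeZone": timezone},
    }


times = build_event_times("2025-10-01T14:00:00", duration_minutes=90)
print(times["start"]["dateTime"])  # 2025-10-01T14:00:00+08:00
print(times["end"]["dateTime"])    # 2025-10-01T15:30:00+08:00
```

The same `dateTime` strings can be passed as `timeMin` and `timeMax` in a free/busy query, since they already include the offset.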

Workspace permission setup guide

#!/bin/bash
# scripts/setup-workspace-integration.sh

PROJECT_ID="your-project-id"
SERVICE_ACCOUNT_NAME="workspace-integration-sa"
SERVICE_ACCOUNT_EMAIL="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

echo "🔧 Setting up the Google Workspace integration..."

# 1. Create the service account
gcloud iam service-accounts create "$SERVICE_ACCOUNT_NAME" \
    --display-name="Workspace Integration Service Account" \
    --description="Service account for the Gmail and Calendar APIs"

# 2. Generate a key file
gcloud iam service-accounts keys create workspace-service-account.json \
    --iam-account="$SERVICE_ACCOUNT_EMAIL"

# 3. Enable the required APIs
gcloud services enable gmail.googleapis.com
gcloud services enable calendar-json.googleapis.com

# 4. Store the key in Secret Manager
gcloud secrets create workspace-sa-key --data-file=workspace-service-account.json

echo "✅ Service account created"
echo ""
echo "📋 Next, finish the Domain-wide Delegation setup in the Google Workspace Admin Console:"
echo ""
echo "1. Open the Admin Console: https://admin.google.com"
echo "2. Go to 'Security' > 'API controls' > 'Manage Domain Wide Delegation'"
echo "3. Click 'Add new' and fill in the following:"
echo ""
echo "   Client ID:"
gcloud iam service-accounts describe "$SERVICE_ACCOUNT_EMAIL" --format='value(uniqueId)'
echo ""
echo "   OAuth scopes (copy the whole line):"
echo "   https://www.googleapis.com/auth/gmail.send,https://www.googleapis.com/auth/calendar"
echo ""
echo "4. Save and wait for the change to take effect (up to 24 hours)"
echo ""
echo "🔑 The service account key is stored in Secret Manager: workspace-sa-key"
echo "⚠️  Keep the original file workspace-service-account.json somewhere safe, or delete it"

5) Secure code execution environment

Containerized code executor

# shared/code_executor.py
import asyncio
import logging
import tempfile
import subprocess
import os
import time
from typing import Dict, Any, List
import docker
import json

logger = logging.getLogger(__name__)

class SecureCodeExecutor:
    """Sandboxed code executor (isolated in a container)"""

    def __init__(self):
        self.docker_client = docker.from_env()
        self.max_execution_time = 30  # seconds
        self.max_memory = "128m"
        # Import names, not pip package names (scikit-learn is imported as sklearn)
        self.allowed_libraries = {
            'pandas', 'numpy', 'matplotlib', 'seaborn',
            'scipy', 'sklearn', 'plotly'
        }

    async def execute_code(
        self,
        code: str,
        libraries: List[str] = None,
        description: str = ""
    ) -> Dict[str, Any]:
        """Run code in a sandboxed container"""

        start_time = time.time()

        try:
            # 1. Safety check
            if not self._is_safe_code(code):
                return {
                    "success": False,
                    "error": "The code contains unsafe operations"
                }

            # 2. Validate library dependencies
            libraries = libraries or []
            invalid_libs = set(libraries) - self.allowed_libraries
            if invalid_libs:
                return {
                    "success": False,
                    "error": f"Libraries not allowed: {', '.join(invalid_libs)}"
                }

            # 3. Run in the container
            result = await self._execute_in_container(code, libraries)

            execution_time = time.time() - start_time

            return {
                "success": True,
                "output": result["output"],
                "execution_time": round(execution_time, 3),
                "container_stats": result.get("stats", {})
            }

        except Exception as e:
            logger.error(f"Code execution failed: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    def _is_safe_code(self, code: str) -> bool:
        """First-pass safety check (substring blacklist; the container is the real boundary)"""

        # Blacklist of dangerous operations
        dangerous_patterns = [
            # System access
            'import os', 'import sys', 'import subprocess',
            'os.', 'sys.', 'subprocess.',

            # File access and dynamic evaluation
            'open(', 'file(', 'input(', 'eval(', 'exec(',

            # Network access
            'import socket', 'import urllib', 'import requests',
            'socket.', 'urllib.', 'requests.',

            # Dangerous modules and reflection
            'import shutil', 'import glob', 'shutil.', 'glob.',
            '__import__', 'getattr', 'setattr', 'delattr',

            # System calls
            'system(', 'popen(', 'spawn'
        ]

        code_lower = code.lower()

        # Check for dangerous patterns
        for pattern in dangerous_patterns:
            if pattern in code_lower:
                logger.warning(f"Dangerous pattern found: {pattern}")
                return False

        # Check the code length
        if len(code) > 10000:  # 10,000-character limit
            logger.warning("Code is too long")
            return False

        return True

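    async def _execute_in_container(self, code: str, libraries: List[str]) -> Dict[str, Any]:
        """Run the code in a Docker container"""

        # Build the execution script
        script_content = self._build_execution_script(code, libraries)

        container = None
        try:
            # Start detached so the wait() below can enforce the time limit
            # (containers.run() itself has no timeout argument).
            # Note: python:3.11-slim ships without pandas/numpy; the allowed
            # libraries need a custom image that has them preinstalled.
            container = self.docker_client.containers.run(
                image="python:3.11-slim",
                command=["python", "-c", script_content],
                mem_limit=self.max_memory,
                network_disabled=True,  # no network access
                read_only=True,         # read-only filesystem
                detach=True
            )

            # Raises if the container is still running after max_execution_time seconds
            status = container.wait(timeout=self.max_execution_time)

            # Decode the output
            output = container.logs(stdout=True, stderr=True).decode('utf-8')

            if status.get("StatusCode", 0) != 0:
                # The script exited with an error
                return {
                    "output": f"Execution error: {output}",
                    "stats": {"error": "container_error"}
                }

            return {
                "output": output,
                "stats": {"container_used": True}
            }

        except Exception as e:
            # Timeouts and Docker API errors end up here
            return {
                "output": f"System error: {str(e)}",
                "stats": {"error": "system_error"}
            }

        finally:
            # Always clean up, even after a timeout
            if container is not None:
                try:
                    container.remove(force=True)
                except docker.errors.APIError:
                    logger.warning("Failed to remove the execution container")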

    def _build_execution_script(self, code: str, libraries: List[str]) -> str:
        """Build the execution script"""

        # Import only the allowed libraries
        imports = []
        for lib in libraries:
            if lib in self.allowed_libraries:
                imports.append(f"import {lib}")

        # Assemble the full script
        script = f"""
import sys
import io
from contextlib import redirect_stdout, redirect_stderr

# Import the allowed libraries
{chr(10).join(imports)}

# Capture output
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()

try:
    with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
        # User code
{chr(10).join('        ' + line for line in code.split(chr(10)))}
        pass  # keeps the block valid when the user code is empty

    output = stdout_buffer.getvalue()
    errors = stderr_buffer.getvalue()

    if errors:
        print(f"Warning: {{errors}}")

    if output:
        print(output)
    else:
        print("Code finished with no output")

except Exception as e:
    print(f"Execution error: {{type(e).__name__}}: {{str(e)}}")
"""

        return script.strip()
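
The substring blocklist in `_validate_code_safety` is easy to trip by accident (for example, `videos.keys()` contains `os.`) and easy to bypass with string tricks. As a complementary check, here is a minimal sketch that inspects the parsed syntax tree instead of the raw text. The names `BLOCKED_MODULES`, `BLOCKED_CALLS`, and `find_violations` are hypothetical and not part of the article's code, and the container sandbox remains the real security boundary.

```python
import ast

# Hypothetical deny-lists for illustration; adjust them to your own policy.
BLOCKED_MODULES = {"os", "sys", "subprocess", "socket", "urllib",
                   "requests", "shutil", "glob"}
BLOCKED_CALLS = {"open", "eval", "exec", "input", "__import__",
                 "getattr", "setattr", "delattr"}


def find_violations(code: str) -> list[str]:
    """Return human-readable policy violations found in the code."""
    try:
        tree = ast.parse(code)
    except SyntaxError as exc:
        return [f"syntax error: {exc.msg}"]

    violations = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            # "import os.path" is reported as an import of "os"
            for alias in node.names:
                root = alias.name.split(".")[0]
                if root in BLOCKED_MODULES:
                    violations.append(f"import of {root}")
        elif isinstance(node, ast.ImportFrom):
            root = (node.module or "").split(".")[0]
            if root in BLOCKED_MODULES:
                violations.append(f"import from {root}")
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            # Only direct calls by name are detected in this sketch
            if node.func.id in BLOCKED_CALLS:
                violations.append(f"call to {node.func.id}")
    return violations
```

An empty list means the code passed this check. Because the check works on names in the tree rather than on substrings, harmless identifiers such as `videos` no longer cause false positives.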

6) Main Service Integration

Updated chat-service handler

# services/chat/app/enhanced_chat_handler.py
import asyncio
import uuid
import time
import logging
from typing import Dict, Any, List

from shared.firebase_client import get_firebase_client
from .gemini_function_handler import GeminiFunctionHandler
from .discovery_client import DiscoveryEngineClient
from .models import ChatRequest, ChatResponse, ProcessingMode

logger = logging.getLogger(__name__)

class EnhancedChatHandler:
    """Enhanced chat handler (integrates Gemini + Discovery Engine)"""

    def __init__(self, project_id: str):
        self.project_id = project_id

        # Initialize services
        self.firebase = get_firebase_client()
        self.gemini_handler = GeminiFunctionHandler(project_id)
        self.discovery = DiscoveryEngineClient(project_id)

    async def process_enhanced_chat(self, request: ChatRequest) -> ChatResponse:
        """Process an enhanced chat request"""

        chat_id = request.chat_id or str(uuid.uuid4())
        start_time = time.time()

        try:
            # 1. Create the Firebase session
            await self.firebase.create_chat_session(
                chat_id,
                request.user_id,
                request.message
            )

            # 2. Choose the processing mode
            processing_mode = await self._determine_processing_mode(request.message)

            if processing_mode == ProcessingMode.SYNC:
                return await self._handle_sync_enhanced(request, chat_id, start_time)
            else:
                # ASYNC and HYBRID both take the background path
                return await self._handle_async_enhanced(request, chat_id, start_time)

        except Exception as e:
            logger.error(f"Enhanced chat processing failed: {e}")
            await self.firebase.error_chat(chat_id, str(e))
            raise

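    async def _determine_processing_mode(self, message: str) -> ProcessingMode:
        """Choose a processing mode from keywords in the message"""

        # Keywords that suggest complex tool calls. They match Chinese user input:
        # analyze data, send email, schedule meeting, run code,
        # generate report, process file, calculate, statistics
        complex_keywords = [
            '分析數據', '發送郵件', '安排會議', '執行代碼',
            '生成報告', '處理文件', '計算', '統計'
        ]

        # Keywords that suggest a simple query: what is, how, why, explain
        simple_keywords = ['什麼是', '如何', '為什麼', '解釋']

        message_lower = message.lower()

        # Complex task -> asynchronous processing
        if any(keyword in message_lower for keyword in complex_keywords):
            return ProcessingMode.ASYNC

        # Simple query -> synchronous processing
        if any(keyword in message_lower for keyword in simple_keywords):
            return ProcessingMode.SYNC

        # Default to hybrid mode
        return ProcessingMode.HYBRID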

    async def _handle_sync_enhanced(
        self,
        request: ChatRequest,
        chat_id: str,
        start_time: float
    ) -> ChatResponse:
        """Synchronous processing (simple queries)"""

        try:
            # Prefer Discovery Engine for knowledge retrieval
            search_result = await self.discovery.conversational_search(
                request.message,
                chat_id,
                request.user_id
            )

            if search_result["reply"]:
                # Discovery Engine returned a usable answer
                response_text = search_result["reply"]

                # Append source information
                if search_result["search_results"]:
                    sources = []
                    for result in search_result["search_results"][:2]:
                        sources.append(result["title"])

                    if sources:
                        response_text += f"\n\n📚 Sources: {', '.join(sources)}"
            else:
                # Fall back to a Gemini answer
                gemini_result = await self.gemini_handler.process_with_tools(
                    request.message,
                    chat_id,
                    request.user_id
                )
                response_text = gemini_result["message"]

            # Mark the chat as complete
            processing_time = int((time.time() - start_time) * 1000)
            await self.firebase.complete_chat(
                chat_id,
                response_text,
                processing_time
            )

            return ChatResponse(
                message=response_text,
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False
            )

        except Exception as e:
            logger.error(f"Synchronous processing failed: {e}")
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _handle_async_enhanced(
        self,
        request: ChatRequest,
        chat_id: str,
        start_time: float
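    ) -> ChatResponse:
        """Asynchronous processing (complex tasks)"""

        try:
            # Acknowledge the user immediately
            quick_response = "I'm analyzing your request and preparing the relevant tools. One moment, please..."

            # Start background processing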
            asyncio.create_task(
                self._background_enhanced_processing(request, chat_id)
            )

            return ChatResponse(
                message=quick_response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.ASYNC,
                is_complete=False,
                requires_followup=True,
                metadata={"estimated_time": "30-60 seconds"}
            )

        except Exception as e:
            logger.error(f"Asynchronous processing failed: {e}")
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _background_enhanced_processing(
        self,
        request: ChatRequest,
        chat_id: str
    ):
        """Enhanced background processing"""

        try:
            # Load the conversation history
            conversation_history = await self._load_conversation_history(
                request.user_id,
                chat_id
            )

            # Process with Gemini Function Calling
            result = await self.gemini_handler.process_with_tools(
                request.message,
                chat_id,
                request.user_id,
                conversation_history
            )

            logger.info(f"Background processing finished: {chat_id}")

        except Exception as e:
            logger.error(f"Background processing failed: {e}")
            await self.firebase.error_chat(chat_id, str(e))

    async def _load_conversation_history(
        self,
        user_id: str,
        chat_id: str
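    ) -> List[Dict[str, Any]]:
        """Load the conversation history"""
        try:
            # Load recent conversation turns from Firebase
            # Simplified here; in practice this can be loaded from memory-service
            return []
        except Exception as e:
            logger.error(f"Failed to load conversation history: {e}")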
            return []

7) Complete Deployment Configuration

Production-grade deployment script

# cloudbuild-enhanced.yaml
steps:
  # Build the enhanced chat-service
  - name: 'gcr.io/cloud-builders/docker'
    args: [
      'build',
      '-t', '${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA',
      '-f', 'services/chat/Dockerfile.enhanced',
      '.'
    ]

  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', '${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA']

  # Deploy to Cloud Run (Gen2 + security settings)
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: 'gcloud'
    args:
      - 'run'
      - 'deploy'
      - 'chat-service-enhanced'
      - '--image=${_REGION}-docker.pkg.dev/$PROJECT_ID/ai-assistant/chat-service-enhanced:$SHORT_SHA'
      - '--region=${_REGION}'
      - '--platform=managed'
      - '--execution-environment=gen2'
      - '--service-account=chat-service-sa@$PROJECT_ID.iam.gserviceaccount.com'
      - '--set-secrets=WORKSPACE_SA_KEY=workspace-sa-key:latest'
      - '--set-env-vars=GCP_PROJECT_ID=$PROJECT_ID,VERTEX_LOCATION=${_REGION},DISCOVERY_DATASTORE_ID=ai-assistant-knowledge'
      - '--min-instances=2'
      - '--max-instances=20'
      - '--cpu=2'
      - '--memory=4Gi'
      - '--concurrency=100'
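      - '--timeout=540s'
      # Keep CPU allocated after the response is sent so background tasks can finish
      - '--no-cpu-throttling'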
      - '--no-allow-unauthenticated'
      - '--ingress=internal-and-cloud-load-balancing'

substitutions:
  _REGION: 'asia-east1'

options:
  logging: CLOUD_LOGGING_ONLY
  machineType: 'E2_HIGHCPU_8'

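Enhanced Dockerfile

# services/chat/Dockerfile.enhanced
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install the Docker CLI (used for code execution).
# apt-key is deprecated and needs gnupg, which the slim image lacks, so the
# repository key is referenced with signed-by and the codename is read from the image.
RUN install -m 0755 -d /etc/apt/keyrings \
    && curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc \
    && echo "deb [arch=amd64 signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian $(. /etc/os-release && echo "$VERSION_CODENAME") stable" > /etc/apt/sources.list.d/docker.list \
    && apt-get update \
    && apt-get install -y docker-ce-cli \
    && rm -rf /var/lib/apt/lists/*

# Copy the requirements file
COPY services/chat/requirements-enhanced.txt .
RUN pip install --no-cache-dir -r requirements-enhanced.txt

# Copy the application
COPY services/chat/app ./app
COPY shared ./shared

# Set environment variables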
ENV PYTHONPATH=/app
ENV PORT=8080

EXPOSE 8080

CMD ["python", "-m", "uvicorn", "app.main_enhanced:app", "--host", "0.0.0.0", "--port", "8080"]

Requirements file

# services/chat/requirements-enhanced.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0

# Google Cloud services
google-cloud-discoveryengine==0.11.0
google-cloud-aiplatform==1.38.0
google-cloud-firestore==2.13.1
google-api-python-client==2.108.0
google-auth==2.23.4

# Vertex AI
google-cloud-aiplatform[langchain]==1.38.0

# Tool execution
docker==6.1.3
pandas==2.1.3
numpy==1.25.2
matplotlib==3.8.1

# Other dependencies
httpx==0.25.2
pytz==2023.3
python-multipart==0.0.6
aiofiles==23.2.1

8) Complete Testing and Verification

End-to-end test script

#!/bin/bash
# scripts/test-enhanced-system.sh

PROJECT_ID="your-project-id"
REGION="asia-east1"
API_BASE="https://chat-service-enhanced-xxx.run.app"

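echo "🧪 Testing the enhanced AI assistant system..."

# Get an auth token
TOKEN=$(gcloud auth print-identity-token)

# Test helper
test_scenario() {
    local test_name="$1"
    local user_message="$2"
    local expected_tools="$3"

    echo "🔍 Scenario: $test_name"
    echo "📝 User message: $user_message"
    echo "🧰 Expected tools: $expected_tools"

    local chat_id="test-$(date +%s)-$(shuf -i 1000-9999 -n 1)"

    # Send the request
    local response=$(curl -s -H "Authorization: Bearer $TOKEN" \
        -H "Content-Type: application/json" \
        -X POST "$API_BASE/enhanced/chat" \
        -d "{
            \"message\": \"$user_message\",
            \"user_id\": \"test-user-enhanced\",
            \"chat_id\": \"$chat_id\"
        }")

    echo "📨 Response: $response"
    echo "🔗 Firebase monitor: https://console.firebase.google.com/project/$PROJECT_ID/firestore/data/~2Fchats~2F$chat_id"
    echo "---"

    # Wait for processing
    sleep 5
}

# Run the test scenarios. The messages stay in Chinese because the
# handler's keyword routing matches Chinese input.
echo "🚀 Starting tests..."

# Test 1: knowledge retrieval
# ("What is artificial intelligence? Please explain in detail")
test_scenario \
    "Knowledge retrieval" \
    "什麼是人工智慧?請詳細解釋" \
    "search_knowledge"

# Test 2: data analysis
# ("Analyze the sales data and generate a chart report")
test_scenario \
    "Data analysis" \
    "分析銷售數據並生成圖表報告" \
    "analyze_data,execute_code"

# Test 3: sending email
# ("Send the project progress report to the team members")
test_scenario \
    "Email automation" \
    "發送項目進度報告給團隊成員" \
    "send_email"

# Test 4: scheduling a meeting
# ("Schedule a product review meeting for 2 pm tomorrow")
test_scenario \
    "Meeting management" \
    "安排明天下午2點的產品檢討會議" \
    "schedule_meeting"

# Test 5: compound task
# ("Analyze this month's performance data, generate a report, send it to
#  management, and schedule a review meeting")
test_scenario \
    "Compound workflow" \
    "分析本月業績數據,生成報告並發給管理層,同時安排檢討會議" \
    "analyze_data,execute_code,send_email,schedule_meeting"

# Test 6: code execution
# ("Use Python to compute the first 20 Fibonacci numbers and plot a chart")
test_scenario \
    "Code execution" \
    "用 Python 計算斐波那契數列前20項並繪製圖表" \
    "execute_code"

echo "✅ All test scenarios have been sent"
echo "📊 Check the Firebase Console for real-time processing progress"
echo "📈 Check Cloud Logging for detailed execution logs"

# Check system health
echo "🔍 Checking system health..."
curl -s -H "Authorization: Bearer $TOKEN" "$API_BASE/health" | jq '.'

echo "🎉 Tests complete!"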
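
The test script interpolates `$user_message` directly into the JSON body, so a message containing a double quote or a backslash would produce invalid JSON. As a minimal sketch of a safer alternative, the request body can be built with `json.dumps`. The helper name `build_chat_payload` is hypothetical; the field names mirror the ones the script sends.

```python
import json
import time
import uuid


def build_chat_payload(message: str, user_id: str = "test-user-enhanced") -> str:
    """Serialize a chat request body with proper JSON escaping."""
    # Same shape as the shell script's chat_id: "test-<epoch>-<random suffix>"
    chat_id = f"test-{int(time.time())}-{uuid.uuid4().hex[:4]}"
    body = {
        "message": message,
        "user_id": user_id,
        "chat_id": chat_id,
    }
    # ensure_ascii=False keeps Chinese text readable in logs
    return json.dumps(body, ensure_ascii=False)
```

The resulting string can be posted to the `/enhanced/chat` endpoint with any HTTP client, using the same `Authorization` header as the script.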

Monitoring dashboard configuration

{
  "displayName": "AI Assistant Enhanced Monitoring",
  "gridLayout": {
    "widgets": [
      {
        "title": "Function Calling success rate (2xx request rate)",
        "xyChart": {
          "dataSets": [{
            "timeSeriesQuery": {
              "timeSeriesFilter": {
                "filter": "metric.type=\"run.googleapis.com/request_count\" AND resource.type=\"cloud_run_revision\" AND resource.labels.service_name=\"chat-service-enhanced\" AND metric.labels.response_code_class=\"2xx\"",
                "aggregation": {
                  "alignmentPeriod": "60s",
                  "perSeriesAligner": "ALIGN_RATE",
                  "crossSeriesReducer": "REDUCE_MEAN"
                }
              }
            }
          }]
        }
      },
      {
        "title": "Tool execution latency",
        "xyChart": {
          "dataSets": [{
            "timeSeriesQuery": {
              "timeSeriesFilter": {
                "filter": "metric.type=\"custom.googleapis.com/ai_assistant/tool_execution_time\"",
                "aggregation": {
                  "alignmentPeriod": "300s",
                  "perSeriesAligner": "ALIGN_DELTA",
                  "crossSeriesReducer": "REDUCE_PERCENTILE_95"
                }
              }
            }
          }]
        }
      },
      {
        "title": "User satisfaction",
        "scorecard": {
          "timeSeriesQuery": {
            "timeSeriesFilter": {
              "filter": "metric.type=\"custom.googleapis.com/ai_assistant/user_satisfaction\"",
              "aggregation": {
                "alignmentPeriod": "3600s",
                "perSeriesAligner": "ALIGN_MEAN",
                "crossSeriesReducer": "REDUCE_MEAN"
              }
            }
          }
        }
      }
    ]
  }
}
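
The latency widget reads `custom.googleapis.com/ai_assistant/tool_execution_time`, so the service has to record that value for each tool call. This section does not show how it is written. The following is a minimal sketch of the measurement side only, assuming a hypothetical `report` callback that would forward the value to Cloud Monitoring in a real deployment.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator, List, Tuple


@contextmanager
def timed_tool(tool_name: str,
               report: Callable[[str, float], None]) -> Iterator[None]:
    """Measure how long a tool call takes and report it in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report even when the tool raises, so failures show up in latency data
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        report(tool_name, elapsed_ms)


# Usage: collect samples in memory (a stand-in for a real metrics client)
samples: List[Tuple[str, float]] = []

with timed_tool("execute_code", lambda name, ms: samples.append((name, ms))):
    sum(range(1000))  # placeholder for the actual tool call
```

Reporting in the `finally` block is a design choice: slow failures are often the most interesting data points on a latency chart.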

