iT邦幫忙

2025 iThome 鐵人賽

DAY 14
0

前言

今天我們要深入了解如何將這兩個強大的AI服務進行整合,打造出更完整且靈活的AI應用架構。
這種整合不僅能發揮各自的優勢,還能創造出1+1>2的效果

簡單複習一下兩者的優勢

Bedrock的優勢

  • 即開即用的基礎模型:無需訓練即可使用Claude、Llama等先進模型
  • 快速部署:幾分鐘內就能建立AI應用原型
  • 成本效益:按使用量付費,無需管理基礎設施
  • 安全合規:內建企業級安全控制

SageMaker的優勢

  • 完整的ML生命週期管理:從資料準備到模型部署的全流程
  • 自定義模型訓練:可針對特定業務需求進行微調
  • 強大的資料處理能力:內建多種資料預處理工具
  • MLOps支持:完善的模型版本控制和自動化部署

整合架構

階段式處理架構(Staged Processing Architecture)

用戶輸入 → Bedrock(初步處理) → SageMaker(深度分析) → 結果輸出

應用場景:客戶服務系統

  • Bedrock:理解用戶意圖,進行初步分類(意圖分類)
  • SageMaker:基於企業特定資料進行精準回應

並行處理架構(Parallel Processing Architecture)

用戶輸入 -> Bedrock(通用處理)    -> 結果整合 -> 輸出
         -> SageMaker(專業處理)

應用場景:文件分析系統

  • Bedrock:提取文件關鍵資訊
  • SageMaker:進行行業特定的合規性檢查

決策樹架構(Decision Tree Architecture)

用戶輸入 -> 路由判斷 -> Bedrock(簡單任務)
                     -> SageMaker(複雜任務)

應用場景:智能推薦系統

  • 簡單推薦:使用Bedrock快速生成
  • 複雜推薦:使用SageMaker的個人化模型

範例:智能文件處理系統

系統架構設計

import boto3
import json
from typing import Dict, Any, Optional

class HybridAIProcessor:
    def __init__(self):
        self.bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')
        self.sagemaker_client = boto3.client('sagemaker-runtime', region_name='us-east-1')
        
    def process_document(self, document_content: str, document_type: str) -> Dict[str, Any]:
        """
        混合式文件處理主函數
        """
        # 步驟1: 使用Bedrock進行初步分析
        initial_analysis = self._bedrock_analysis(document_content)
        
        # 步驟2: 根據分析結果決定是否需要SageMaker深度處理
        if self._needs_deep_analysis(initial_analysis, document_type):
            deep_analysis = self._sagemaker_analysis(document_content, document_type)
            return self._merge_results(initial_analysis, deep_analysis)
        
        return initial_analysis
    
    def _bedrock_analysis(self, content: str) -> Dict[str, Any]:
        """
        使用Bedrock進行文件初步分析
        """
        prompt = f"""
        請分析以下文件內容,提取關鍵資訊:
        1. 文件主題
        2. 重要關鍵字
        3. 情感傾向
        4. 可能的文件類型
        
        文件內容:{content}
        
        請以JSON格式回答。
        """
        
        try:
            response = self.bedrock_client.invoke_model(
                modelId='anthropic.claude-3-sonnet-20240229-v1:0',
                body=json.dumps({
                    "anthropic_version": "bedrock-2023-05-31",
                    "max_tokens": 1000,
                    "messages": [{"role": "user", "content": prompt}]
                })
            )
            
            result = json.loads(response['body'].read())
            content_text = result['content'][0]['text']
            
            # 解析JSON回應
            try:
                analysis_result = json.loads(content_text)
                analysis_result['source'] = 'bedrock'
                analysis_result['confidence'] = 'standard'
                return analysis_result
            except json.JSONDecodeError:
                return {
                    'raw_response': content_text,
                    'source': 'bedrock',
                    'confidence': 'low'
                }
                
        except Exception as e:
            return {'error': str(e), 'source': 'bedrock'}
    
    def _needs_deep_analysis(self, initial_result: Dict[str, Any], doc_type: str) -> bool:
        """
        判斷是否需要SageMaker深度分析
        """
        # 如果是特定類型文件或置信度較低,則需要深度分析
        complex_types = ['legal', 'medical', 'financial']
        
        return (doc_type in complex_types or 
                initial_result.get('confidence') == 'low' or
                'error' in initial_result)
    
    def _sagemaker_analysis(self, content: str, doc_type: str) -> Dict[str, Any]:
        """
        使用SageMaker進行深度分析
        """
        # 根據文件類型選擇對應的SageMaker端點
        endpoint_mapping = {
            'legal': 'legal-document-analyzer',
            'medical': 'medical-text-classifier', 
            'financial': 'financial-sentiment-analyzer'
        }
        
        endpoint_name = endpoint_mapping.get(doc_type, 'general-text-analyzer')
        
        try:
            # 準備輸入數據
            payload = {
                'instances': [{
                    'text': content,
                    'analysis_type': doc_type
                }]
            }
            
            # 調用SageMaker端點
            response = self.sagemaker_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType='application/json',
                Body=json.dumps(payload)
            )
            
            result = json.loads(response['Body'].read().decode())
            result['source'] = 'sagemaker'
            result['confidence'] = 'high'
            result['endpoint'] = endpoint_name
            
            return result
            
        except Exception as e:
            return {'error': str(e), 'source': 'sagemaker'}
    
    def _merge_results(self, bedrock_result: Dict[str, Any], 
                      sagemaker_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        整合Bedrock和SageMaker的分析結果
        """
        return {
            'bedrock_analysis': bedrock_result,
            'sagemaker_analysis': sagemaker_result,
            'integrated_insights': self._generate_integrated_insights(
                bedrock_result, sagemaker_result
            ),
            'processing_timestamp': boto3.Session().region_name
        }
    
    def _generate_integrated_insights(self, bedrock_result: Dict[str, Any], 
                                    sagemaker_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        生成整合見解
        """
        insights = {
            'confidence_level': 'high' if sagemaker_result.get('confidence') == 'high' else 'medium',
            'processing_methods': ['bedrock', 'sagemaker'],
            'recommendations': []
        }
        
        # 根據兩個結果生成建議
        if bedrock_result.get('confidence') == 'low' and sagemaker_result.get('confidence') == 'high':
            insights['recommendations'].append('建議採用SageMaker的分析結果')
        elif 'error' not in bedrock_result and 'error' not in sagemaker_result:
            insights['recommendations'].append('兩種分析方法結果一致,可信度高')
        
        return insights

  • Lambda函數整合範例
import json
import boto3
from hybrid_ai_processor import HybridAIProcessor

def lambda_handler(event, context):
    """
    Lambda函數:整合Bedrock與SageMaker的文件處理服務
    """
    
    try:
        # 解析輸入參數
        document_content = event.get('document_content', '')
        document_type = event.get('document_type', 'general')
        
        if not document_content:
            return {
                'statusCode': 400,
                'body': json.dumps({
                    'error': 'document_content is required'
                })
            }
        
        # 初始化處理器
        processor = HybridAIProcessor()
        
        # 執行混合式分析
        result = processor.process_document(document_content, document_type)
        
        # 返回結果
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(result, ensure_ascii=False, indent=2)
        }
        
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': f'Processing failed: {str(e)}'
            })
        }

API Gateway配置

serverless.yml

# serverless.yml 配置範例
service: hybrid-ai-document-processor

provider:
  name: aws
  runtime: python3.9
  region: us-east-1
  iamRoleStatements:
    - Effect: Allow
      Action:
        - bedrock:InvokeModel
        - sagemaker:InvokeEndpoint
      Resource: "*"

functions:
  processDocument:
    handler: handler.lambda_handler
    events:
      - http:
          path: /process
          method: post
          cors: true
    environment:
      BEDROCK_REGION: us-east-1
      SAGEMAKER_REGION: us-east-1

resources:
  Resources:
    # CloudWatch Log Group
    ProcessDocumentLogGroup:
      Type: AWS::Logs::LogGroup
      Properties:
        LogGroupName: /aws/lambda/hybrid-ai-document-processor-dev-processDocument
        RetentionInDays: 14

資料留管理最佳化

  1. cache strategy
import redis
import hashlib
import json
from typing import Optional

class AIResultCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='your-elasticache-endpoint',
            port=6379,
            decode_responses=True
        )
        self.cache_ttl = 3600  # 1小時快取
    
    def get_cache_key(self, content: str, doc_type: str) -> str:
        """生成快取鍵值"""
        content_hash = hashlib.md5(content.encode()).hexdigest()
        return f"ai_analysis:{doc_type}:{content_hash}"
    
    def get_cached_result(self, content: str, doc_type: str) -> Optional[Dict]:
        """獲取快取結果"""
        cache_key = self.get_cache_key(content, doc_type)
        cached_data = self.redis_client.get(cache_key)
        
        if cached_data:
            return json.loads(cached_data)
        return None
    
    def cache_result(self, content: str, doc_type: str, result: Dict):
        """快取分析結果"""
        cache_key = self.get_cache_key(content, doc_type)
        self.redis_client.setex(
            cache_key, 
            self.cache_ttl, 
            json.dumps(result, ensure_ascii=False)
        )

其他部分:監控 - 整盒 cloudwatch

import boto3
import time
from datetime import datetime

class AIProcessingMonitor:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
    
    def log_processing_metrics(self, 
                             service_used: str, 
                             processing_time: float, 
                             success: bool,
                             document_type: str):
        """記錄處理指標"""
        
        timestamp = datetime.utcnow()
        
        # 記錄處理時間
        self.cloudwatch.put_metric_data(
            Namespace='AI/DocumentProcessing',
            MetricData=[
                {
                    'MetricName': 'ProcessingTime',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': service_used},
                        {'Name': 'DocumentType', 'Value': document_type}
                    ],
                    'Value': processing_time,
                    'Unit': 'Seconds',
                    'Timestamp': timestamp
                },
                {
                    'MetricName': 'ProcessingSuccess',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': service_used},
                        {'Name': 'DocumentType', 'Value': document_type}
                    ],
                    'Value': 1 if success else 0,
                    'Unit': 'Count',
                    'Timestamp': timestamp
                }
            ]
        )
    
    def log_cost_metrics(self, service_used: str, estimated_cost: float):
        """記錄成本指標"""
        self.cloudwatch.put_metric_data(
            Namespace='AI/DocumentProcessing',
            MetricData=[
                {
                    'MetricName': 'EstimatedCost',
                    'Dimensions': [
                        {'Name': 'Service', 'Value': service_used}
                    ],
                    'Value': estimated_cost,
                    'Unit': 'None',
                    'Timestamp': datetime.utcnow()
                }
            ]
        )

  • 智能路由決策
class CostOptimizedRouter:
    def __init__(self):
        # 成本估算(每1000 tokens)
        self.costs = {
            'bedrock_claude': 0.003,  # $0.003 per 1000 tokens
            'sagemaker_standard': 0.001,  # 根據實例類型
            'sagemaker_custom': 0.005
        }
    
    def estimate_processing_cost(self, content: str, method: str) -> float:
        """估算處理成本"""
        token_count = len(content.split()) * 1.3  # 粗略估算
        cost_per_1k_tokens = self.costs.get(method, 0.001)
        return (token_count / 1000) * cost_per_1k_tokens
    
    def choose_optimal_method(self, content: str, doc_type: str, 
                            accuracy_requirement: str) -> str:
        """選擇最優處理方法"""
        
        # 計算各種方法的成本
        bedrock_cost = self.estimate_processing_cost(content, 'bedrock_claude')
        sagemaker_cost = self.estimate_processing_cost(content, 'sagemaker_standard')
        
        # 根據準確性需求和成本選擇
        if accuracy_requirement == 'high' and doc_type in ['legal', 'medical']:
            return 'hybrid'  # 使用混合方法
        elif bedrock_cost < sagemaker_cost and accuracy_requirement != 'high':
            return 'bedrock_only'
        else:
            return 'sagemaker_only'

上一篇
SageMaker Endpoints:模型部署與推理
下一篇
中期專案:智慧客服聊天機器人
系列文
從零開始的AWS AI之路:用Bedrock與SageMaker打造智慧應用的30天實戰15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言