今天我們要深入了解如何將這兩個強大的AI服務進行整合,打造出更完整且靈活的AI應用架構。
這種整合不僅能發揮各自的優勢,還能創造出1+1>2的效果
Bedrock的優勢
SageMaker的優勢
階段式處理架構(Staged Processing Architecture)
用戶輸入 → Bedrock(初步處理) → SageMaker(深度分析) → 結果輸出
應用場景:客戶服務系統
並行處理架構(Parallel Processing Architecture)
用戶輸入 -> Bedrock(通用處理) -> 結果整合 -> 輸出
-> SageMaker(專業處理)
應用場景:文件分析系統
決策樹架構(Decision Tree Architecture)
用戶輸入 -> 路由判斷 -> Bedrock(簡單任務)
-> SageMaker(複雜任務)
應用場景:智能推薦系統
系統架構設計
import boto3
import json
from typing import Dict, Any, Optional
class HybridAIProcessor:
def __init__(self):
self.bedrock_client = boto3.client('bedrock-runtime', region_name='us-east-1')
self.sagemaker_client = boto3.client('sagemaker-runtime', region_name='us-east-1')
def process_document(self, document_content: str, document_type: str) -> Dict[str, Any]:
"""
混合式文件處理主函數
"""
# 步驟1: 使用Bedrock進行初步分析
initial_analysis = self._bedrock_analysis(document_content)
# 步驟2: 根據分析結果決定是否需要SageMaker深度處理
if self._needs_deep_analysis(initial_analysis, document_type):
deep_analysis = self._sagemaker_analysis(document_content, document_type)
return self._merge_results(initial_analysis, deep_analysis)
return initial_analysis
def _bedrock_analysis(self, content: str) -> Dict[str, Any]:
"""
使用Bedrock進行文件初步分析
"""
prompt = f"""
請分析以下文件內容,提取關鍵資訊:
1. 文件主題
2. 重要關鍵字
3. 情感傾向
4. 可能的文件類型
文件內容:{content}
請以JSON格式回答。
"""
try:
response = self.bedrock_client.invoke_model(
modelId='anthropic.claude-3-sonnet-20240229-v1:0',
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"messages": [{"role": "user", "content": prompt}]
})
)
result = json.loads(response['body'].read())
content_text = result['content'][0]['text']
# 解析JSON回應
try:
analysis_result = json.loads(content_text)
analysis_result['source'] = 'bedrock'
analysis_result['confidence'] = 'standard'
return analysis_result
except json.JSONDecodeError:
return {
'raw_response': content_text,
'source': 'bedrock',
'confidence': 'low'
}
except Exception as e:
return {'error': str(e), 'source': 'bedrock'}
def _needs_deep_analysis(self, initial_result: Dict[str, Any], doc_type: str) -> bool:
"""
判斷是否需要SageMaker深度分析
"""
# 如果是特定類型文件或置信度較低,則需要深度分析
complex_types = ['legal', 'medical', 'financial']
return (doc_type in complex_types or
initial_result.get('confidence') == 'low' or
'error' in initial_result)
def _sagemaker_analysis(self, content: str, doc_type: str) -> Dict[str, Any]:
"""
使用SageMaker進行深度分析
"""
# 根據文件類型選擇對應的SageMaker端點
endpoint_mapping = {
'legal': 'legal-document-analyzer',
'medical': 'medical-text-classifier',
'financial': 'financial-sentiment-analyzer'
}
endpoint_name = endpoint_mapping.get(doc_type, 'general-text-analyzer')
try:
# 準備輸入數據
payload = {
'instances': [{
'text': content,
'analysis_type': doc_type
}]
}
# 調用SageMaker端點
response = self.sagemaker_client.invoke_endpoint(
EndpointName=endpoint_name,
ContentType='application/json',
Body=json.dumps(payload)
)
result = json.loads(response['Body'].read().decode())
result['source'] = 'sagemaker'
result['confidence'] = 'high'
result['endpoint'] = endpoint_name
return result
except Exception as e:
return {'error': str(e), 'source': 'sagemaker'}
def _merge_results(self, bedrock_result: Dict[str, Any],
sagemaker_result: Dict[str, Any]) -> Dict[str, Any]:
"""
整合Bedrock和SageMaker的分析結果
"""
return {
'bedrock_analysis': bedrock_result,
'sagemaker_analysis': sagemaker_result,
'integrated_insights': self._generate_integrated_insights(
bedrock_result, sagemaker_result
),
'processing_timestamp': boto3.Session().region_name
}
def _generate_integrated_insights(self, bedrock_result: Dict[str, Any],
sagemaker_result: Dict[str, Any]) -> Dict[str, Any]:
"""
生成整合見解
"""
insights = {
'confidence_level': 'high' if sagemaker_result.get('confidence') == 'high' else 'medium',
'processing_methods': ['bedrock', 'sagemaker'],
'recommendations': []
}
# 根據兩個結果生成建議
if bedrock_result.get('confidence') == 'low' and sagemaker_result.get('confidence') == 'high':
insights['recommendations'].append('建議採用SageMaker的分析結果')
elif 'error' not in bedrock_result and 'error' not in sagemaker_result:
insights['recommendations'].append('兩種分析方法結果一致,可信度高')
return insights
import json
import boto3
from hybrid_ai_processor import HybridAIProcessor
def lambda_handler(event, context):
"""
Lambda函數:整合Bedrock與SageMaker的文件處理服務
"""
try:
# 解析輸入參數
document_content = event.get('document_content', '')
document_type = event.get('document_type', 'general')
if not document_content:
return {
'statusCode': 400,
'body': json.dumps({
'error': 'document_content is required'
})
}
# 初始化處理器
processor = HybridAIProcessor()
# 執行混合式分析
result = processor.process_document(document_content, document_type)
# 返回結果
return {
'statusCode': 200,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps(result, ensure_ascii=False, indent=2)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': f'Processing failed: {str(e)}'
})
}
API Gateway配置
serverless.yml
# serverless.yml 配置範例
service: hybrid-ai-document-processor
provider:
name: aws
runtime: python3.9
region: us-east-1
iamRoleStatements:
- Effect: Allow
Action:
- bedrock:InvokeModel
- sagemaker:InvokeEndpoint
Resource: "*"
functions:
processDocument:
handler: handler.lambda_handler
events:
- http:
path: /process
method: post
cors: true
environment:
BEDROCK_REGION: us-east-1
SAGEMAKER_REGION: us-east-1
resources:
Resources:
# CloudWatch Log Group
ProcessDocumentLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/lambda/hybrid-ai-document-processor-dev-processDocument
RetentionInDays: 14
資料留管理最佳化
import redis
import hashlib
import json
from typing import Optional
class AIResultCache:
def __init__(self):
self.redis_client = redis.Redis(
host='your-elasticache-endpoint',
port=6379,
decode_responses=True
)
self.cache_ttl = 3600 # 1小時快取
def get_cache_key(self, content: str, doc_type: str) -> str:
"""生成快取鍵值"""
content_hash = hashlib.md5(content.encode()).hexdigest()
return f"ai_analysis:{doc_type}:{content_hash}"
def get_cached_result(self, content: str, doc_type: str) -> Optional[Dict]:
"""獲取快取結果"""
cache_key = self.get_cache_key(content, doc_type)
cached_data = self.redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
return None
def cache_result(self, content: str, doc_type: str, result: Dict):
"""快取分析結果"""
cache_key = self.get_cache_key(content, doc_type)
self.redis_client.setex(
cache_key,
self.cache_ttl,
json.dumps(result, ensure_ascii=False)
)
import boto3
import time
from datetime import datetime
class AIProcessingMonitor:
def __init__(self):
self.cloudwatch = boto3.client('cloudwatch')
def log_processing_metrics(self,
service_used: str,
processing_time: float,
success: bool,
document_type: str):
"""記錄處理指標"""
timestamp = datetime.utcnow()
# 記錄處理時間
self.cloudwatch.put_metric_data(
Namespace='AI/DocumentProcessing',
MetricData=[
{
'MetricName': 'ProcessingTime',
'Dimensions': [
{'Name': 'Service', 'Value': service_used},
{'Name': 'DocumentType', 'Value': document_type}
],
'Value': processing_time,
'Unit': 'Seconds',
'Timestamp': timestamp
},
{
'MetricName': 'ProcessingSuccess',
'Dimensions': [
{'Name': 'Service', 'Value': service_used},
{'Name': 'DocumentType', 'Value': document_type}
],
'Value': 1 if success else 0,
'Unit': 'Count',
'Timestamp': timestamp
}
]
)
def log_cost_metrics(self, service_used: str, estimated_cost: float):
"""記錄成本指標"""
self.cloudwatch.put_metric_data(
Namespace='AI/DocumentProcessing',
MetricData=[
{
'MetricName': 'EstimatedCost',
'Dimensions': [
{'Name': 'Service', 'Value': service_used}
],
'Value': estimated_cost,
'Unit': 'None',
'Timestamp': datetime.utcnow()
}
]
)
class CostOptimizedRouter:
def __init__(self):
# 成本估算(每1000 tokens)
self.costs = {
'bedrock_claude': 0.003, # $0.003 per 1000 tokens
'sagemaker_standard': 0.001, # 根據實例類型
'sagemaker_custom': 0.005
}
def estimate_processing_cost(self, content: str, method: str) -> float:
"""估算處理成本"""
token_count = len(content.split()) * 1.3 # 粗略估算
cost_per_1k_tokens = self.costs.get(method, 0.001)
return (token_count / 1000) * cost_per_1k_tokens
def choose_optimal_method(self, content: str, doc_type: str,
accuracy_requirement: str) -> str:
"""選擇最優處理方法"""
# 計算各種方法的成本
bedrock_cost = self.estimate_processing_cost(content, 'bedrock_claude')
sagemaker_cost = self.estimate_processing_cost(content, 'sagemaker_standard')
# 根據準確性需求和成本選擇
if accuracy_requirement == 'high' and doc_type in ['legal', 'medical']:
return 'hybrid' # 使用混合方法
elif bedrock_cost < sagemaker_cost and accuracy_requirement != 'high':
return 'bedrock_only'
else:
return 'sagemaker_only'