今天我們要學習如何在保持效能的同時,有效控制成本!透過智能的資源管理和優化策略,讓你的 AI 助理既強大又經濟實惠。
AI 應用的主要成本來源:模型 API 的 token 用量(輸入與輸出)、運算資源(CPU、記憶體、請求數),以及儲存空間。
不做優化的代價:成本可能是優化後的 3-10 倍!
from typing import Dict, Any, List
from datetime import datetime, timedelta
from dataclasses import dataclass
import json
from collections import defaultdict
@dataclass
class CostRecord:
    """One tracked cost entry.

    Instances are plain value objects; the field order below is also the
    positional order of the generated constructor.
    """

    # Moment the entry was created.
    timestamp: datetime
    # Service the cost belongs to, e.g. gemini_api, cloud_run, storage.
    service: str
    # Resource within that service, e.g. a model name or an instance type.
    resource: str
    # Amount consumed, e.g. tokens, requests, GB.
    quantity: float
    # Unit in which `quantity` is expressed.
    unit: str
    # Cost attributed to this entry.
    cost: float
    # Additional free-form details attached to the entry.
    metadata: Dict[str, Any]
class CostTracker:
    """Accumulate cost records and summarise them per day, service and resource.

    Each ``track_*`` method prices the usage from ``PRICING``, appends a
    ``CostRecord`` to ``records`` and adds its cost to ``daily_costs`` under the
    record's date (naive UTC, ISO ``YYYY-MM-DD``).
    """

    # Unit prices. The Gemini entries are examples and must be updated to
    # match the actual price list.
    PRICING = {
        'gemini_api': {
            'gemini-2.0-flash-exp': {
                'input_tokens': 0.075 / 1_000_000,  # $0.075 per 1M tokens
                'output_tokens': 0.30 / 1_000_000,  # $0.30 per 1M tokens
            },
            'gemini-pro': {
                'input_tokens': 0.125 / 1_000_000,
                'output_tokens': 0.375 / 1_000_000,
            },
        },
        'cloud_run': {
            'cpu': 0.00002400,  # per vCPU-second
            'memory': 0.00000250,  # per GiB-second
            'requests': 0.40 / 1_000_000,  # 0.40 per 1M requests, stored per request
        },
        'storage': {
            # NOTE(review): these were labelled "per GB-month" but the value is
            # divided by 1024, which makes it a per-MB-month rate. Confirm the
            # intended unit before using these entries.
            'standard': 0.020 / 1024,
            'nearline': 0.010 / 1024,
        },
    }

    def __init__(self):
        """Start with no records and no daily totals."""
        self.records: List[CostRecord] = []
        # ISO date string -> accumulated cost; defaultdict so `+=` works on a new day.
        self.daily_costs = defaultdict(float)

    def track_api_call(self, model: str, input_tokens: int,
                       output_tokens: int, metadata: Dict = None):
        """Record the cost of one Gemini API call and return it.

        Args:
            model: Key into ``PRICING['gemini_api']``. A model that is not
                listed is priced at zero.
            input_tokens: Number of prompt tokens.
            output_tokens: Number of generated tokens.
            metadata: Optional extra entries merged into the record's metadata;
                they override the computed entries on key collision.

        Returns:
            The combined input and output cost.
        """
        pricing = self.PRICING['gemini_api'].get(model, {})
        input_cost = input_tokens * pricing.get('input_tokens', 0)
        output_cost = output_tokens * pricing.get('output_tokens', 0)
        total_cost = input_cost + output_cost

        record = CostRecord(
            timestamp=datetime.utcnow(),
            service='gemini_api',
            resource=model,
            quantity=input_tokens + output_tokens,
            unit='tokens',
            cost=total_cost,
            metadata={
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'input_cost': input_cost,
                'output_cost': output_cost,
                **(metadata or {}),
            },
        )
        self.records.append(record)
        self._update_daily_cost(record)
        return total_cost

    def track_compute(self, cpu_seconds: float, memory_gb_seconds: float,
                      requests: int = 0):
        """Record the cost of Cloud Run compute usage and return it.

        Args:
            cpu_seconds: vCPU-seconds consumed.
            memory_gb_seconds: GiB-seconds of memory consumed.
            requests: Number of requests served.

        Returns:
            The sum of the CPU, memory and request costs.
        """
        prices = self.PRICING['cloud_run']
        cpu_cost = cpu_seconds * prices['cpu']
        memory_cost = memory_gb_seconds * prices['memory']
        request_cost = requests * prices['requests']
        total_cost = cpu_cost + memory_cost + request_cost

        record = CostRecord(
            timestamp=datetime.utcnow(),
            service='cloud_run',
            resource='compute',
            # Only the CPU seconds are stored as the headline quantity; the
            # full breakdown is in metadata.
            quantity=cpu_seconds,
            unit='cpu_seconds',
            cost=total_cost,
            metadata={
                'cpu_seconds': cpu_seconds,
                'memory_gb_seconds': memory_gb_seconds,
                'requests': requests,
                'cpu_cost': cpu_cost,
                'memory_cost': memory_cost,
                'request_cost': request_cost,
            },
        )
        self.records.append(record)
        self._update_daily_cost(record)
        return total_cost

    def _update_daily_cost(self, record: CostRecord):
        """Add the record's cost to the total of the day it was created on."""
        date_key = record.timestamp.date().isoformat()
        self.daily_costs[date_key] += record.cost

    def get_daily_report(self, date: datetime = None) -> Dict[str, Any]:
        """Summarise the costs of one day.

        Args:
            date: Day to report on; defaults to the current UTC time.

        Returns:
            A dict with the ISO date, the day's total, totals per service and
            per ``service:resource`` pair, and the number of records.
        """
        if date is None:
            date = datetime.utcnow()
        date_key = date.date().isoformat()

        day_records = [
            r for r in self.records
            if r.timestamp.date().isoformat() == date_key
        ]

        by_service = defaultdict(float)
        by_resource = defaultdict(float)
        for record in day_records:
            by_service[record.service] += record.cost
            by_resource[f"{record.service}:{record.resource}"] += record.cost

        return {
            'date': date_key,
            # .get() rather than [] so that reporting on a day without records
            # does not insert an empty entry into the defaultdict.
            'total_cost': self.daily_costs.get(date_key, 0.0),
            'by_service': dict(by_service),
            'by_resource': dict(by_resource),
            'record_count': len(day_records),
        }

    def get_monthly_projection(self) -> Dict[str, Any]:
        """Project a 30-day cost from the average of the last 7 days.

        Days without any tracked cost count as zero in the average.

        Returns:
            A dict with the average daily cost, the 30-day projection and the
            seven daily totals, most recent day first.
        """
        now = datetime.utcnow()
        last_7_days = [
            (now - timedelta(days=offset)).date().isoformat()
            for offset in range(7)
        ]
        recent_costs = [self.daily_costs.get(day, 0) for day in last_7_days]

        # The window always holds exactly seven entries, so no empty-list guard.
        avg_daily_cost = sum(recent_costs) / len(recent_costs)
        return {
            'avg_daily_cost': avg_daily_cost,
            'projected_monthly': avg_daily_cost * 30,
            'recent_7_days': recent_costs,
        }

    def get_top_consumers(self, limit: int = 10) -> List[Dict]:
        """Return the ``limit`` most expensive ``service:resource`` pairs.

        Costs are summed over all records; the result is ordered from most to
        least expensive.
        """
        resource_costs = defaultdict(float)
        for record in self.records:
            resource_costs[f"{record.service}:{record.resource}"] += record.cost

        sorted_costs = sorted(
            resource_costs.items(),
            key=lambda item: item[1],
            reverse=True,
        )
        return [
            {'resource': name, 'cost': cost}
            for name, cost in sorted_costs[:limit]
        ]
# Global cost tracker instance
cost_tracker = CostTracker()
from typing import Any, Optional, Callable
import hashlib
import json
import redis
from functools import wraps
import time
class SmartCache:
    """JSON cache on top of a Redis client, with hit/miss and savings counters.

    Values are stored under ``cache:<key>`` as JSON text, so only
    JSON-serialisable results can be cached. Redis errors are printed and
    treated as a cache miss / skipped write rather than raised.
    """

    def __init__(self, redis_client: redis.Redis):
        """Wrap ``redis_client`` and reset all counters."""
        self.redis = redis_client
        self.hit_count = 0
        self.miss_count = 0
        self.saved_cost = 0.0

    def cache_key(self, *args, **kwargs) -> str:
        """Return a SHA-256 hex digest derived from the given arguments.

        Arguments are reduced to their ``str()`` form; keyword arguments are
        sorted by name so their order does not affect the key.
        """
        key_data = {
            'args': str(args),
            'kwargs': str(sorted(kwargs.items())),
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded value stored for ``key``, or ``None``.

        Updates ``hit_count`` / ``miss_count``. On any error the error is
        printed, neither counter changes and ``None`` is returned.
        """
        try:
            data = self.redis.get(f"cache:{key}")
            if not data:
                self.miss_count += 1
                return None
            self.hit_count += 1
            return json.loads(data)
        except Exception as e:
            print(f"Cache get error: {e}")
            return None

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Store ``value`` as JSON under ``key`` for ``ttl`` seconds.

        Errors (including values that cannot be serialised) are printed and
        otherwise ignored.
        """
        try:
            self.redis.setex(
                f"cache:{key}",
                ttl,
                json.dumps(value, ensure_ascii=False),
            )
        except Exception as e:
            print(f"Cache set error: {e}")

    def cached_ai_call(self, ttl: int = 3600, cost_per_call: float = 0.001):
        """Build a decorator that caches the result of an async function.

        Args:
            ttl: Lifetime of a cached result in seconds.
            cost_per_call: Amount added to ``saved_cost`` on every cache hit.

        Returns:
            A decorator for coroutine functions. A cached result of ``None``
            is indistinguishable from a miss, so such calls are re-executed.
        """
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # The function's identity is part of the key; without it, two
                # decorated functions called with the same arguments would
                # read each other's cached results.
                cache_key = self.cache_key(
                    func.__module__, func.__qualname__, *args, **kwargs
                )

                cached_result = self.get(cache_key)
                if cached_result is not None:
                    self.saved_cost += cost_per_call
                    print(f"💾 快取命中,節省 ${cost_per_call:.6f}")
                    return cached_result

                result = await func(*args, **kwargs)
                self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator

    def get_stats(self) -> Dict[str, Any]:
        """Return the hit/miss counters, the hit rate and the savings figures.

        ``estimated_monthly_savings`` is ``saved_cost * 30``; ``saved_cost``
        itself is a running total since this object was created.
        """
        total_requests = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
        return {
            'hit_count': self.hit_count,
            'miss_count': self.miss_count,
            'hit_rate': hit_rate,
            'saved_cost': self.saved_cost,
            'estimated_monthly_savings': self.saved_cost * 30,
        }
# Initialize the smart cache
# NOTE(review): host and port are hard-coded to a local Redis — confirm this is
# intended outside of development.
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
smart_cache = SmartCache(redis_client)
from typing import Dict, Any
import tiktoken
class TokenOptimizer:
    """Estimate token counts and shorten prompts with simple text rewrites."""

    def __init__(self):
        """Load the ``cl100k_base`` tiktoken encoding, if it can be obtained.

        When loading fails for any reason, ``encoding`` is ``None`` and token
        counts fall back to a character-based estimate.
        """
        try:
            self.encoding = tiktoken.get_encoding("cl100k_base")
        except Exception:
            # Deliberately best-effort: counting still works without the
            # encoding, just less precisely.
            self.encoding = None

    def count_tokens(self, text: str) -> int:
        """Return the number of tokens in ``text``.

        Uses the loaded encoding when available, otherwise estimates one token
        per four characters.
        """
        if self.encoding:
            return len(self.encoding.encode(text))
        return len(text) // 4

    def optimize_prompt(self, prompt: str, max_tokens: int = 1000) -> Dict[str, Any]:
        """Shorten ``prompt`` when it is longer than ``max_tokens``.

        A prompt within the limit is returned untouched. Otherwise whitespace
        runs are collapsed to single spaces, the text after the first
        ``範例:`` marker is cut to 500 characters (anything after a second
        marker is dropped), and a few verbose phrases are replaced with
        shorter ones. The result is not guaranteed to fit in ``max_tokens``.

        Returns:
            A dict with ``optimized_prompt``, ``original_tokens``,
            ``optimized_tokens``, ``reduction`` and ``cost_saved``.
        """
        current_tokens = self.count_tokens(prompt)
        if current_tokens <= max_tokens:
            return {
                'optimized_prompt': prompt,
                'original_tokens': current_tokens,
                'optimized_tokens': current_tokens,
                'reduction': 0,
                # Present on both return paths so callers see one shape.
                'cost_saved': 0.0,
            }

        # 1. Collapse all whitespace runs (including newlines) to one space.
        optimized = ' '.join(prompt.split())

        # 2. Limit the length of the example section.
        marker = '範例:'
        if marker in optimized:
            parts = optimized.split(marker)
            optimized = parts[0] + marker + parts[1][:500]

        # 3. Replace verbose phrases with shorter equivalents.
        replacements = {
            '請你幫我': '請',
            '非常感謝': '謝謝',
            '能不能夠': '能否',
        }
        for old, new in replacements.items():
            optimized = optimized.replace(old, new)

        optimized_tokens = self.count_tokens(optimized)
        reduction = current_tokens - optimized_tokens
        return {
            'optimized_prompt': optimized,
            'original_tokens': current_tokens,
            'optimized_tokens': optimized_tokens,
            'reduction': reduction,
            'cost_saved': self._estimate_cost_saved(reduction),
        }

    def _estimate_cost_saved(self, tokens_saved: int) -> float:
        """Price ``tokens_saved`` at an assumed average of $0.10 per 1M tokens."""
        avg_price = 0.10 / 1_000_000
        return tokens_saved * avg_price

    def batch_optimize(self, prompts: list) -> Dict[str, Any]:
        """Optimize every prompt with the default limit and total the savings.

        Returns:
            A dict with the per-prompt ``results``, ``total_saved_tokens``,
            ``total_saved_cost`` and ``avg_reduction`` (tokens saved per
            prompt; 0 for an empty input).
        """
        results = [self.optimize_prompt(prompt) for prompt in prompts]
        total_saved_tokens = sum(r['reduction'] for r in results)
        total_saved_cost = sum(r.get('cost_saved', 0) for r in results)
        return {
            'results': results,
            'total_saved_tokens': total_saved_tokens,
            'total_saved_cost': total_saved_cost,
            'avg_reduction': total_saved_tokens / len(prompts) if prompts else 0,
        }
# Module-level TokenOptimizer instance
token_optimizer = TokenOptimizer()
import psutil
from typing import Dict, Any
from datetime import datetime, timedelta
class ResourceManager:
    """Sample host resource usage with psutil and derive alerts and sizing advice.

    Samples are kept in ``resource_history``; entries older than one hour are
    dropped each time a new sample is taken.
    """

    def __init__(self):
        """Start with an empty history and the default alert thresholds."""
        self.resource_history = []
        self.alert_thresholds = {
            'cpu': 80,     # CPU usage %
            'memory': 85,  # memory usage %
            'disk': 90,    # disk usage %
        }

    def get_resource_usage(self) -> Dict[str, Any]:
        """Take one usage sample, store it in the history and return it.

        The CPU percentage is measured over a one-second interval, so this
        call blocks for about that long.

        Returns:
            A dict with an ISO ``timestamp`` (naive UTC) and ``cpu``,
            ``memory`` and ``disk`` sub-dicts.
        """
        timestamp = datetime.utcnow().isoformat()
        cpu_percent = psutil.cpu_percent(interval=1)
        # Query memory and disk once each so the figures within a sample all
        # come from the same snapshot.
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        usage = {
            'timestamp': timestamp,
            'cpu': {
                'percent': cpu_percent,
                'count': psutil.cpu_count(),
            },
            'memory': {
                'percent': memory.percent,
                'available_gb': memory.available / (1024**3),
                'total_gb': memory.total / (1024**3),
            },
            'disk': {
                'percent': disk.percent,
                'free_gb': disk.free / (1024**3),
            },
        }
        self.resource_history.append(usage)

        # Keep only the last hour of samples.
        cutoff_time = datetime.utcnow() - timedelta(hours=1)
        self.resource_history = [
            sample for sample in self.resource_history
            if datetime.fromisoformat(sample['timestamp']) > cutoff_time
        ]
        return usage

    def check_alerts(self) -> List[Dict[str, Any]]:
        """Take a fresh sample and return an alert for every exceeded threshold.

        Each alert has ``type``, ``severity``, ``message`` and
        ``recommendation`` entries; the list is empty when nothing is exceeded.
        """
        usage = self.get_resource_usage()
        alerts = []

        if usage['cpu']['percent'] > self.alert_thresholds['cpu']:
            alerts.append({
                'type': 'cpu',
                'severity': 'warning',
                'message': f"CPU 使用率過高: {usage['cpu']['percent']}%",
                'recommendation': '考慮水平擴展或優化程式碼',
            })

        if usage['memory']['percent'] > self.alert_thresholds['memory']:
            alerts.append({
                'type': 'memory',
                'severity': 'critical',
                'message': f"記憶體使用率過高: {usage['memory']['percent']}%",
                'recommendation': '檢查記憶體洩漏或增加記憶體配置',
            })

        if usage['disk']['percent'] > self.alert_thresholds['disk']:
            alerts.append({
                'type': 'disk',
                'severity': 'critical',
                'message': f"磁碟使用率過高: {usage['disk']['percent']}%",
                'recommendation': '清理日誌或增加儲存空間',
            })

        return alerts

    def get_optimization_recommendations(self) -> List[str]:
        """Return advice based on the average of the ten most recent samples.

        Returns an empty list while fewer than ten samples are stored. Advice
        is added when average CPU exceeds 60% or average memory exceeds 70%.
        """
        recommendations = []
        if len(self.resource_history) < 10:
            return recommendations

        recent = self.resource_history[-10:]

        avg_cpu = sum(s['cpu']['percent'] for s in recent) / len(recent)
        if avg_cpu > 60:
            recommendations.append(
                "CPU 使用率持續偏高,建議實作快取或使用更高效的演算法"
            )

        avg_memory = sum(s['memory']['percent'] for s in recent) / len(recent)
        if avg_memory > 70:
            recommendations.append(
                "記憶體使用率持續偏高,建議優化記憶體使用或增加資源"
            )

        return recommendations

    def estimate_optimal_resources(self) -> Dict[str, Any]:
        """Suggest a CPU/memory size from the stored samples.

        Needs at least 20 samples; otherwise only a ``message`` entry is
        returned. The suggestion is the current capacity scaled by the
        95th-percentile usage plus a 20% buffer, with at least one CPU.

        Returns:
            A dict with ``current``, ``p95_usage``, ``recommended`` and
            ``potential_savings`` entries.
        """
        if len(self.resource_history) < 20:
            return {
                'message': '數據不足,需要更多監控數據'
            }

        cpu_samples = sorted(s['cpu']['percent'] for s in self.resource_history)
        memory_samples = sorted(s['memory']['percent'] for s in self.resource_history)

        # int(n * 0.95) is always a valid index because it is below n.
        p95_index = int(len(cpu_samples) * 0.95)
        p95_cpu = cpu_samples[p95_index]
        p95_memory = memory_samples[p95_index]

        # psutil.cpu_count() returns None when the count cannot be determined;
        # fall back to 1 so the arithmetic below stays valid.
        current_cpu_count = psutil.cpu_count() or 1
        current_memory_gb = psutil.virtual_memory().total / (1024**3)

        recommended_cpu = max(1, int(current_cpu_count * (p95_cpu / 100) * 1.2))
        recommended_memory = current_memory_gb * (p95_memory / 100) * 1.2

        return {
            'current': {
                'cpu_count': current_cpu_count,
                'memory_gb': current_memory_gb,
            },
            'p95_usage': {
                'cpu_percent': p95_cpu,
                'memory_percent': p95_memory,
            },
            'recommended': {
                'cpu_count': recommended_cpu,
                'memory_gb': recommended_memory,
            },
            'potential_savings': self._calculate_savings(
                current_cpu_count,
                recommended_cpu,
                current_memory_gb,
                recommended_memory,
            ),
        }

    def _calculate_savings(self, current_cpu: int, rec_cpu: int,
                           current_mem: float, rec_mem: float) -> Dict[str, float]:
        """Compare the monthly cost of the current and the recommended size.

        Uses simplified flat prices (20 per vCPU and 5 per GB per month); real
        figures depend on the cloud provider. Negative savings are reported
        as 0.
        """
        cpu_cost_per_unit = 20  # per vCPU per month
        mem_cost_per_gb = 5     # per GB per month

        current_cost = current_cpu * cpu_cost_per_unit + current_mem * mem_cost_per_gb
        recommended_cost = rec_cpu * cpu_cost_per_unit + rec_mem * mem_cost_per_gb

        savings = current_cost - recommended_cost
        savings_percent = (savings / current_cost * 100) if current_cost > 0 else 0
        return {
            'monthly_savings': max(0, savings),
            'savings_percent': max(0, savings_percent),
        }
# Module-level ResourceManager instance
resource_manager = ResourceManager()
from app.cost_tracker import cost_tracker
from app.smart_cache import smart_cache
from app.token_optimizer import token_optimizer
from app.resource_manager import resource_manager
import google.generativeai as genai
# Optimized AI call
@smart_cache.cached_ai_call(ttl=3600, cost_per_call=0.001)
async def optimized_ai_call(prompt: str, model: str = "gemini-2.0-flash-exp"):
    """Call Gemini with a token-optimized prompt and record the cost.

    Results are cached for an hour by the decorator, keyed on the arguments
    passed in (the original prompt, not the optimized one).

    Args:
        prompt: The prompt text; it is passed through the token optimizer
            before being sent.
        model: Name of the Gemini model to use.

    Returns:
        The response text.

    Raises:
        Exception: Any error from the model call or from reading the response
            is printed and re-raised unchanged.
    """
    start_time = time.time()

    optimization = token_optimizer.optimize_prompt(prompt)
    optimized_prompt = optimization['optimized_prompt']
    if optimization['reduction'] > 0:
        print(f"✂️ Token 優化:減少 {optimization['reduction']} tokens")

    try:
        gemini_model = genai.GenerativeModel(model)
        # Await the async variant: the synchronous generate_content() would
        # block the event loop for the whole duration of the request.
        response = await gemini_model.generate_content_async(optimized_prompt)

        # Prefer the token counts reported with the response; the local
        # estimate is only a fallback when they are missing or zero.
        usage = getattr(response, 'usage_metadata', None)
        input_tokens = (
            getattr(usage, 'prompt_token_count', 0)
            or token_optimizer.count_tokens(optimized_prompt)
        )
        output_tokens = (
            getattr(usage, 'candidates_token_count', 0)
            or token_optimizer.count_tokens(response.text)
        )

        cost = cost_tracker.track_api_call(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            metadata={
                'duration': time.time() - start_time,
                'tokens_saved': optimization['reduction'],
            },
        )
        print(f"💰 本次呼叫成本:${cost:.6f}")
        return response.text
    except Exception as e:
        print(f"❌ AI 呼叫失敗:{e}")
        raise
# Cost monitoring endpoints
@app.get("/cost/dashboard")
async def cost_dashboard(current_user: User = Depends(get_current_active_admin)):
    """Return the cost dashboard: reports, cache statistics and resource data.

    Access goes through the ``get_current_active_admin`` dependency.
    """
    daily = cost_tracker.get_daily_report()
    projection = cost_tracker.get_monthly_projection()
    consumers = cost_tracker.get_top_consumers()
    cache_stats = smart_cache.get_stats()
    # The usage sample is taken before the recommendations are computed, so
    # the fresh sample is already part of the history they look at.
    # NOTE(review): get_resource_usage() measures CPU over a blocking
    # one-second interval, which stalls this coroutine for that long.
    usage = resource_manager.get_resource_usage()
    advice = resource_manager.get_optimization_recommendations()

    return {
        'daily_report': daily,
        'monthly_projection': projection,
        'top_consumers': consumers,
        'cache_stats': cache_stats,
        'resource_usage': usage,
        'optimization_recommendations': advice,
    }
@app.get("/cost/optimize")
async def cost_optimization_suggestions(
    current_user: User = Depends(get_current_active_admin)
):
    """Return sizing advice, cache statistics and a fixed list of suggestions.

    Access goes through the ``get_current_active_admin`` dependency. The
    ``recommendations`` entries and the ``avg_reduction`` figure are static
    text, not computed from measurements.
    """
    sizing = resource_manager.estimate_optimal_resources()
    cache_stats = smart_cache.get_stats()

    # NOTE(review): this figure is derived from the cache's saved cost even
    # though it is reported under 'token_optimization' — confirm the intent.
    token_section = {
        'enabled': True,
        'avg_reduction': '15-25%',
        'monthly_savings_estimate': smart_cache.saved_cost * 30,
    }

    suggestion_rows = (
        ('high', '實作更積極的快取策略', '30-40%', 'medium'),
        ('medium', '優化提示詞模板', '15-20%', 'low'),
        ('medium', '調整資源配置', '10-15%', 'low'),
    )
    suggestions = [
        {
            'priority': priority,
            'action': action,
            'potential_savings': savings,
            'effort': effort,
        }
        for priority, action, savings, effort in suggestion_rows
    ]

    return {
        'resource_optimization': sizing,
        'cache_effectiveness': cache_stats,
        'token_optimization': token_section,
        'recommendations': suggestions,
    }
# ❌ Not optimized
response = model.generate_content("""
這是一個非常冗長的提示詞,包含了很多不必要的說明...
重複的內容...
冗餘的範例...
""")
# ✅ Optimized
response = model.generate_content("""
簡潔的提示:核心需求
必要範例:示例1
預期輸出:格式說明
""")
# Savings: 30-50% tokens
## 快取優先級
### Tier 1: 長期快取 (24小時+)
- 靜態內容(FAQ、產品資訊)
- 不常變動的分析結果
- 標準化的回應模板
### Tier 2: 中期快取 (1-6小時)
- 個性化但不頻繁變動的內容
- 統計分析結果
- 使用者偏好
### Tier 3: 短期快取 (5-30分鐘)
- 即時查詢結果
- 動態生成的內容
- 會話資料
# ❌ One call per item
for item in items:
    result = await ai_call(item)  # 100 calls
# ✅ Batch processing
batch_prompt = f"處理以下項目:{items}"
results = await ai_call(batch_prompt)  # 1 call
# Savings: 90% of the calls
# Choose the model according to task complexity
task_models = {
    'simple': 'gemini-2.0-flash-exp',  # cheapest
    'moderate': 'gemini-pro',  # balanced
    'complex': 'gemini-pro-vision',  # most capable but most expensive
}


# Dynamic selection
def select_model(task_complexity: str) -> str:
    """Return the model mapped to ``task_complexity``.

    A complexity that is not a key of ``task_models`` yields
    ``'gemini-2.0-flash-exp'``.
    """
    try:
        return task_models[task_complexity]
    except KeyError:
        return 'gemini-2.0-flash-exp'
{
  "summary": {
    "today_cost": 1.25,
    "yesterday_cost": 1.18,
    "month_to_date": 35.40,
    "projected_monthly": 42.00,
    "vs_last_month": "-15%"
  },
  "breakdown": {
    "gemini_api": {
      "cost": 0.85,
      "percentage": "68%",
      "calls": 1250,
      "tokens": 850000
    },
    "cloud_run": {
      "cost": 0.30,
      "percentage": "24%"
    },
    "storage": {
      "cost": 0.10,
      "percentage": "8%"
    }
  },
  "optimization_impact": {
    "cache_hit_rate": "45%",
    "tokens_saved": 125000,
    "cost_saved": 0.15,
    "monthly_savings_projection": 4.50
  },
  "alerts": [
    {
      "type": "cost_spike",
      "message": "API 呼叫比昨天增加 30%",
      "action": "檢查是否有異常流量"
    }
  ]
}
今天我們建立了完整的成本控制系統:
✅ 成本追蹤:詳細記錄每筆開銷
✅ 智能快取:減少重複的 API 呼叫
✅ Token 優化:自動優化提示詞
✅ 資源監控:即時追蹤資源使用
✅ 自動建議:AI 驅動的優化建議