Day 26: Cost Control and Resource Optimization

Today we will look at how to keep costs under control without sacrificing performance. With smart resource management and optimization strategies, your AI assistant can be both powerful and affordable.

💰 Why Do We Need Cost Control?

The main cost drivers of an AI application:

  • 🤖 API call charges: Gemini API token usage
  • ☁️ Cloud compute: CPU, memory, storage
  • 🌐 Network traffic: data transfer fees
  • 💾 Data storage: databases, caches, logs
  • 🔧 Third-party services: monitoring, logging, analytics tools

The price of skipping optimization: your bill can easily run 3-10x what an optimized setup would pay.

📊 Cost Analysis and Monitoring

1. Cost Tracking System (app/cost_tracker.py)

from typing import Dict, Any, List
from datetime import datetime, timedelta
from dataclasses import dataclass
import json
from collections import defaultdict

@dataclass
class CostRecord:
    """成本記錄"""
    timestamp: datetime
    service: str  # gemini_api, cloud_run, storage, etc.
    resource: str  # model_name, instance_type, etc.
    quantity: float  # tokens, requests, GB, etc.
    unit: str
    cost: float
    metadata: Dict[str, Any]

class CostTracker:
    """成本追蹤器"""
    
    # 價格表(以 Gemini API 為例,需根據實際情況更新)
    PRICING = {
        'gemini_api': {
            'gemini-2.0-flash-exp': {
                'input_tokens': 0.075 / 1_000_000,   # $0.075 per 1M tokens
                'output_tokens': 0.30 / 1_000_000,   # $0.30 per 1M tokens
            },
            'gemini-pro': {
                'input_tokens': 0.125 / 1_000_000,
                'output_tokens': 0.375 / 1_000_000,
            }
        },
        'cloud_run': {
            'cpu': 0.00002400,  # per vCPU-second
            'memory': 0.00000250,  # per GiB-second
            'requests': 0.40 / 1_000_000,  # per million requests
        },
        'storage': {
            'standard': 0.020,  # per GB-month
            'nearline': 0.010,  # per GB-month
        }
    }
    
    def __init__(self):
        self.records: List[CostRecord] = []
        self.daily_costs = defaultdict(float)
    
    def track_api_call(self, model: str, input_tokens: int, 
                      output_tokens: int, metadata: Dict = None):
        """追蹤 API 呼叫成本"""
        pricing = self.PRICING['gemini_api'].get(model, {})
        
        input_cost = input_tokens * pricing.get('input_tokens', 0)
        output_cost = output_tokens * pricing.get('output_tokens', 0)
        total_cost = input_cost + output_cost
        
        record = CostRecord(
            timestamp=datetime.utcnow(),
            service='gemini_api',
            resource=model,
            quantity=input_tokens + output_tokens,
            unit='tokens',
            cost=total_cost,
            metadata={
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'input_cost': input_cost,
                'output_cost': output_cost,
                **(metadata or {})
            }
        )
        
        self.records.append(record)
        self._update_daily_cost(record)
        
        return total_cost
    
    def track_compute(self, cpu_seconds: float, memory_gb_seconds: float, 
                     requests: int = 0):
        """追蹤運算資源成本"""
        cpu_cost = cpu_seconds * self.PRICING['cloud_run']['cpu']
        memory_cost = memory_gb_seconds * self.PRICING['cloud_run']['memory']
        request_cost = requests * self.PRICING['cloud_run']['requests']
        
        total_cost = cpu_cost + memory_cost + request_cost
        
        record = CostRecord(
            timestamp=datetime.utcnow(),
            service='cloud_run',
            resource='compute',
            quantity=cpu_seconds,
            unit='cpu_seconds',
            cost=total_cost,
            metadata={
                'cpu_seconds': cpu_seconds,
                'memory_gb_seconds': memory_gb_seconds,
                'requests': requests,
                'cpu_cost': cpu_cost,
                'memory_cost': memory_cost,
                'request_cost': request_cost
            }
        )
        
        self.records.append(record)
        self._update_daily_cost(record)
        
        return total_cost
    
    def _update_daily_cost(self, record: CostRecord):
        """更新每日成本"""
        date_key = record.timestamp.date().isoformat()
        self.daily_costs[date_key] += record.cost
    
    def get_daily_report(self, date: datetime = None) -> Dict[str, Any]:
        """獲取每日成本報告"""
        if date is None:
            date = datetime.utcnow()
        
        date_key = date.date().isoformat()
        day_records = [
            r for r in self.records 
            if r.timestamp.date().isoformat() == date_key
        ]
        
        # Break down by service and by resource
        by_service = defaultdict(float)
        by_resource = defaultdict(float)
        
        for record in day_records:
            by_service[record.service] += record.cost
            by_resource[f"{record.service}:{record.resource}"] += record.cost
        
        return {
            'date': date_key,
            'total_cost': self.daily_costs[date_key],
            'by_service': dict(by_service),
            'by_resource': dict(by_resource),
            'record_count': len(day_records)
        }
    
    def get_monthly_projection(self) -> Dict[str, Any]:
        """獲取月度成本預測"""
        # 計算過去 7 天的平均成本
        now = datetime.utcnow()
        last_7_days = [
            (now - timedelta(days=i)).date().isoformat() 
            for i in range(7)
        ]
        
        recent_costs = [
            self.daily_costs.get(date, 0) 
            for date in last_7_days
        ]
        
        avg_daily_cost = sum(recent_costs) / len(recent_costs) if recent_costs else 0
        projected_monthly = avg_daily_cost * 30
        
        return {
            'avg_daily_cost': avg_daily_cost,
            'projected_monthly': projected_monthly,
            'recent_7_days': recent_costs
        }
    
    def get_top_consumers(self, limit: int = 10) -> List[Dict]:
        """獲取成本消耗 Top N"""
        resource_costs = defaultdict(float)
        
        for record in self.records:
            key = f"{record.service}:{record.resource}"
            resource_costs[key] += record.cost
        
        sorted_costs = sorted(
            resource_costs.items(), 
            key=lambda x: x[1], 
            reverse=True
        )
        
        return [
            {'resource': k, 'cost': v} 
            for k, v in sorted_costs[:limit]
        ]

# Global cost tracker instance
cost_tracker = CostTracker()
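
Before wiring this into the app, here is a minimal usage sketch of the tracker above; the token counts and the `endpoint` metadata tag are made-up illustration values, not part of the class itself.

# Minimal usage sketch (token counts and the 'endpoint' tag are illustration values)
cost = cost_tracker.track_api_call(
    model='gemini-2.0-flash-exp',
    input_tokens=1_200,
    output_tokens=350,
    metadata={'endpoint': '/chat'}
)
print(f"This call cost ${cost:.6f}")

# Daily and monthly views
print(cost_tracker.get_daily_report())
print(cost_tracker.get_monthly_projection())
print(cost_tracker.get_top_consumers(limit=5))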

2. Smart Caching Strategy (app/smart_cache.py)

from typing import Any, Dict, Optional, Callable
import hashlib
import json
import redis
from functools import wraps
import time

class SmartCache:
    """智能快取系統"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.hit_count = 0
        self.miss_count = 0
        self.saved_cost = 0.0
    
    def cache_key(self, *args, **kwargs) -> str:
        """生成快取鍵"""
        # 將參數序列化為字串
        key_data = {
            'args': str(args),
            'kwargs': str(sorted(kwargs.items()))
        }
        key_str = json.dumps(key_data, sort_keys=True)
        
        # Hash the serialized key
        return hashlib.sha256(key_str.encode()).hexdigest()
    
    def get(self, key: str) -> Optional[Any]:
        """從快取獲取"""
        try:
            data = self.redis.get(f"cache:{key}")
            if data:
                self.hit_count += 1
                return json.loads(data)
            else:
                self.miss_count += 1
                return None
        except Exception as e:
            print(f"Cache get error: {e}")
            return None
    
    def set(self, key: str, value: Any, ttl: int = 3600):
        """設定快取"""
        try:
            self.redis.setex(
                f"cache:{key}",
                ttl,
                json.dumps(value, ensure_ascii=False)
            )
        except Exception as e:
            print(f"Cache set error: {e}")
    
    def cached_ai_call(self, ttl: int = 3600, cost_per_call: float = 0.001):
        """快取 AI 呼叫的裝飾器"""
        def decorator(func: Callable):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Build the cache key
                cache_key = self.cache_key(*args, **kwargs)
                
                # Try the cache first
                cached_result = self.get(cache_key)
                
                if cached_result is not None:
                    # Cache hit: the API call and its cost are skipped
                    self.saved_cost += cost_per_call
                    print(f"💾 Cache hit, saved ${cost_per_call:.6f}")
                    return cached_result
                
                # Cache miss: call the original function
                result = await func(*args, **kwargs)
                
                # Store the result in the cache
                self.set(cache_key, result, ttl)
                
                return result
            
            return wrapper
        return decorator
    
    def get_stats(self) -> Dict[str, Any]:
        """獲取快取統計"""
        total_requests = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
        
        return {
            'hit_count': self.hit_count,
            'miss_count': self.miss_count,
            'hit_rate': hit_rate,
            'saved_cost': self.saved_cost,
            'estimated_monthly_savings': self.saved_cost * 30
        }

# Initialize the smart cache
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
smart_cache = SmartCache(redis_client)
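
As a quick illustration of the decorator, here is a minimal sketch; `summarize` is a hypothetical async function, not part of the project code.

# Hypothetical cached AI call: an identical `text` within 6 hours is served from Redis
@smart_cache.cached_ai_call(ttl=6 * 3600, cost_per_call=0.0005)
async def summarize(text: str) -> str:
    # ... call the model here; the JSON-serializable return value is what gets cached ...
    return f"summary of: {text[:50]}"

# After some traffic, check how much the cache is saving
print(smart_cache.get_stats())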

3. Token Optimizer (app/token_optimizer.py)

from typing import Dict, Any
import tiktoken

class TokenOptimizer:
    """Token 優化器"""
    
    def __init__(self):
        # Use tiktoken to estimate token counts
        # (cl100k_base is an OpenAI tokenizer, so counts are only an approximation for Gemini)
        try:
            self.encoding = tiktoken.get_encoding("cl100k_base")
        except Exception:
            self.encoding = None
    
    def count_tokens(self, text: str) -> int:
        """計算 token 數量"""
        if self.encoding:
            return len(self.encoding.encode(text))
        else:
            # Rough fallback estimate: 1 token ≈ 4 characters
            return len(text) // 4
    
    def optimize_prompt(self, prompt: str, max_tokens: int = 1000) -> Dict[str, Any]:
        """優化提示詞"""
        current_tokens = self.count_tokens(prompt)
        
        if current_tokens <= max_tokens:
            return {
                'optimized_prompt': prompt,
                'original_tokens': current_tokens,
                'optimized_tokens': current_tokens,
                'reduction': 0
            }
        
        # Optimization strategies
        optimized = prompt
        
        # 1. Collapse redundant whitespace
        optimized = ' '.join(optimized.split())
        
        # 2. Shorten overly long examples ('範例:' marks the example section in Chinese prompts)
        if '範例:' in optimized:
            parts = optimized.split('範例:')
            if len(parts) > 1:
                examples = parts[1][:500]  # cap the example length
                optimized = parts[0] + '範例:' + examples
        
        # 3. Use more concise phrasing (sample replacements for Chinese prompts)
        replacements = {
            '請你幫我': '請',
            '非常感謝': '謝謝',
            '能不能夠': '能否',
        }
        
        for old, new in replacements.items():
            optimized = optimized.replace(old, new)
        
        optimized_tokens = self.count_tokens(optimized)
        
        return {
            'optimized_prompt': optimized,
            'original_tokens': current_tokens,
            'optimized_tokens': optimized_tokens,
            'reduction': current_tokens - optimized_tokens,
            'cost_saved': self._estimate_cost_saved(
                current_tokens - optimized_tokens
            )
        }
    
    def _estimate_cost_saved(self, tokens_saved: int) -> float:
        """估算節省的成本"""
        # 假設平均價格 $0.10 per 1M tokens
        avg_price = 0.10 / 1_000_000
        return tokens_saved * avg_price
    
    def batch_optimize(self, prompts: list) -> Dict[str, Any]:
        """批次優化多個提示詞"""
        results = []
        total_saved_tokens = 0
        total_saved_cost = 0
        
        for prompt in prompts:
            result = self.optimize_prompt(prompt)
            results.append(result)
            total_saved_tokens += result['reduction']
            total_saved_cost += result.get('cost_saved', 0)
        
        return {
            'results': results,
            'total_saved_tokens': total_saved_tokens,
            'total_saved_cost': total_saved_cost,
            'avg_reduction': total_saved_tokens / len(prompts) if prompts else 0
        }

token_optimizer = TokenOptimizer()
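
A small sketch of how the optimizer might be exercised; the prompt strings are illustration values only, and real savings depend on how verbose your actual prompts are.

# Single-prompt optimization (illustration prompt only)
result = token_optimizer.optimize_prompt("請你幫我 分析以下資料 ... 範例: ...", max_tokens=50)
print(result['original_tokens'], '->', result['optimized_tokens'])

# Batch mode aggregates savings across many prompts
report = token_optimizer.batch_optimize(["prompt one", "prompt two"])
print(report['total_saved_tokens'], report['total_saved_cost'])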

4. Resource Monitoring and Auto-Tuning (app/resource_manager.py)

import psutil
from typing import Dict, Any, List
from datetime import datetime, timedelta

class ResourceManager:
    """資源管理器"""
    
    def __init__(self):
        self.resource_history = []
        self.alert_thresholds = {
            'cpu': 80,     # CPU utilization %
            'memory': 85,  # memory utilization %
            'disk': 90,    # disk utilization %
        }
    
    def get_resource_usage(self) -> Dict[str, Any]:
        """獲取資源使用情況"""
        usage = {
            'timestamp': datetime.utcnow().isoformat(),
            'cpu': {
                'percent': psutil.cpu_percent(interval=1),
                'count': psutil.cpu_count()
            },
            'memory': {
                'percent': psutil.virtual_memory().percent,
                'available_gb': psutil.virtual_memory().available / (1024**3),
                'total_gb': psutil.virtual_memory().total / (1024**3)
            },
            'disk': {
                'percent': psutil.disk_usage('/').percent,
                'free_gb': psutil.disk_usage('/').free / (1024**3)
            }
        }
        
        self.resource_history.append(usage)
        
        # Keep only the last hour of samples
        cutoff_time = datetime.utcnow() - timedelta(hours=1)
        self.resource_history = [
            r for r in self.resource_history 
            if datetime.fromisoformat(r['timestamp']) > cutoff_time
        ]
        
        return usage
    
    def check_alerts(self) -> List[Dict[str, Any]]:
        """檢查資源警報"""
        usage = self.get_resource_usage()
        alerts = []
        
        if usage['cpu']['percent'] > self.alert_thresholds['cpu']:
            alerts.append({
                'type': 'cpu',
                'severity': 'warning',
                'message': f"CPU 使用率過高: {usage['cpu']['percent']}%",
                'recommendation': '考慮水平擴展或優化程式碼'
            })
        
        if usage['memory']['percent'] > self.alert_thresholds['memory']:
            alerts.append({
                'type': 'memory',
                'severity': 'critical',
                'message': f"記憶體使用率過高: {usage['memory']['percent']}%",
                'recommendation': '檢查記憶體洩漏或增加記憶體配置'
            })
        
        if usage['disk']['percent'] > self.alert_thresholds['disk']:
            alerts.append({
                'type': 'disk',
                'severity': 'critical',
                'message': f"磁碟使用率過高: {usage['disk']['percent']}%",
                'recommendation': '清理日誌或增加儲存空間'
            })
        
        return alerts
    
    def get_optimization_recommendations(self) -> List[str]:
        """獲取優化建議"""
        recommendations = []
        
        if len(self.resource_history) < 10:
            return recommendations
        
        # Analyze recent trends
        recent_cpu = [r['cpu']['percent'] for r in self.resource_history[-10:]]
        avg_cpu = sum(recent_cpu) / len(recent_cpu)
        
        if avg_cpu > 60:
            recommendations.append(
                "CPU 使用率持續偏高,建議實作快取或使用更高效的演算法"
            )
        
        recent_memory = [r['memory']['percent'] for r in self.resource_history[-10:]]
        avg_memory = sum(recent_memory) / len(recent_memory)
        
        if avg_memory > 70:
            recommendations.append(
                "記憶體使用率持續偏高,建議優化記憶體使用或增加資源"
            )
        
        return recommendations
    
    def estimate_optimal_resources(self) -> Dict[str, Any]:
        """估算最佳資源配置"""
        if len(self.resource_history) < 20:
            return {
                'message': 'Not enough data; more monitoring samples are needed'
            }
        
        # Compute the 95th percentile
        recent_cpu = sorted([r['cpu']['percent'] for r in self.resource_history])
        recent_memory = sorted([r['memory']['percent'] for r in self.resource_history])
        
        p95_index = int(len(recent_cpu) * 0.95)
        
        p95_cpu = recent_cpu[p95_index]
        p95_memory = recent_memory[p95_index]
        
        # Recommended allocation (keep a 20% buffer)
        current_cpu_count = psutil.cpu_count()
        current_memory_gb = psutil.virtual_memory().total / (1024**3)
        
        recommended_cpu = max(1, int(current_cpu_count * (p95_cpu / 100) * 1.2))
        recommended_memory = current_memory_gb * (p95_memory / 100) * 1.2
        
        return {
            'current': {
                'cpu_count': current_cpu_count,
                'memory_gb': current_memory_gb
            },
            'p95_usage': {
                'cpu_percent': p95_cpu,
                'memory_percent': p95_memory
            },
            'recommended': {
                'cpu_count': recommended_cpu,
                'memory_gb': recommended_memory
            },
            'potential_savings': self._calculate_savings(
                current_cpu_count,
                recommended_cpu,
                current_memory_gb,
                recommended_memory
            )
        }
    
    def _calculate_savings(self, current_cpu: int, rec_cpu: int,
                          current_mem: float, rec_mem: float) -> Dict[str, float]:
        """計算潛在節省"""
        # 簡化的成本計算(實際需根據雲端服務商定價)
        cpu_cost_per_unit = 20  # 每個 vCPU 每月
        mem_cost_per_gb = 5     # 每 GB 每月
        
        current_cost = (current_cpu * cpu_cost_per_unit + 
                       current_mem * mem_cost_per_gb)
        recommended_cost = (rec_cpu * cpu_cost_per_unit + 
                          rec_mem * mem_cost_per_gb)
        
        savings = current_cost - recommended_cost
        savings_percent = (savings / current_cost * 100) if current_cost > 0 else 0
        
        return {
            'monthly_savings': max(0, savings),
            'savings_percent': max(0, savings_percent)
        }

resource_manager = ResourceManager()
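
A minimal monitoring-loop sketch for the manager above; the 60-second interval and the 5-iteration count are arbitrary assumptions you would tune.

import time

def run_resource_checks(iterations: int = 5, interval_seconds: int = 60):
    """Sample resources periodically and print any alerts (sketch only)."""
    for _ in range(iterations):
        # check_alerts() samples usage internally via get_resource_usage()
        for alert in resource_manager.check_alerts():
            print(f"⚠️ [{alert['severity']}] {alert['message']} -> {alert['recommendation']}")
        time.sleep(interval_seconds)
    print(resource_manager.get_optimization_recommendations())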

5. Integrating with FastAPI (app/main.py update)

from app.cost_tracker import cost_tracker
from app.smart_cache import smart_cache
from app.token_optimizer import token_optimizer
from app.resource_manager import resource_manager
import google.generativeai as genai
import time

# Optimized AI call
@smart_cache.cached_ai_call(ttl=3600, cost_per_call=0.001)
async def optimized_ai_call(prompt: str, model: str = "gemini-2.0-flash-exp"):
    """優化的 AI 呼叫"""
    start_time = time.time()
    
    # Token optimization
    optimization = token_optimizer.optimize_prompt(prompt)
    optimized_prompt = optimization['optimized_prompt']
    
    # Log the optimization effect
    if optimization['reduction'] > 0:
        print(f"✂️ Token optimization: saved {optimization['reduction']} tokens")
    
    try:
        # Call Gemini
        gemini_model = genai.GenerativeModel(model)
        response = gemini_model.generate_content(optimized_prompt)
        
        # Estimate token usage
        input_tokens = token_optimizer.count_tokens(optimized_prompt)
        output_tokens = token_optimizer.count_tokens(response.text)
        
        # Track the cost
        cost = cost_tracker.track_api_call(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            metadata={
                'duration': time.time() - start_time,
                'tokens_saved': optimization['reduction']
            }
        )
        
        print(f"💰 本次呼叫成本:${cost:.6f}")
        
        return response.text
        
    except Exception as e:
        print(f"❌ AI 呼叫失敗:{e}")
        raise

# Cost monitoring endpoints
@app.get("/cost/dashboard")
async def cost_dashboard(current_user: User = Depends(get_current_active_admin)):
    """成本監控儀表板"""
    return {
        'daily_report': cost_tracker.get_daily_report(),
        'monthly_projection': cost_tracker.get_monthly_projection(),
        'top_consumers': cost_tracker.get_top_consumers(),
        'cache_stats': smart_cache.get_stats(),
        'resource_usage': resource_manager.get_resource_usage(),
        'optimization_recommendations': resource_manager.get_optimization_recommendations()
    }

@app.get("/cost/optimize")
async def cost_optimization_suggestions(
    current_user: User = Depends(get_current_active_admin)
):
    """成本優化建議"""
    return {
        'resource_optimization': resource_manager.estimate_optimal_resources(),
        'cache_effectiveness': smart_cache.get_stats(),
        'token_optimization': {
            'enabled': True,
            'avg_reduction': '15-25%',
            'monthly_savings_estimate': smart_cache.saved_cost * 30
        },
        'recommendations': [
            {
                'priority': 'high',
                'action': 'Adopt a more aggressive caching strategy',
                'potential_savings': '30-40%',
                'effort': 'medium'
            },
            {
                'priority': 'medium',
                'action': 'Optimize prompt templates',
                'potential_savings': '15-20%',
                'effort': 'low'
            },
            {
                'priority': 'medium',
                'action': 'Right-size the resource allocation',
                'potential_savings': '10-15%',
                'effort': 'low'
            }
        ]
    }
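
To sanity-check the endpoints once the service is running, something like the following works; the base URL and the admin token are placeholders specific to your own deployment.

# Quick manual check of the dashboard endpoint (URL and token are placeholders)
import requests

resp = requests.get(
    "http://localhost:8000/cost/dashboard",
    headers={"Authorization": "Bearer <admin-token>"},  # admin credential from the Day 25 auth setup
)
print(resp.json()['monthly_projection'])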

💡 Cost Optimization Strategies

1. API Call Optimization

# ❌ Unoptimized
response = model.generate_content("""
    A very long-winded prompt full of unnecessary explanations...
    Repeated content...
    Redundant examples...
""")

# ✅ Optimized
response = model.generate_content("""
    Concise instruction: the core requirement
    Essential example: example 1
    Expected output: format description
""")

# Savings: 30-50% of tokens

2. Caching Strategy

## Cache priority tiers

### Tier 1: long-lived cache (24 hours+)
- Static content (FAQs, product info)
- Analysis results that rarely change
- Standardized response templates

### Tier 2: medium-lived cache (1-6 hours)
- Personalized but infrequently changing content
- Statistical analysis results
- User preferences

### Tier 3: short-lived cache (5-30 minutes)
- Real-time query results
- Dynamically generated content
- Session data

(A minimal sketch mapping these tiers to TTLs follows below.)
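
As referenced above, here is a minimal sketch that turns the three tiers into TTL values for `SmartCache.set`; the content-type-to-tier mapping is an assumption you would adapt to your own data.

# Map the cache tiers above to TTLs in seconds (content-type assignments are assumptions)
TIER_TTLS = {
    'tier1_static': 24 * 3600,       # FAQs, product info, standard response templates
    'tier2_personalized': 3 * 3600,  # user preferences, analytics results
    'tier3_realtime': 10 * 60,       # live query results, session data
}

def ttl_for(content_type: str) -> int:
    mapping = {
        'faq': 'tier1_static',
        'user_preference': 'tier2_personalized',
        'live_query': 'tier3_realtime',
    }
    return TIER_TTLS[mapping.get(content_type, 'tier3_realtime')]

# Example: cache an FAQ answer for 24 hours
# smart_cache.set(key, answer, ttl=ttl_for('faq'))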

3. Batch Processing

# ❌ One item at a time
for item in items:
    result = await ai_call(item)  # 100 calls

# ✅ Batched
batch_prompt = f"Process the following items: {items}"
results = await ai_call(batch_prompt)  # 1 call

# Savings: ~90% fewer calls
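
One caveat with batching: the model returns a single blob of text, so you still need to map the answer back to individual items. A minimal sketch, assuming you ask for one numbered line per item:

# Number the items so the single response can be split back per item (format assumption)
batch_prompt = "Answer each item on its own numbered line:\n" + "\n".join(
    f"{i + 1}. {item}" for i, item in enumerate(items)
)
response_text = await ai_call(batch_prompt)
answers = [line.split('.', 1)[-1].strip()
           for line in response_text.splitlines() if line.strip()]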

4. Model Selection

# Pick a model based on task complexity
task_models = {
    'simple': 'gemini-2.0-flash-exp',  # cheapest
    'moderate': 'gemini-pro',           # balanced
    'complex': 'gemini-pro-vision',     # most capable, most expensive
}

# Dynamic selection
def select_model(task_complexity: str) -> str:
    return task_models.get(task_complexity, 'gemini-2.0-flash-exp')
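
How `task_complexity` gets decided is left open above; below is a rough heuristic sketch, where the keyword lists and length threshold are assumptions to tune for your workload.

# Rough heuristic for task complexity (keywords and threshold are assumptions)
def classify_complexity(prompt: str) -> str:
    if any(k in prompt for k in ('image', 'screenshot', '圖片')):
        return 'complex'   # vision-style tasks
    if len(prompt) > 2000 or any(k in prompt for k in ('analyze', 'compare', '分析')):
        return 'moderate'
    return 'simple'

# Usage: route each request to the cheapest adequate model
# model_name = select_model(classify_complexity(user_prompt))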

📊 Sample Cost Dashboard Output

{
  "summary": {
    "today_cost": 1.25,
    "yesterday_cost": 1.18,
    "month_to_date": 35.40,
    "projected_monthly": 42.00,
    "vs_last_month": "-15%"
  },
  "breakdown": {
    "gemini_api": {
      "cost": 0.85,
      "percentage": 68%,
      "calls": 1250,
      "tokens": 850000
    },
    "cloud_run": {
      "cost": 0.30,
      "percentage": 24%
    },
    "storage": {
      "cost": 0.10,
      "percentage": 8%
    }
  },
  "optimization_impact": {
    "cache_hit_rate": 45%,
    "tokens_saved": 125000,
    "cost_saved": 0.15,
    "monthly_savings_projection": 4.50
  },
  "alerts": [
    {
      "type": "cost_spike",
      "message": "API 呼叫比昨天增加 30%",
      "action": "檢查是否有異常流量"
    }
  ]
}

🎯 Today's Summary

Today we built a complete cost control system:

  • Cost tracking: a detailed record of every expense
  • Smart caching: fewer redundant API calls
  • Token optimization: prompts trimmed automatically
  • Resource monitoring: real-time tracking of resource usage
  • Automatic recommendations: AI-driven optimization suggestions

