今天要學習如何在 AWS S3 上管理交易配置,這就像爸爸把農場的作業手冊存放在雲端一樣。不同季節、不同作物需要不同的操作方式,我們要能隨時從雲端取得最新的配置,確保所有農場工人都能按照最新的標準作業!
import boto3
import json
import hashlib
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import logging
from botocore.exceptions import ClientError
import jsonschema
from jsonschema import validate
import threading
import asyncio
class S3ConfigManager:
"""S3 配置管理器"""
def __init__(self, bucket_name: str, region: str = 'us-east-1'):
self.bucket_name = bucket_name
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.local_cache = {}
self.cache_timestamps = {}
self.config_schemas = self._define_config_schemas()
self.logger = logging.getLogger(__name__)
# 配置熱更新
self.auto_update = True
self.update_interval = 300 # 5分鐘檢查一次
self._update_thread = None
def _define_config_schemas(self) -> Dict:
"""定義配置文件的 JSON Schema"""
schemas = {
'strategy-config': {
"type": "object",
"properties": {
"strategies": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"enabled": {"type": "boolean"},
"symbol": {"type": "string"},
"parameters": {"type": "object"},
"risk_limits": {
"type": "object",
"properties": {
"max_position_size": {"type": "number", "minimum": 0},
"stop_loss_pct": {"type": "number", "minimum": 0, "maximum": 1},
"take_profit_pct": {"type": "number", "minimum": 0}
},
"required": ["max_position_size"]
}
},
"required": ["name", "enabled", "symbol", "parameters", "risk_limits"]
}
}
},
"required": ["strategies"]
},
'risk-params': {
"type": "object",
"properties": {
"global_limits": {
"type": "object",
"properties": {
"max_daily_loss": {"type": "number", "minimum": 0},
"max_total_exposure": {"type": "number", "minimum": 0},
"max_positions": {"type": "integer", "minimum": 1},
"emergency_stop_loss": {"type": "number", "minimum": 0, "maximum": 1}
},
"required": ["max_daily_loss", "max_total_exposure", "max_positions"]
},
"position_sizing": {
"type": "object",
"properties": {
"default_size_pct": {"type": "number", "minimum": 0, "maximum": 1},
"max_size_pct": {"type": "number", "minimum": 0, "maximum": 1},
"volatility_adjustment": {"type": "boolean"}
},
"required": ["default_size_pct", "max_size_pct"]
}
},
"required": ["global_limits", "position_sizing"]
},
'symbols-config': {
"type": "object",
"properties": {
"trading_pairs": {
"type": "array",
"items": {
"type": "object",
"properties": {
"symbol": {"type": "string"},
"enabled": {"type": "boolean"},
"min_notional": {"type": "number", "minimum": 0},
"tick_size": {"type": "number", "minimum": 0},
"lot_size": {"type": "number", "minimum": 0},
"commission_rate": {"type": "number", "minimum": 0, "maximum": 1}
},
"required": ["symbol", "enabled", "min_notional", "tick_size", "lot_size"]
}
}
},
"required": ["trading_pairs"]
},
'notification-config': {
"type": "object",
"properties": {
"telegram": {
"type": "object",
"properties": {
"enabled": {"type": "boolean"},
"bot_token": {"type": "string"},
"chat_id": {"type": "string"},
"notification_levels": {
"type": "array",
"items": {"type": "string", "enum": ["info", "warning", "error", "critical"]}
}
},
"required": ["enabled"]
},
"email": {
"type": "object",
"properties": {
"enabled": {"type": "boolean"},
"smtp_server": {"type": "string"},
"smtp_port": {"type": "integer"},
"sender_email": {"type": "string"},
"recipient_emails": {"type": "array", "items": {"type": "string"}}
},
"required": ["enabled"]
}
},
"required": ["telegram", "email"]
}
}
return schemas
def upload_config(self, config_name: str, config_data: Dict,
version: Optional[str] = None) -> bool:
"""上傳配置到 S3"""
try:
# 驗證配置格式
if not self._validate_config(config_name, config_data):
return False
# 添加元數據
config_with_metadata = {
'version': version or self._generate_version(),
'timestamp': datetime.utcnow().isoformat(),
'checksum': self._calculate_checksum(config_data),
'config': config_data
}
# 上傳到 S3
key = f"configs/{config_name}.json"
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=json.dumps(config_with_metadata, indent=2),
ContentType='application/json',
Metadata={
'config-type': config_name,
'version': config_with_metadata['version'],
'timestamp': config_with_metadata['timestamp']
}
)
# 同時保存版本化副本
versioned_key = f"configs/versions/{config_name}-{config_with_metadata['version']}.json"
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=versioned_key,
Body=json.dumps(config_with_metadata, indent=2),
ContentType='application/json'
)
self.logger.info(f"配置 {config_name} 上傳成功,版本: {config_with_metadata['version']}")
return True
except Exception as e:
self.logger.error(f"上傳配置 {config_name} 失敗: {e}")
return False
def download_config(self, config_name: str, use_cache: bool = True) -> Optional[Dict]:
"""從 S3 下載配置"""
# 檢查本地緩存
if use_cache and config_name in self.local_cache:
cache_time = self.cache_timestamps.get(config_name, 0)
if time.time() - cache_time < 300: # 5分鐘緩存
return self.local_cache[config_name]
try:
key = f"configs/{config_name}.json"
response = self.s3_client.get_object(
Bucket=self.bucket_name,
Key=key
)
config_content = json.loads(response['Body'].read().decode('utf-8'))
# 驗證校驗和
stored_checksum = config_content.get('checksum')
calculated_checksum = self._calculate_checksum(config_content['config'])
if stored_checksum != calculated_checksum:
self.logger.warning(f"配置 {config_name} 校驗和不匹配")
return None
# 更新本地緩存
self.local_cache[config_name] = config_content['config']
self.cache_timestamps[config_name] = time.time()
self.logger.info(f"配置 {config_name} 下載成功,版本: {config_content.get('version')}")
return config_content['config']
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
self.logger.warning(f"配置文件 {config_name} 不存在")
else:
self.logger.error(f"下載配置 {config_name} 失敗: {e}")
return None
except Exception as e:
self.logger.error(f"處理配置 {config_name} 時發生錯誤: {e}")
return None
def list_config_versions(self, config_name: str) -> List[Dict]:
"""列出配置的所有版本"""
try:
prefix = f"configs/versions/{config_name}-"
response = self.s3_client.list_objects_v2(
Bucket=self.bucket_name,
Prefix=prefix
)
versions = []
if 'Contents' in response:
for obj in response['Contents']:
# 獲取版本資訊
version_response = self.s3_client.get_object(
Bucket=self.bucket_name,
Key=obj['Key']
)
version_data = json.loads(version_response['Body'].read().decode('utf-8'))
versions.append({
'version': version_data.get('version'),
'timestamp': version_data.get('timestamp'),
'size': obj['Size'],
'last_modified': obj['LastModified'].isoformat(),
'key': obj['Key']
})
# 按時間戳排序
versions.sort(key=lambda x: x['timestamp'], reverse=True)
return versions
except Exception as e:
self.logger.error(f"列出配置版本失敗: {e}")
return []
def rollback_config(self, config_name: str, target_version: str) -> bool:
"""回滾配置到指定版本"""
try:
# 獲取目標版本的配置
versioned_key = f"configs/versions/{config_name}-{target_version}.json"
response = self.s3_client.get_object(
Bucket=self.bucket_name,
Key=versioned_key
)
version_data = json.loads(response['Body'].read().decode('utf-8'))
# 上傳為當前版本(添加新的時間戳和版本號)
rollback_version = f"{target_version}-rollback-{int(time.time())}"
return self.upload_config(
config_name,
version_data['config'],
rollback_version
)
except Exception as e:
self.logger.error(f"回滾配置 {config_name} 到版本 {target_version} 失敗: {e}")
return False
def _validate_config(self, config_name: str, config_data: Dict) -> bool:
"""驗證配置格式"""
if config_name not in self.config_schemas:
self.logger.warning(f"未找到配置 {config_name} 的驗證規則")
return True # 如果沒有定義schema,則通過驗證
try:
validate(instance=config_data, schema=self.config_schemas[config_name])
return True
except jsonschema.exceptions.ValidationError as e:
self.logger.error(f"配置 {config_name} 驗證失敗: {e.message}")
return False
def _calculate_checksum(self, data: Dict) -> str:
"""計算配置數據的校驗和"""
json_str = json.dumps(data, sort_keys=True)
return hashlib.md5(json_str.encode()).hexdigest()
def _generate_version(self) -> str:
"""生成版本號"""
timestamp = datetime.utcnow()
return timestamp.strftime("%Y%m%d-%H%M%S")
def start_auto_update(self):
"""啟動自動更新"""
if self._update_thread is None:
self.auto_update = True
self._update_thread = threading.Thread(target=self._auto_update_loop)
self._update_thread.daemon = True
self._update_thread.start()
self.logger.info("自動配置更新已啟動")
def stop_auto_update(self):
"""停止自動更新"""
self.auto_update = False
if self._update_thread:
self._update_thread.join(timeout=5)
self._update_thread = None
self.logger.info("自動配置更新已停止")
def _auto_update_loop(self):
"""自動更新循環"""
while self.auto_update:
try:
# 檢查並更新所有緩存的配置
for config_name in list(self.local_cache.keys()):
self.download_config(config_name, use_cache=False)
time.sleep(self.update_interval)
except Exception as e:
self.logger.error(f"自動更新過程中發生錯誤: {e}")
time.sleep(60) # 錯誤時等待更短時間
class TradingConfigManager:
"""交易配置管理器"""
def __init__(self, s3_manager: S3ConfigManager):
self.s3_manager = s3_manager
self.current_configs = {}
self.config_callbacks = {} # 配置變更回調
def load_all_configs(self) -> bool:
"""載入所有配置"""
config_names = ['strategy-config', 'risk-params', 'symbols-config', 'notification-config']
success = True
for config_name in config_names:
config = self.s3_manager.download_config(config_name)
if config:
self.current_configs[config_name] = config
self.logger.info(f"載入配置 {config_name} 成功")
else:
self.logger.error(f"載入配置 {config_name} 失敗")
success = False
return success
def get_strategy_config(self, strategy_name: str = None) -> Dict:
"""獲取策略配置"""
strategy_config = self.current_configs.get('strategy-config', {})
strategies = strategy_config.get('strategies', [])
if strategy_name:
for strategy in strategies:
if strategy['name'] == strategy_name:
return strategy
return {}
return strategy_config
def get_risk_params(self) -> Dict:
"""獲取風險參數"""
return self.current_configs.get('risk-params', {})
def get_symbol_config(self, symbol: str = None) -> Dict:
"""獲取交易對配置"""
symbols_config = self.current_configs.get('symbols-config', {})
trading_pairs = symbols_config.get('trading_pairs', [])
if symbol:
for pair in trading_pairs:
if pair['symbol'] == symbol:
return pair
return {}
return symbols_config
def get_notification_config(self) -> Dict:
"""獲取通知配置"""
return self.current_configs.get('notification-config', {})
def update_strategy_config(self, strategy_name: str, updates: Dict) -> bool:
"""更新策略配置"""
strategy_config = self.current_configs.get('strategy-config', {'strategies': []})
strategies = strategy_config['strategies']
# 查找並更新策略
strategy_found = False
for i, strategy in enumerate(strategies):
if strategy['name'] == strategy_name:
strategies[i].update(updates)
strategy_found = True
break
if not strategy_found:
# 添加新策略
new_strategy = {'name': strategy_name}
new_strategy.update(updates)
strategies.append(new_strategy)
# 上傳更新後的配置
return self.s3_manager.upload_config('strategy-config', strategy_config)
def register_config_callback(self, config_name: str, callback_func):
"""註冊配置變更回調"""
if config_name not in self.config_callbacks:
self.config_callbacks[config_name] = []
self.config_callbacks[config_name].append(callback_func)
def _trigger_config_callbacks(self, config_name: str, new_config: Dict):
"""觸發配置變更回調"""
if config_name in self.config_callbacks:
for callback in self.config_callbacks[config_name]:
try:
callback(config_name, new_config)
except Exception as e:
self.logger.error(f"執行配置回調時發生錯誤: {e}")
# 配置範例和使用示例
def create_sample_configs():
"""創建範例配置"""
# 策略配置範例
strategy_config = {
"strategies": [
{
"name": "btc_momentum",
"enabled": True,
"symbol": "BTCUSDT",
"parameters": {
"fast_period": 10,
"slow_period": 30,
"signal_threshold": 0.02,
"position_hold_time": 3600
},
"risk_limits": {
"max_position_size": 0.1,
"stop_loss_pct": 0.05,
"take_profit_pct": 0.15
}
},
{
"name": "eth_mean_reversion",
"enabled": False,
"symbol": "ETHUSDT",
"parameters": {
"lookback_period": 20,
"deviation_threshold": 2.0,
"entry_threshold": 0.95,
"exit_threshold": 1.05
},
"risk_limits": {
"max_position_size": 0.15,
"stop_loss_pct": 0.03,
"take_profit_pct": 0.08
}
}
]
}
# 風險參數配置範例
risk_params = {
"global_limits": {
"max_daily_loss": 0.05,
"max_total_exposure": 0.8,
"max_positions": 5,
"emergency_stop_loss": 0.15
},
"position_sizing": {
"default_size_pct": 0.1,
"max_size_pct": 0.25,
"volatility_adjustment": True
}
}
# 交易對配置範例
symbols_config = {
"trading_pairs": [
{
"symbol": "BTCUSDT",
"enabled": True,
"min_notional": 10.0,
"tick_size": 0.01,
"lot_size": 0.00001,
"commission_rate": 0.001
},
{
"symbol": "ETHUSDT",
"enabled": True,
"min_notional": 10.0,
"tick_size": 0.01,
"lot_size": 0.0001,
"commission_rate": 0.001
}
]
}
# 通知配置範例
notification_config = {
"telegram": {
"enabled": True,
"bot_token": "your_bot_token_here",
"chat_id": "your_chat_id_here",
"notification_levels": ["warning", "error", "critical"]
},
"email": {
"enabled": False,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender_email": "your_email@gmail.com",
"recipient_emails": ["admin@yourcompany.com"]
}
}
return {
'strategy-config': strategy_config,
'risk-params': risk_params,
'symbols-config': symbols_config,
'notification-config': notification_config
}
def demo_config_management():
"""演示配置管理系統"""
print("🔧 S3 配置管理系統演示")
print("=" * 50)
# 注意:需要設置正確的 S3 bucket 和 AWS 憑證
bucket_name = "your-trading-config-bucket"
try:
# 創建 S3 配置管理器
s3_manager = S3ConfigManager(bucket_name)
# 創建交易配置管理器
config_manager = TradingConfigManager(s3_manager)
# 1. 上傳範例配置
print("\n📤 上傳範例配置...")
sample_configs = create_sample_configs()
for config_name, config_data in sample_configs.items():
success = s3_manager.upload_config(config_name, config_data)
if success:
print(f" ✅ {config_name} 上傳成功")
else:
print(f" ❌ {config_name} 上傳失敗")
# 2. 載入所有配置
print("\n📥 載入所有配置...")
if config_manager.load_all_configs():
print(" ✅ 所有配置載入成功")
else:
print(" ❌ 部分配置載入失敗")
# 3. 查詢特定配置
print("\n🔍 查詢特定配置...")
btc_strategy = config_manager.get_strategy_config('btc_momentum')
if btc_strategy:
print(f" 📊 BTC 動量策略配置:")
print(f" 啟用狀態: {btc_strategy['enabled']}")
print(f" 最大倉位: {btc_strategy['risk_limits']['max_position_size']}")
print(f" 止損比例: {btc_strategy['risk_limits']['stop_loss_pct']}")
# 4. 更新配置
print("\n✏️ 更新策略配置...")
update_success = config_manager.update_strategy_config(
'btc_momentum',
{'enabled': False, 'parameters': {'fast_period': 12}}
)
if update_success:
print(" ✅ 策略配置更新成功")
else:
print(" ❌ 策略配置更新失敗")
# 5. 列出配置版本
print("\n📋 列出配置版本...")
versions = s3_manager.list_config_versions('strategy-config')
if versions:
print(f" 找到 {len(versions)} 個版本:")
for version in versions[:3]: # 只顯示最新的3個版本
print(f" 版本: {version['version']}, 時間: {version['timestamp']}")
print("\n✅ 配置管理系統演示完成!")
except Exception as e:
print(f"❌ 演示過程中發生錯誤: {e}")
print("請確保已正確設置 AWS 憑證和 S3 bucket")
# 執行演示
demo_config_management()
class ConfigHotReloader:
"""配置熱更新器"""
def __init__(self, config_manager: TradingConfigManager):
self.config_manager = config_manager
self.update_handlers = {}
self.last_update_time = {}
def register_update_handler(self, config_type: str, handler_func):
"""註冊配置更新處理器"""
if config_type not in self.update_handlers:
self.update_handlers[config_type] = []
self.update_handlers[config_type].append(handler_func)
async def check_and_reload_configs(self):
"""檢查並重新載入配置"""
config_types = ['strategy-config', 'risk-params', 'symbols-config', 'notification-config']
for config_type in config_types:
try:
# 檢查 S3 上的配置是否有更新
if await self._config_updated(config_type):
print(f"🔄 檢測到配置 {config_type} 更新,開始重新載入...")
# 重新載入配置
new_config = self.config_manager.s3_manager.download_config(config_type, use_cache=False)
if new_config:
# 更新本地配置
old_config = self.config_manager.current_configs.get(config_type)
self.config_manager.current_configs[config_type] = new_config
# 觸發更新處理器
await self._trigger_update_handlers(config_type, old_config, new_config)
self.last_update_time[config_type] = time.time()
print(f"✅ 配置 {config_type} 熱更新完成")
else:
print(f"❌ 重新載入配置 {config_type} 失敗")
except Exception as e:
print(f"❌ 檢查配置 {config_type} 時發生錯誤: {e}")
async def _config_updated(self, config_type: str) -> bool:
"""檢查配置是否已更新"""
try:
# 獲取 S3 上配置文件的最後修改時間
key = f"configs/{config_type}.json"
response = self.config_manager.s3_manager.s3_client.head_object(
Bucket=self.config_manager.s3_manager.bucket_name,
Key=key
)
s3_last_modified = response['LastModified'].timestamp()
local_last_update = self.last_update_time.get(config_type, 0)
return s3_last_modified > local_last_update
except Exception as e:
print(f"檢查配置更新時發生錯誤: {e}")
return False
async def _trigger_update_handlers(self, config_type: str, old_config: Dict, new_config: Dict):
"""觸發配置更新處理器"""
if config_type in self.update_handlers:
for handler in self.update_handlers[config_type]:
try:
if asyncio.iscoroutinefunction(handler):
await handler(config_type, old_config, new_config)
else:
handler(config_type, old_config, new_config)
except Exception as e:
print(f"執行更新處理器時發生錯誤: {e}")
# 配置更新處理器範例
class TradingSystemConfigHandler:
"""交易系統配置處理器"""
def __init__(self, trading_system):
self.trading_system = trading_system
async def handle_strategy_config_update(self, config_type: str, old_config: Dict, new_config: Dict):
"""處理策略配置更新"""
print(f"🔄 處理策略配置更新...")
# 比較策略變更
old_strategies = {s['name']: s for s in old_config.get('strategies', [])}
new_strategies = {s['name']: s for s in new_config.get('strategies', [])}
# 處理新增的策略
for name, strategy in new_strategies.items():
if name not in old_strategies:
print(f" ➕ 新增策略: {name}")
await self._add_strategy(strategy)
elif strategy != old_strategies[name]:
print(f" 🔧 更新策略: {name}")
await self._update_strategy(name, strategy)
# 處理刪除的策略
for name in old_strategies:
if name not in new_strategies:
print(f" ➖ 刪除策略: {name}")
await self._remove_strategy(name)
async def handle_risk_params_update(self, config_type: str, old_config: Dict, new_config: Dict):
"""處理風險參數更新"""
print(f"🛡️ 處理風險參數更新...")
# 更新全域風險限制
old_limits = old_config.get('global_limits', {})
new_limits = new_config.get('global_limits', {})
for param, new_value in new_limits.items():
old_value = old_limits.get(param)
if old_value != new_value:
print(f" 🔧 更新風險參數 {param}: {old_value} -> {new_value}")
await self._update_risk_limit(param, new_value)
async def _add_strategy(self, strategy: Dict):
"""新增策略"""
# 實作策略新增邏輯
pass
async def _update_strategy(self, name: str, strategy: Dict):
"""更新策略"""
# 實作策略更新邏輯
pass
async def _remove_strategy(self, name: str):
"""移除策略"""
# 實作策略移除邏輯
pass
async def _update_risk_limit(self, param: str, value: Any):
"""更新風險限制"""
# 實作風險限制更新邏輯
pass
# 使用範例
async def demo_hot_reload():
"""演示配置熱更新"""
print("🔥 配置熱更新演示")
print("=" * 30)
# 假設已有配置管理器
bucket_name = "your-trading-config-bucket"
s3_manager = S3ConfigManager(bucket_name)
config_manager = TradingConfigManager(s3_manager)
# 創建熱更新器
hot_reloader = ConfigHotReloader(config_manager)
# 創建配置處理器
trading_system = None # 假設的交易系統
config_handler = TradingSystemConfigHandler(trading_system)
# 註冊配置更新處理器
hot_reloader.register_update_handler('strategy-config', config_handler.handle_strategy_config_update)
hot_reloader.register_update_handler('risk-params', config_handler.handle_risk_params_update)
# 模擬配置熱更新檢查
print("開始配置熱更新檢查...")
await hot_reloader.check_and_reload_configs()
print("✅ 配置熱更新演示完成")
# 如果在異步環境中運行:
# await demo_hot_reload()
今天我們學習了如何在 AWS S3 上建立完整的交易配置管理系統,就像為農場建立了一套雲端作業手冊管理系統。重要概念包括:
配置管理核心:
熱更新機制:
安全性考量:
實用功能:
記住爸爸管理農場的原則:「好的管理系統要能隨時調整,但每次改動都要有記錄,出問題時要能快速回復」。配置管理是系統穩定運作的關鍵基礎!
明天我們將學習容器化服務的部署,完成整個系統的雲端化。
下一篇:Day 29 - Containerized Service