經過前面 25 天的學習,從 AWS 基礎設施到量化交易理論,現在要開始真正的實作了!就像爸爸決定要建設現代化農場一樣,我們需要一個完整的實作計畫,把所有學到的知識整合起來,建立一個真正能運作的量化交易系統。
交易引擎是整個系統的核心,負責執行交易決策:
# src/trading_engine/core.py
import asyncio
import logging
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from .strategy_manager import StrategyManager
from .risk_manager import RiskManager
from .order_manager import OrderManager
from .portfolio_manager import PortfolioManager
from ..data.market_data import MarketDataCollector
from ..utils.config import Config
from ..utils.logging import setup_logging
class TradingEngine:
"""交易引擎主類"""
def __init__(self, config: Config):
self.config = config
self.logger = setup_logging(__name__)
# 初始化各個管理器
self.strategy_manager = StrategyManager(config)
self.risk_manager = RiskManager(config)
self.order_manager = OrderManager(config)
self.portfolio_manager = PortfolioManager(config)
self.market_data = MarketDataCollector(config)
# 系統狀態
self.is_running = False
self.last_health_check = datetime.now()
async def start(self):
"""啟動交易引擎"""
self.logger.info("Starting Trading Engine...")
try:
# 初始化各個組件
await self.market_data.connect()
await self.order_manager.initialize()
await self.portfolio_manager.load_positions()
# 啟動策略
await self.strategy_manager.start_strategies()
self.is_running = True
self.logger.info("Trading Engine started successfully")
# 開始主循環
await self.main_loop()
except Exception as e:
self.logger.error(f"Failed to start Trading Engine: {e}")
await self.shutdown()
async def main_loop(self):
"""主要執行循環"""
while self.is_running:
try:
# 1. 獲取市場數據
market_data = await self.market_data.get_latest_data()
# 2. 更新投資組合
await self.portfolio_manager.update(market_data)
# 3. 生成交易信號
signals = await self.strategy_manager.generate_signals(market_data)
# 4. 風險檢查
validated_signals = await self.risk_manager.validate_signals(
signals, self.portfolio_manager.get_current_positions()
)
# 5. 執行交易
if validated_signals:
await self.order_manager.execute_signals(validated_signals)
# 6. 健康檢查
await self.health_check()
# 7. 等待下一個週期
await asyncio.sleep(self.config.trading.loop_interval)
except Exception as e:
self.logger.error(f"Error in main loop: {e}")
await asyncio.sleep(10) # 錯誤後等待較長時間
async def health_check(self):
"""系統健康檢查"""
current_time = datetime.now()
# 檢查各組件狀態
health_status = {
'market_data': await self.market_data.is_healthy(),
'order_manager': await self.order_manager.is_healthy(),
'portfolio': self.portfolio_manager.is_healthy(),
'strategies': self.strategy_manager.is_healthy(),
'last_update': current_time
}
# 記錄健康狀況
self.logger.info(f"Health check: {health_status}")
# 如果有組件不健康,發送警告
unhealthy_components = [k for k, v in health_status.items()
if isinstance(v, bool) and not v]
if unhealthy_components:
await self.send_alert(f"Unhealthy components: {unhealthy_components}")
self.last_health_check = current_time
async def send_alert(self, message: str):
"""發送警告訊息"""
# 實作 Telegram 通知
pass
async def shutdown(self):
"""優雅關閉系統"""
self.logger.info("Shutting down Trading Engine...")
self.is_running = False
# 關閉各個組件
await self.strategy_manager.stop()
await self.order_manager.close_all_positions()
await self.market_data.disconnect()
self.logger.info("Trading Engine shutdown complete")
# src/main.py
import asyncio
from trading_engine.core import TradingEngine
from utils.config import load_config
async def main():
"""主程式入口"""
# 載入配置
config = load_config()
# 創建交易引擎
engine = TradingEngine(config)
try:
# 啟動引擎
await engine.start()
except KeyboardInterrupt:
print("Received shutdown signal")
finally:
await engine.shutdown()
if __name__ == "__main__":
asyncio.run(main())
# src/trading_engine/strategy_manager.py
from typing import Dict, List
import importlib
from abc import ABC, abstractmethod
class BaseStrategy(ABC):
"""策略基類"""
def __init__(self, config):
self.config = config
self.name = self.__class__.__name__
self.enabled = True
@abstractmethod
async def generate_signal(self, market_data):
"""生成交易信號"""
pass
@abstractmethod
def get_position_size(self, signal_strength):
"""計算倉位大小"""
pass
class ArbitrageStrategy(BaseStrategy):
"""期現套利策略"""
def __init__(self, config):
super().__init__(config)
self.entry_threshold = config.strategies.arbitrage.entry_threshold
self.exit_threshold = config.strategies.arbitrage.exit_threshold
async def generate_signal(self, market_data):
"""生成套利信號"""
spot_price = market_data['spot']['BTCUSDT']['price']
futures_price = market_data['futures']['BTCUSDT']['price']
# 計算基差
basis = (futures_price - spot_price) / spot_price
signal = {
'strategy': self.name,
'symbol': 'BTCUSDT',
'basis': basis,
'timestamp': market_data['timestamp']
}
if basis > self.entry_threshold:
# 期貨溢價過高,做空期貨,做多現貨
signal.update({
'action': 'arbitrage_short_futures',
'spot_side': 'buy',
'futures_side': 'sell',
'confidence': min(basis / self.entry_threshold, 2.0)
})
elif basis < -self.entry_threshold:
# 期貨貼水過深,做多期貨,做空現貨
signal.update({
'action': 'arbitrage_long_futures',
'spot_side': 'sell',
'futures_side': 'buy',
'confidence': min(abs(basis) / self.entry_threshold, 2.0)
})
elif abs(basis) < self.exit_threshold:
# 基差回歸,平倉
signal.update({
'action': 'close_arbitrage',
'confidence': 1.0
})
else:
signal['action'] = 'hold'
return signal
def get_position_size(self, signal_strength):
"""計算套利倉位大小"""
base_size = self.config.risk.max_position_size
return base_size * signal_strength * 0.5 # 套利策略使用較小倉位
class StrategyManager:
"""策略管理器"""
def __init__(self, config):
self.config = config
self.strategies = {}
self.load_strategies()
def load_strategies(self):
"""動態載入策略"""
strategy_configs = self.config.strategies
for strategy_name, strategy_config in strategy_configs.items():
if strategy_config.get('enabled', False):
try:
# 動態導入策略類
module_path = f"strategies.{strategy_name}"
module = importlib.import_module(module_path)
strategy_class = getattr(module, f"{strategy_name.title()}Strategy")
# 創建策略實例
strategy = strategy_class(self.config)
self.strategies[strategy_name] = strategy
except Exception as e:
logging.error(f"Failed to load strategy {strategy_name}: {e}")
async def start_strategies(self):
"""啟動所有策略"""
for name, strategy in self.strategies.items():
logging.info(f"Starting strategy: {name}")
async def generate_signals(self, market_data):
"""生成所有策略的信號"""
all_signals = []
for name, strategy in self.strategies.items():
try:
signal = await strategy.generate_signal(market_data)
if signal and signal.get('action') != 'hold':
all_signals.append(signal)
except Exception as e:
logging.error(f"Error generating signal for {name}: {e}")
return all_signals
def is_healthy(self):
"""檢查策略管理器健康狀態"""
return len(self.strategies) > 0 and all(
strategy.enabled for strategy in self.strategies.values()
)
# src/trading_engine/risk_manager.py
import logging
from typing import List, Dict
from datetime import datetime, timedelta
class RiskManager:
"""風險管理器"""
def __init__(self, config):
self.config = config
self.max_portfolio_risk = config.risk.max_portfolio_risk
self.max_position_size = config.risk.max_position_size
self.max_daily_loss = config.risk.max_daily_loss
self.max_drawdown = config.risk.max_drawdown
self.daily_pnl = 0
self.portfolio_high_water_mark = 0
async def validate_signals(self, signals: List[Dict], current_positions: Dict):
"""驗證交易信號"""
validated_signals = []
for signal in signals:
# 1. 檢查單一倉位限制
if not self._check_position_size_limit(signal, current_positions):
logging.warning(f"Signal rejected: position size limit exceeded for {signal}")
continue
# 2. 檢查投資組合風險
if not self._check_portfolio_risk(signal, current_positions):
logging.warning(f"Signal rejected: portfolio risk limit exceeded for {signal}")
continue
# 3. 檢查日損失限制
if not self._check_daily_loss_limit():
logging.warning(f"Signal rejected: daily loss limit reached")
continue
# 4. 檢查最大回撤
if not self._check_max_drawdown():
logging.warning(f"Signal rejected: max drawdown exceeded")
continue
# 5. 檢查市場條件
if not self._check_market_conditions(signal):
logging.warning(f"Signal rejected: poor market conditions for {signal}")
continue
validated_signals.append(signal)
return validated_signals
def _check_position_size_limit(self, signal: Dict, current_positions: Dict):
"""檢查單一倉位限制"""
symbol = signal.get('symbol')
if not symbol:
return False
# 計算新倉位大小
current_size = current_positions.get(symbol, {}).get('size', 0)
signal_size = signal.get('size', 0)
new_total_size = abs(current_size + signal_size)
# 檢查是否超過限制
return new_total_size <= self.max_position_size
def _check_portfolio_risk(self, signal: Dict, current_positions: Dict):
"""檢查投資組合風險"""
# 計算當前投資組合風險
total_exposure = sum(
abs(pos.get('size', 0) * pos.get('price', 0))
for pos in current_positions.values()
)
# 計算新信號的風險敞口
signal_exposure = abs(signal.get('size', 0) * signal.get('price', 1))
# 檢查總風險是否超過限制
total_portfolio_value = self._get_portfolio_value()
total_risk = (total_exposure + signal_exposure) / total_portfolio_value
return total_risk <= self.max_portfolio_risk
def _check_daily_loss_limit(self):
"""檢查日損失限制"""
return self.daily_pnl > -self.max_daily_loss
def _check_max_drawdown(self):
"""檢查最大回撤"""
current_value = self._get_portfolio_value()
if current_value > self.portfolio_high_water_mark:
self.portfolio_high_water_mark = current_value
drawdown = (self.portfolio_high_water_mark - current_value) / self.portfolio_high_water_mark
return drawdown <= self.max_drawdown
def _check_market_conditions(self, signal: Dict):
"""檢查市場條件"""
# 檢查波動性
# 檢查流動性
# 檢查時間限制
# 簡化實作:檢查交易時間
current_hour = datetime.now().hour
# 避免在特定時間交易(例如:維護時間)
if 2 <= current_hour <= 4: # UTC 2-4點維護時間
return False
return True
def _get_portfolio_value(self):
"""獲取投資組合總值"""
# 這裡應該從 PortfolioManager 獲取實際值
return 100000 # 暫時返回固定值
def update_daily_pnl(self, pnl: float):
"""更新日損益"""
self.daily_pnl += pnl
def reset_daily_metrics(self):
"""重置日指標"""
self.daily_pnl = 0
# src/data/market_data.py
import asyncio
import websocket
import json
import logging
from datetime import datetime
from typing import Dict, List
import pandas as pd
class MarketDataCollector:
"""市場資料收集器"""
def __init__(self, config):
self.config = config
self.ws_connections = {}
self.latest_data = {}
self.is_connected = False
async def connect(self):
"""連接到市場資料源"""
try:
# 連接到 Bybit WebSocket
await self._connect_bybit_ws()
# 連接到其他資料源
# await self._connect_other_sources()
self.is_connected = True
logging.info("Market data collector connected")
except Exception as e:
logging.error(f"Failed to connect market data: {e}")
raise
async def _connect_bybit_ws(self):
"""連接 Bybit WebSocket"""
# 現貨 WebSocket
spot_ws_url = "wss://stream.bybit.com/v5/public/spot"
# 期貨 WebSocket
futures_ws_url = "wss://stream.bybit.com/v5/public/linear"
# 創建連接(簡化實作)
# 實際實作需要使用 websockets 庫
pass
async def get_latest_data(self):
"""獲取最新市場資料"""
return {
'timestamp': datetime.now(),
'spot': self._get_spot_data(),
'futures': self._get_futures_data(),
'funding_rates': self._get_funding_rates()
}
def _get_spot_data(self):
"""獲取現貨資料"""
# 模擬資料,實際應從 WebSocket 獲取
return {
'BTCUSDT': {
'price': 50000.0,
'volume': 1000.0,
'bid': 49995.0,
'ask': 50005.0
}
}
def _get_futures_data(self):
"""獲取期貨資料"""
return {
'BTCUSDT': {
'price': 50100.0,
'volume': 2000.0,
'bid': 50095.0,
'ask': 50105.0,
'funding_rate': 0.0001
}
}
def _get_funding_rates(self):
"""獲取資金費率"""
return {
'BTCUSDT': {
'current_rate': 0.0001,
'next_funding_time': datetime.now(),
'predicted_rate': 0.00015
}
}
async def is_healthy(self):
"""檢查資料收集器健康狀態"""
return self.is_connected and len(self.latest_data) > 0
async def disconnect(self):
"""斷開連接"""
for connection in self.ws_connections.values():
await connection.close()
self.is_connected = False
logging.info("Market data collector disconnected")
trading-bot/
├── src/
│ ├── trading_engine/
│ │ ├── __init__.py
│ │ ├── core.py # 交易引擎核心
│ │ ├── strategy_manager.py # 策略管理
│ │ ├── risk_manager.py # 風險管理
│ │ ├── order_manager.py # 訂單管理
│ │ └── portfolio_manager.py # 投資組合管理
│ │
│ ├── strategies/
│ │ ├── __init__.py
│ │ ├── base.py # 策略基類
│ │ ├── arbitrage.py # 套利策略
│ │ ├── momentum.py # 動量策略
│ │ └── mean_reversion.py # 均值回歸策略
│ │
│ ├── data/
│ │ ├── __init__.py
│ │ ├── market_data.py # 市場資料收集
│ │ ├── storage.py # 資料儲存
│ │ └── analysis.py # 資料分析
│ │
│ ├── api/
│ │ ├── __init__.py
│ │ ├── bybit_client.py # Bybit API 客戶端
│ │ └── telegram_bot.py # Telegram 機器人
│ │
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── config.py # 配置管理
│ │ ├── logging.py # 日誌配置
│ │ ├── metrics.py # 指標計算
│ │ └── helpers.py # 輔助函數
│ │
│ ├── web/
│ │ ├── __init__.py
│ │ ├── app.py # Flask Web 應用
│ │ ├── dashboard.py # 監控面板
│ │ └── api_routes.py # API 路由
│ │
│ └── main.py # 主程式入口
│
├── infrastructure/
│ ├── terraform/ # Terraform 腳本
│ ├── docker/ # Docker 配置
│ └── k8s/ # Kubernetes 配置
│
├── tests/
│ ├── unit/ # 單元測試
│ ├── integration/ # 整合測試
│ └── e2e/ # 端到端測試
│
├── config/
│ ├── development.yaml # 開發環境配置
│ ├── staging.yaml # 測試環境配置
│ └── production.yaml # 生產環境配置
│
├── scripts/
│ ├── deploy.sh # 部署腳本
│ ├── backup.sh # 備份腳本
│ └── health_check.sh # 健康檢查腳本
│
├── docs/
│ ├── api.md # API 文件
│ ├── deployment.md # 部署指南
│ └── troubleshooting.md # 故障排除
│
├── Dockerfile # Docker 映像檔定義
├── docker-compose.yml # 本地開發環境
├── requirements.txt # Python 依賴
├── .github/workflows/ # GitHub Actions
└── README.md # 專案說明
今天我們設計了完整的量化交易系統實作架構,就像為現代化農場畫好了藍圖。重要的設計原則:
系統設計原則:
技術架構特點:
開發策略:
明天我們將開始實際編寫程式碼,從 Bybit API 整合開始!
下一篇:Day 27 - Bybit API key for Testnet