iT邦幫忙

2025 iThome 鐵人賽

DAY 26
0

小明的數位農場建設計畫

經過前面 25 天的學習,從 AWS 基礎設施到量化交易理論,現在要開始真正的實作了!就像爸爸決定要建設現代化農場一樣,我們需要一個完整的實作計畫,把所有學到的知識整合起來,建立一個真正能運作的量化交易系統。

專案總體架構

系統整體設計

系統整體設計

核心功能模組

1. 交易引擎 (Trading Engine)

交易引擎是整個系統的核心,負責執行交易決策:

# src/trading_engine/core.py
import asyncio
import logging
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import pandas as pd

from .strategy_manager import StrategyManager
from .risk_manager import RiskManager
from .order_manager import OrderManager
from .portfolio_manager import PortfolioManager
from ..data.market_data import MarketDataCollector
from ..utils.config import Config
from ..utils.logging import setup_logging

class TradingEngine:
    """交易引擎主類"""
    
    def __init__(self, config: Config):
        self.config = config
        self.logger = setup_logging(__name__)
        
        # 初始化各個管理器
        self.strategy_manager = StrategyManager(config)
        self.risk_manager = RiskManager(config)
        self.order_manager = OrderManager(config)
        self.portfolio_manager = PortfolioManager(config)
        self.market_data = MarketDataCollector(config)
        
        # 系統狀態
        self.is_running = False
        self.last_health_check = datetime.now()
        
    async def start(self):
        """啟動交易引擎"""
        self.logger.info("Starting Trading Engine...")
        
        try:
            # 初始化各個組件
            await self.market_data.connect()
            await self.order_manager.initialize()
            await self.portfolio_manager.load_positions()
            
            # 啟動策略
            await self.strategy_manager.start_strategies()
            
            self.is_running = True
            self.logger.info("Trading Engine started successfully")
            
            # 開始主循環
            await self.main_loop()
            
        except Exception as e:
            self.logger.error(f"Failed to start Trading Engine: {e}")
            await self.shutdown()
    
    async def main_loop(self):
        """主要執行循環"""
        
        while self.is_running:
            try:
                # 1. 獲取市場數據
                market_data = await self.market_data.get_latest_data()
                
                # 2. 更新投資組合
                await self.portfolio_manager.update(market_data)
                
                # 3. 生成交易信號
                signals = await self.strategy_manager.generate_signals(market_data)
                
                # 4. 風險檢查
                validated_signals = await self.risk_manager.validate_signals(
                    signals, self.portfolio_manager.get_current_positions()
                )
                
                # 5. 執行交易
                if validated_signals:
                    await self.order_manager.execute_signals(validated_signals)
                
                # 6. 健康檢查
                await self.health_check()
                
                # 7. 等待下一個週期
                await asyncio.sleep(self.config.trading.loop_interval)
                
            except Exception as e:
                self.logger.error(f"Error in main loop: {e}")
                await asyncio.sleep(10)  # 錯誤後等待較長時間
    
    async def health_check(self):
        """系統健康檢查"""
        
        current_time = datetime.now()
        
        # 檢查各組件狀態
        health_status = {
            'market_data': await self.market_data.is_healthy(),
            'order_manager': await self.order_manager.is_healthy(),
            'portfolio': self.portfolio_manager.is_healthy(),
            'strategies': self.strategy_manager.is_healthy(),
            'last_update': current_time
        }
        
        # 記錄健康狀況
        self.logger.info(f"Health check: {health_status}")
        
        # 如果有組件不健康,發送警告
        unhealthy_components = [k for k, v in health_status.items() 
                              if isinstance(v, bool) and not v]
        
        if unhealthy_components:
            await self.send_alert(f"Unhealthy components: {unhealthy_components}")
        
        self.last_health_check = current_time
    
    async def send_alert(self, message: str):
        """發送警告訊息"""
        # 實作 Telegram 通知
        pass
    
    async def shutdown(self):
        """優雅關閉系統"""
        self.logger.info("Shutting down Trading Engine...")
        
        self.is_running = False
        
        # 關閉各個組件
        await self.strategy_manager.stop()
        await self.order_manager.close_all_positions()
        await self.market_data.disconnect()
        
        self.logger.info("Trading Engine shutdown complete")

# src/main.py
import asyncio
from trading_engine.core import TradingEngine
from utils.config import load_config

async def main():
    """主程式入口"""
    
    # 載入配置
    config = load_config()
    
    # 創建交易引擎
    engine = TradingEngine(config)
    
    try:
        # 啟動引擎
        await engine.start()
    except KeyboardInterrupt:
        print("Received shutdown signal")
    finally:
        await engine.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

2. 策略管理器 (Strategy Manager)

# src/trading_engine/strategy_manager.py
from typing import Dict, List
import importlib
from abc import ABC, abstractmethod

class BaseStrategy(ABC):
    """策略基類"""
    
    def __init__(self, config):
        self.config = config
        self.name = self.__class__.__name__
        self.enabled = True
        
    @abstractmethod
    async def generate_signal(self, market_data):
        """生成交易信號"""
        pass
    
    @abstractmethod
    def get_position_size(self, signal_strength):
        """計算倉位大小"""
        pass

class ArbitrageStrategy(BaseStrategy):
    """期現套利策略"""
    
    def __init__(self, config):
        super().__init__(config)
        self.entry_threshold = config.strategies.arbitrage.entry_threshold
        self.exit_threshold = config.strategies.arbitrage.exit_threshold
        
    async def generate_signal(self, market_data):
        """生成套利信號"""
        
        spot_price = market_data['spot']['BTCUSDT']['price']
        futures_price = market_data['futures']['BTCUSDT']['price']
        
        # 計算基差
        basis = (futures_price - spot_price) / spot_price
        
        signal = {
            'strategy': self.name,
            'symbol': 'BTCUSDT',
            'basis': basis,
            'timestamp': market_data['timestamp']
        }
        
        if basis > self.entry_threshold:
            # 期貨溢價過高,做空期貨,做多現貨
            signal.update({
                'action': 'arbitrage_short_futures',
                'spot_side': 'buy',
                'futures_side': 'sell',
                'confidence': min(basis / self.entry_threshold, 2.0)
            })
        elif basis < -self.entry_threshold:
            # 期貨貼水過深,做多期貨,做空現貨
            signal.update({
                'action': 'arbitrage_long_futures',
                'spot_side': 'sell',
                'futures_side': 'buy',
                'confidence': min(abs(basis) / self.entry_threshold, 2.0)
            })
        elif abs(basis) < self.exit_threshold:
            # 基差回歸,平倉
            signal.update({
                'action': 'close_arbitrage',
                'confidence': 1.0
            })
        else:
            signal['action'] = 'hold'
            
        return signal
    
    def get_position_size(self, signal_strength):
        """計算套利倉位大小"""
        base_size = self.config.risk.max_position_size
        return base_size * signal_strength * 0.5  # 套利策略使用較小倉位

class StrategyManager:
    """策略管理器"""
    
    def __init__(self, config):
        self.config = config
        self.strategies = {}
        self.load_strategies()
    
    def load_strategies(self):
        """動態載入策略"""
        
        strategy_configs = self.config.strategies
        
        for strategy_name, strategy_config in strategy_configs.items():
            if strategy_config.get('enabled', False):
                try:
                    # 動態導入策略類
                    module_path = f"strategies.{strategy_name}"
                    module = importlib.import_module(module_path)
                    strategy_class = getattr(module, f"{strategy_name.title()}Strategy")
                    
                    # 創建策略實例
                    strategy = strategy_class(self.config)
                    self.strategies[strategy_name] = strategy
                    
                except Exception as e:
                    logging.error(f"Failed to load strategy {strategy_name}: {e}")
    
    async def start_strategies(self):
        """啟動所有策略"""
        for name, strategy in self.strategies.items():
            logging.info(f"Starting strategy: {name}")
    
    async def generate_signals(self, market_data):
        """生成所有策略的信號"""
        
        all_signals = []
        
        for name, strategy in self.strategies.items():
            try:
                signal = await strategy.generate_signal(market_data)
                if signal and signal.get('action') != 'hold':
                    all_signals.append(signal)
            except Exception as e:
                logging.error(f"Error generating signal for {name}: {e}")
        
        return all_signals
    
    def is_healthy(self):
        """檢查策略管理器健康狀態"""
        return len(self.strategies) > 0 and all(
            strategy.enabled for strategy in self.strategies.values()
        )

3. 風險管理器 (Risk Manager)

# src/trading_engine/risk_manager.py
import logging
from typing import List, Dict
from datetime import datetime, timedelta

class RiskManager:
    """風險管理器"""
    
    def __init__(self, config):
        self.config = config
        self.max_portfolio_risk = config.risk.max_portfolio_risk
        self.max_position_size = config.risk.max_position_size
        self.max_daily_loss = config.risk.max_daily_loss
        self.max_drawdown = config.risk.max_drawdown
        
        self.daily_pnl = 0
        self.portfolio_high_water_mark = 0
        
    async def validate_signals(self, signals: List[Dict], current_positions: Dict):
        """驗證交易信號"""
        
        validated_signals = []
        
        for signal in signals:
            # 1. 檢查單一倉位限制
            if not self._check_position_size_limit(signal, current_positions):
                logging.warning(f"Signal rejected: position size limit exceeded for {signal}")
                continue
            
            # 2. 檢查投資組合風險
            if not self._check_portfolio_risk(signal, current_positions):
                logging.warning(f"Signal rejected: portfolio risk limit exceeded for {signal}")
                continue
            
            # 3. 檢查日損失限制
            if not self._check_daily_loss_limit():
                logging.warning(f"Signal rejected: daily loss limit reached")
                continue
            
            # 4. 檢查最大回撤
            if not self._check_max_drawdown():
                logging.warning(f"Signal rejected: max drawdown exceeded")
                continue
            
            # 5. 檢查市場條件
            if not self._check_market_conditions(signal):
                logging.warning(f"Signal rejected: poor market conditions for {signal}")
                continue
            
            validated_signals.append(signal)
        
        return validated_signals
    
    def _check_position_size_limit(self, signal: Dict, current_positions: Dict):
        """檢查單一倉位限制"""
        
        symbol = signal.get('symbol')
        if not symbol:
            return False
        
        # 計算新倉位大小
        current_size = current_positions.get(symbol, {}).get('size', 0)
        signal_size = signal.get('size', 0)
        new_total_size = abs(current_size + signal_size)
        
        # 檢查是否超過限制
        return new_total_size <= self.max_position_size
    
    def _check_portfolio_risk(self, signal: Dict, current_positions: Dict):
        """檢查投資組合風險"""
        
        # 計算當前投資組合風險
        total_exposure = sum(
            abs(pos.get('size', 0) * pos.get('price', 0)) 
            for pos in current_positions.values()
        )
        
        # 計算新信號的風險敞口
        signal_exposure = abs(signal.get('size', 0) * signal.get('price', 1))
        
        # 檢查總風險是否超過限制
        total_portfolio_value = self._get_portfolio_value()
        total_risk = (total_exposure + signal_exposure) / total_portfolio_value
        
        return total_risk <= self.max_portfolio_risk
    
    def _check_daily_loss_limit(self):
        """檢查日損失限制"""
        return self.daily_pnl > -self.max_daily_loss
    
    def _check_max_drawdown(self):
        """檢查最大回撤"""
        current_value = self._get_portfolio_value()
        if current_value > self.portfolio_high_water_mark:
            self.portfolio_high_water_mark = current_value
        
        drawdown = (self.portfolio_high_water_mark - current_value) / self.portfolio_high_water_mark
        return drawdown <= self.max_drawdown
    
    def _check_market_conditions(self, signal: Dict):
        """檢查市場條件"""
        
        # 檢查波動性
        # 檢查流動性
        # 檢查時間限制
        
        # 簡化實作:檢查交易時間
        current_hour = datetime.now().hour
        
        # 避免在特定時間交易(例如:維護時間)
        if 2 <= current_hour <= 4:  # UTC 2-4點維護時間
            return False
        
        return True
    
    def _get_portfolio_value(self):
        """獲取投資組合總值"""
        # 這裡應該從 PortfolioManager 獲取實際值
        return 100000  # 暫時返回固定值
    
    def update_daily_pnl(self, pnl: float):
        """更新日損益"""
        self.daily_pnl += pnl
    
    def reset_daily_metrics(self):
        """重置日指標"""
        self.daily_pnl = 0

4. 資料收集器 (Market Data Collector)

# src/data/market_data.py
import asyncio
import websocket
import json
import logging
from datetime import datetime
from typing import Dict, List
import pandas as pd

class MarketDataCollector:
    """市場資料收集器"""
    
    def __init__(self, config):
        self.config = config
        self.ws_connections = {}
        self.latest_data = {}
        self.is_connected = False
        
    async def connect(self):
        """連接到市場資料源"""
        
        try:
            # 連接到 Bybit WebSocket
            await self._connect_bybit_ws()
            
            # 連接到其他資料源
            # await self._connect_other_sources()
            
            self.is_connected = True
            logging.info("Market data collector connected")
            
        except Exception as e:
            logging.error(f"Failed to connect market data: {e}")
            raise
    
    async def _connect_bybit_ws(self):
        """連接 Bybit WebSocket"""
        
        # 現貨 WebSocket
        spot_ws_url = "wss://stream.bybit.com/v5/public/spot"
        
        # 期貨 WebSocket
        futures_ws_url = "wss://stream.bybit.com/v5/public/linear"
        
        # 創建連接(簡化實作)
        # 實際實作需要使用 websockets 庫
        pass
    
    async def get_latest_data(self):
        """獲取最新市場資料"""
        
        return {
            'timestamp': datetime.now(),
            'spot': self._get_spot_data(),
            'futures': self._get_futures_data(),
            'funding_rates': self._get_funding_rates()
        }
    
    def _get_spot_data(self):
        """獲取現貨資料"""
        
        # 模擬資料,實際應從 WebSocket 獲取
        return {
            'BTCUSDT': {
                'price': 50000.0,
                'volume': 1000.0,
                'bid': 49995.0,
                'ask': 50005.0
            }
        }
    
    def _get_futures_data(self):
        """獲取期貨資料"""
        
        return {
            'BTCUSDT': {
                'price': 50100.0,
                'volume': 2000.0,
                'bid': 50095.0,
                'ask': 50105.0,
                'funding_rate': 0.0001
            }
        }
    
    def _get_funding_rates(self):
        """獲取資金費率"""
        
        return {
            'BTCUSDT': {
                'current_rate': 0.0001,
                'next_funding_time': datetime.now(),
                'predicted_rate': 0.00015
            }
        }
    
    async def is_healthy(self):
        """檢查資料收集器健康狀態"""
        return self.is_connected and len(self.latest_data) > 0
    
    async def disconnect(self):
        """斷開連接"""
        
        for connection in self.ws_connections.values():
            await connection.close()
        
        self.is_connected = False
        logging.info("Market data collector disconnected")

專案結構設計

trading-bot/
├── src/
│   ├── trading_engine/
│   │   ├── __init__.py
│   │   ├── core.py                 # 交易引擎核心
│   │   ├── strategy_manager.py     # 策略管理
│   │   ├── risk_manager.py         # 風險管理
│   │   ├── order_manager.py        # 訂單管理
│   │   └── portfolio_manager.py    # 投資組合管理
│   │
│   ├── strategies/
│   │   ├── __init__.py
│   │   ├── base.py                 # 策略基類
│   │   ├── arbitrage.py            # 套利策略
│   │   ├── momentum.py             # 動量策略
│   │   └── mean_reversion.py       # 均值回歸策略
│   │
│   ├── data/
│   │   ├── __init__.py
│   │   ├── market_data.py          # 市場資料收集
│   │   ├── storage.py              # 資料儲存
│   │   └── analysis.py             # 資料分析
│   │
│   ├── api/
│   │   ├── __init__.py
│   │   ├── bybit_client.py         # Bybit API 客戶端
│   │   └── telegram_bot.py         # Telegram 機器人
│   │
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── config.py               # 配置管理
│   │   ├── logging.py              # 日誌配置
│   │   ├── metrics.py              # 指標計算
│   │   └── helpers.py              # 輔助函數
│   │
│   ├── web/
│   │   ├── __init__.py
│   │   ├── app.py                  # Flask Web 應用
│   │   ├── dashboard.py            # 監控面板
│   │   └── api_routes.py           # API 路由
│   │
│   └── main.py                     # 主程式入口
│
├── infrastructure/
│   ├── terraform/                  # Terraform 腳本
│   ├── docker/                     # Docker 配置
│   └── k8s/                        # Kubernetes 配置
│
├── tests/
│   ├── unit/                       # 單元測試
│   ├── integration/                # 整合測試
│   └── e2e/                        # 端到端測試
│
├── config/
│   ├── development.yaml            # 開發環境配置
│   ├── staging.yaml                # 測試環境配置
│   └── production.yaml             # 生產環境配置
│
├── scripts/
│   ├── deploy.sh                   # 部署腳本
│   ├── backup.sh                   # 備份腳本
│   └── health_check.sh             # 健康檢查腳本
│
├── docs/
│   ├── api.md                      # API 文件
│   ├── deployment.md               # 部署指南
│   └── troubleshooting.md          # 故障排除
│
├── Dockerfile                      # Docker 映像檔定義
├── docker-compose.yml              # 本地開發環境
├── requirements.txt                # Python 依賴
├── .github/workflows/              # GitHub Actions
└── README.md                       # 專案說明

開發里程碑

Phase 1: 基礎架構 (Day 26-27)

  • [x] 專案結構設計
  • [ ] 核心引擎框架
  • [ ] 基本配置系統
  • [ ] 日誌和監控

Phase 2: 資料和 API (Day 28)

  • [ ] Bybit API 整合
  • [ ] 市場資料收集
  • [ ] 資料儲存系統
  • [ ] API 測試環境

Phase 3: 交易邏輯 (Day 29)

  • [ ] 策略框架實作
  • [ ] 風險管理系統
  • [ ] 訂單管理器
  • [ ] 回測功能

Phase 4: 部署和監控 (Day 30-31)

  • [ ] Docker 容器化
  • [ ] AWS 部署
  • [ ] 監控面板
  • [ ] Telegram 通知

技術選型說明

程式語言:Python 3.9+

  • 優勢:豐富的金融庫、易於開發和維護
  • 劣勢:執行速度相對較慢
  • 適用場景:量化交易、資料分析

非同步框架:asyncio

  • 優勢:高併發、適合 I/O 密集型任務
  • 應用:WebSocket 連接、API 呼叫

資料庫:PostgreSQL + Redis

  • PostgreSQL:持久化資料、交易記錄
  • Redis:快取、即時資料

容器化:Docker + ECS

  • Docker:應用程式容器化
  • ECS:容器編排和管理

監控:CloudWatch + Grafana

  • CloudWatch:AWS 原生監控
  • Grafana:自訂儀表板

小結

今天我們設計了完整的量化交易系統實作架構,就像為現代化農場畫好了藍圖。重要的設計原則:

系統設計原則:

  • 模組化設計,便於維護和擴展
  • 異步處理,提高系統效能
  • 容錯設計,確保系統穩定性
  • 監控完整,及時發現問題

技術架構特點:

  • 雲原生設計,充分利用 AWS 服務
  • 微服務架構,每個模組職責單一
  • 自動化部署,提高開發效率
  • 全面監控,保障系統可靠性

開發策略:

  • 階段性開發,降低實作風險
  • 測試驅動,確保程式碼品質
  • 持續整合,快速反饋問題
  • 文件完整,便於團隊協作

明天我們將開始實際編寫程式碼,從 Bybit API 整合開始!


下一篇:Day 27 - Bybit API key for Testnet


上一篇
Day 25: Paper Trade
下一篇
Day 27: Binance API key for Testnet
系列文
小資族的量化交易 10128
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言