今天要深入學習回測系統的建立,這就像爸爸研究歷史天氣資料來預測今年的農作物收成一樣。透過分析過去的數據,我們可以驗證交易策略的有效性,就像查看「如果我們在過去使用這個種植方法,會有什麼結果」!
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Optional, Tuple, Any
import warnings
warnings.filterwarnings('ignore')
class BacktestEngine:
"""專業級回測引擎"""
def __init__(self, initial_capital: float = 100000, commission: float = 0.001):
self.initial_capital = initial_capital
self.commission = commission
# 狀態變數
self.current_capital = initial_capital
self.positions = {} # {symbol: quantity}
self.trades = []
self.equity_curve = []
self.daily_stats = []
# 配置參數
self.slippage = 0.0005 # 0.05% 滑價
self.risk_free_rate = 0.02 # 2% 無風險利率
def add_data(self, symbol: str, data: pd.DataFrame) -> None:
"""添加歷史數據"""
# 驗證數據完整性
required_columns = ['open', 'high', 'low', 'close', 'volume']
missing_columns = [col for col in required_columns if col not in data.columns]
if missing_columns:
raise ValueError(f"數據缺少必要欄位: {missing_columns}")
# 檢查數據品質
if data.isnull().any().any():
print(f"警告: {symbol} 數據包含空值,將進行處理")
data = data.fillna(method='forward').fillna(method='backward')
# 確保數據按時間排序
data = data.sort_index()
# 添加基本技術指標
data = self._add_technical_indicators(data)
setattr(self, f'data_{symbol}', data)
print(f"已載入 {symbol} 數據: {len(data)} 筆記錄")
def _add_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
"""添加技術指標"""
# 移動平均線
data['sma_20'] = data['close'].rolling(20).mean()
data['sma_50'] = data['close'].rolling(50).mean()
data['sma_200'] = data['close'].rolling(200).mean()
# 指數移動平均
data['ema_12'] = data['close'].ewm(span=12).mean()
data['ema_26'] = data['close'].ewm(span=26).mean()
# MACD
data['macd'] = data['ema_12'] - data['ema_26']
data['macd_signal'] = data['macd'].ewm(span=9).mean()
data['macd_histogram'] = data['macd'] - data['macd_signal']
# RSI
delta = data['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss
data['rsi'] = 100 - (100 / (1 + rs))
# 布林帶
data['bb_middle'] = data['close'].rolling(20).mean()
bb_std = data['close'].rolling(20).std()
data['bb_upper'] = data['bb_middle'] + (bb_std * 2)
data['bb_lower'] = data['bb_middle'] - (bb_std * 2)
data['bb_width'] = (data['bb_upper'] - data['bb_lower']) / data['bb_middle']
data['bb_position'] = (data['close'] - data['bb_lower']) / (data['bb_upper'] - data['bb_lower'])
# ATR (平均真實範圍)
high_low = data['high'] - data['low']
high_close = np.abs(data['high'] - data['close'].shift())
low_close = np.abs(data['low'] - data['close'].shift())
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
data['atr'] = true_range.rolling(14).mean()
# 價格動量
data['momentum_5'] = data['close'] / data['close'].shift(5) - 1
data['momentum_10'] = data['close'] / data['close'].shift(10) - 1
data['momentum_20'] = data['close'] / data['close'].shift(20) - 1
return data
def run_backtest(self, strategy, start_date: str = None, end_date: str = None) -> Dict:
"""執行回測"""
print("開始回測...")
# 重置狀態
self._reset_state()
# 獲取數據
symbol = strategy.symbol
data = getattr(self, f'data_{symbol}')
# 時間範圍篩選
if start_date:
data = data[data.index >= start_date]
if end_date:
data = data[data.index <= end_date]
# 執行策略
for current_time, row in data.iterrows():
# 策略決策
signal = strategy.generate_signal(row, self.positions.get(symbol, 0))
# 執行交易
if signal:
self._execute_signal(symbol, signal, row, current_time)
# 記錄權益曲線
portfolio_value = self._calculate_portfolio_value(row['close'], symbol)
self.equity_curve.append({
'timestamp': current_time,
'portfolio_value': portfolio_value,
'cash': self.current_capital,
'position_value': portfolio_value - self.current_capital,
'price': row['close']
})
# 生成績效報告
performance = self._generate_performance_report()
print("回測完成!")
return performance
def _reset_state(self):
"""重置回測狀態"""
self.current_capital = self.initial_capital
self.positions = {}
self.trades = []
self.equity_curve = []
self.daily_stats = []
def _execute_signal(self, symbol: str, signal: Dict, market_data: pd.Series, timestamp):
"""執行交易信號"""
action = signal['action']
quantity = signal.get('quantity', 0)
if action == 'buy':
self._execute_buy(symbol, quantity, market_data, timestamp)
elif action == 'sell':
self._execute_sell(symbol, quantity, market_data, timestamp)
elif action == 'close':
self._close_position(symbol, market_data, timestamp)
def _execute_buy(self, symbol: str, quantity: float, market_data: pd.Series, timestamp):
"""執行買入"""
# 計算執行價格(包含滑價)
base_price = market_data['close']
execution_price = base_price * (1 + self.slippage)
# 計算交易成本
trade_value = quantity * execution_price
commission_cost = trade_value * self.commission
total_cost = trade_value + commission_cost
# 檢查資金是否足夠
if total_cost <= self.current_capital:
# 更新倉位和資金
self.positions[symbol] = self.positions.get(symbol, 0) + quantity
self.current_capital -= total_cost
# 記錄交易
trade_record = {
'timestamp': timestamp,
'symbol': symbol,
'action': 'buy',
'quantity': quantity,
'price': execution_price,
'value': trade_value,
'commission': commission_cost,
'total_cost': total_cost,
'cash_after': self.current_capital,
'position_after': self.positions[symbol]
}
self.trades.append(trade_record)
else:
print(f"資金不足,無法買入 {quantity} {symbol}")
def _execute_sell(self, symbol: str, quantity: float, market_data: pd.Series, timestamp):
"""執行賣出"""
current_position = self.positions.get(symbol, 0)
# 檢查持倉是否足夠
if current_position >= quantity:
# 計算執行價格(包含滑價)
base_price = market_data['close']
execution_price = base_price * (1 - self.slippage)
# 計算交易收入
trade_value = quantity * execution_price
commission_cost = trade_value * self.commission
net_proceeds = trade_value - commission_cost
# 更新倉位和資金
self.positions[symbol] -= quantity
if self.positions[symbol] == 0:
del self.positions[symbol]
self.current_capital += net_proceeds
# 記錄交易
trade_record = {
'timestamp': timestamp,
'symbol': symbol,
'action': 'sell',
'quantity': quantity,
'price': execution_price,
'value': trade_value,
'commission': commission_cost,
'net_proceeds': net_proceeds,
'cash_after': self.current_capital,
'position_after': self.positions.get(symbol, 0)
}
self.trades.append(trade_record)
else:
print(f"持倉不足,無法賣出 {quantity} {symbol}")
def _close_position(self, symbol: str, market_data: pd.Series, timestamp):
"""平倉"""
current_position = self.positions.get(symbol, 0)
if current_position > 0:
self._execute_sell(symbol, current_position, market_data, timestamp)
elif current_position < 0:
self._execute_buy(symbol, abs(current_position), market_data, timestamp)
def _calculate_portfolio_value(self, current_price: float, symbol: str) -> float:
"""計算投資組合總值"""
position_value = self.positions.get(symbol, 0) * current_price
return self.current_capital + position_value
def _generate_performance_report(self) -> Dict:
"""生成績效報告"""
if not self.equity_curve:
return {}
# 轉換為 DataFrame
equity_df = pd.DataFrame(self.equity_curve)
equity_df.set_index('timestamp', inplace=True)
# 計算日收益率
equity_df['daily_returns'] = equity_df['portfolio_value'].pct_change()
# 基本績效指標
total_return = (equity_df['portfolio_value'].iloc[-1] / self.initial_capital) - 1
total_days = (equity_df.index[-1] - equity_df.index[0]).days
annualized_return = (1 + total_return) ** (365 / total_days) - 1
# 風險指標
daily_returns = equity_df['daily_returns'].dropna()
volatility = daily_returns.std() * np.sqrt(365)
# 夏普比率
excess_returns = daily_returns - (self.risk_free_rate / 365)
sharpe_ratio = excess_returns.mean() / excess_returns.std() * np.sqrt(365) if excess_returns.std() > 0 else 0
# 最大回撤
equity_df['peak'] = equity_df['portfolio_value'].expanding().max()
equity_df['drawdown'] = (equity_df['portfolio_value'] - equity_df['peak']) / equity_df['peak']
max_drawdown = equity_df['drawdown'].min()
# 最大回撤持續時間
in_drawdown = equity_df['drawdown'] < 0
drawdown_periods = []
current_period = 0
for is_dd in in_drawdown:
if is_dd:
current_period += 1
else:
if current_period > 0:
drawdown_periods.append(current_period)
current_period = 0
max_drawdown_duration = max(drawdown_periods) if drawdown_periods else 0
# 交易統計
trade_analysis = self._analyze_trades()
# Calmar 比率
calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0
# Sortino 比率
downside_returns = daily_returns[daily_returns < 0]
downside_deviation = downside_returns.std() * np.sqrt(365)
sortino_ratio = (annualized_return - self.risk_free_rate) / downside_deviation if downside_deviation > 0 else 0
# VaR (Value at Risk)
var_95 = np.percentile(daily_returns, 5)
var_99 = np.percentile(daily_returns, 1)
performance = {
'basic_metrics': {
'total_return': total_return,
'annualized_return': annualized_return,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'calmar_ratio': calmar_ratio,
'max_drawdown': max_drawdown,
'max_drawdown_duration_days': max_drawdown_duration,
'var_95': var_95,
'var_99': var_99
},
'trade_analysis': trade_analysis,
'equity_curve': equity_df,
'final_portfolio_value': equity_df['portfolio_value'].iloc[-1],
'total_trades': len(self.trades)
}
return performance
def _analyze_trades(self) -> Dict:
"""分析交易記錄"""
if not self.trades:
return {}
trades_df = pd.DataFrame(self.trades)
# 配對買賣交易計算盈虧
buy_trades = trades_df[trades_df['action'] == 'buy'].copy()
sell_trades = trades_df[trades_df['action'] == 'sell'].copy()
trade_pairs = []
# 簡化的 FIFO 配對邏輯
buy_queue = buy_trades.to_dict('records')
for _, sell in sell_trades.iterrows():
remaining_sell_qty = sell['quantity']
while remaining_sell_qty > 0 and buy_queue:
buy = buy_queue[0]
if buy['quantity'] <= remaining_sell_qty:
# 完全配對
pnl = (sell['price'] - buy['price']) * buy['quantity']
pnl -= (buy['commission'] + sell['commission'] * buy['quantity'] / sell['quantity'])
trade_pairs.append({
'buy_date': buy['timestamp'],
'sell_date': sell['timestamp'],
'quantity': buy['quantity'],
'buy_price': buy['price'],
'sell_price': sell['price'],
'pnl': pnl,
'return': pnl / (buy['price'] * buy['quantity']),
'holding_days': (sell['timestamp'] - buy['timestamp']).days
})
remaining_sell_qty -= buy['quantity']
buy_queue.pop(0)
else:
# 部分配對
pnl = (sell['price'] - buy['price']) * remaining_sell_qty
pnl -= sell['commission'] + buy['commission'] * remaining_sell_qty / buy['quantity']
trade_pairs.append({
'buy_date': buy['timestamp'],
'sell_date': sell['timestamp'],
'quantity': remaining_sell_qty,
'buy_price': buy['price'],
'sell_price': sell['price'],
'pnl': pnl,
'return': pnl / (buy['price'] * remaining_sell_qty),
'holding_days': (sell['timestamp'] - buy['timestamp']).days
})
buy_queue[0]['quantity'] -= remaining_sell_qty
remaining_sell_qty = 0
if not trade_pairs:
return {}
pairs_df = pd.DataFrame(trade_pairs)
# 統計分析
winning_trades = pairs_df[pairs_df['pnl'] > 0]
losing_trades = pairs_df[pairs_df['pnl'] < 0]
analysis = {
'total_paired_trades': len(pairs_df),
'winning_trades': len(winning_trades),
'losing_trades': len(losing_trades),
'win_rate': len(winning_trades) / len(pairs_df) if len(pairs_df) > 0 else 0,
'total_pnl': pairs_df['pnl'].sum(),
'avg_pnl': pairs_df['pnl'].mean(),
'avg_return': pairs_df['return'].mean(),
'avg_winning_pnl': winning_trades['pnl'].mean() if len(winning_trades) > 0 else 0,
'avg_losing_pnl': losing_trades['pnl'].mean() if len(losing_trades) > 0 else 0,
'largest_win': pairs_df['pnl'].max(),
'largest_loss': pairs_df['pnl'].min(),
'avg_holding_days': pairs_df['holding_days'].mean(),
'max_holding_days': pairs_df['holding_days'].max(),
'profit_factor': abs(winning_trades['pnl'].sum() / losing_trades['pnl'].sum()) if len(losing_trades) > 0 and losing_trades['pnl'].sum() != 0 else float('inf')
}
return analysis
class TradingStrategy:
"""交易策略基類"""
def __init__(self, symbol: str, **params):
self.symbol = symbol
self.params = params
def generate_signal(self, market_data: pd.Series, current_position: float) -> Optional[Dict]:
"""生成交易信號 - 子類需要實作"""
raise NotImplementedError("子類必須實作 generate_signal 方法")
class MovingAverageCrossStrategy(TradingStrategy):
"""移動平均交叉策略"""
def __init__(self, symbol: str, fast_period: int = 20, slow_period: int = 50, **params):
super().__init__(symbol, **params)
self.fast_period = fast_period
self.slow_period = slow_period
self.position_size = params.get('position_size', 10000) # 每次交易金額
def generate_signal(self, market_data: pd.Series, current_position: float) -> Optional[Dict]:
"""生成移動平均交叉信號"""
# 檢查數據是否足夠
if pd.isna(market_data[f'sma_{self.fast_period}']) or pd.isna(market_data[f'sma_{self.slow_period}']):
return None
fast_ma = market_data[f'sma_{self.fast_period}']
slow_ma = market_data[f'sma_{self.slow_period}']
current_price = market_data['close']
# 黃金交叉:快線突破慢線
if fast_ma > slow_ma and current_position == 0:
quantity = self.position_size / current_price
return {
'action': 'buy',
'quantity': quantity,
'reason': f'黃金交叉: MA{self.fast_period}({fast_ma:.2f}) > MA{self.slow_period}({slow_ma:.2f})'
}
# 死亡交叉:快線跌破慢線
elif fast_ma < slow_ma and current_position > 0:
return {
'action': 'close',
'reason': f'死亡交叉: MA{self.fast_period}({fast_ma:.2f}) < MA{self.slow_period}({slow_ma:.2f})'
}
return None
class RSIMeanReversionStrategy(TradingStrategy):
"""RSI 均值回歸策略"""
def __init__(self, symbol: str, rsi_period: int = 14, oversold: float = 30,
overbought: float = 70, **params):
super().__init__(symbol, **params)
self.rsi_period = rsi_period
self.oversold = oversold
self.overbought = overbought
self.position_size = params.get('position_size', 10000)
def generate_signal(self, market_data: pd.Series, current_position: float) -> Optional[Dict]:
"""生成 RSI 均值回歸信號"""
if pd.isna(market_data['rsi']):
return None
rsi = market_data['rsi']
current_price = market_data['close']
# RSI 超賣,買入信號
if rsi < self.oversold and current_position == 0:
quantity = self.position_size / current_price
return {
'action': 'buy',
'quantity': quantity,
'reason': f'RSI 超賣: {rsi:.2f} < {self.oversold}'
}
# RSI 超買,賣出信號
elif rsi > self.overbought and current_position > 0:
return {
'action': 'close',
'reason': f'RSI 超買: {rsi:.2f} > {self.overbought}'
}
return None
# 回測執行範例
def run_comprehensive_backtest():
"""執行完整回測範例"""
# 創建回測引擎
engine = BacktestEngine(initial_capital=100000, commission=0.001)
# 生成模擬數據
dates = pd.date_range('2023-01-01', '2024-01-01', freq='1D')
np.random.seed(42)
# 模擬 BTC 價格走勢
returns = np.random.normal(0.001, 0.025, len(dates)) # 日收益率
prices = [50000] # 起始價格
for ret in returns[1:]:
prices.append(prices[-1] * (1 + ret))
# 生成 OHLCV 數據
btc_data = pd.DataFrame(index=dates)
btc_data['close'] = prices
btc_data['open'] = btc_data['close'].shift(1).fillna(btc_data['close'].iloc[0])
btc_data['high'] = btc_data[['open', 'close']].max(axis=1) * (1 + np.random.uniform(0, 0.02, len(dates)))
btc_data['low'] = btc_data[['open', 'close']].min(axis=1) * (1 - np.random.uniform(0, 0.02, len(dates)))
btc_data['volume'] = np.random.uniform(1000, 10000, len(dates))
# 載入數據
engine.add_data('BTCUSDT', btc_data)
# 測試不同策略
strategies = [
MovingAverageCrossStrategy('BTCUSDT', fast_period=10, slow_period=30),
MovingAverageCrossStrategy('BTCUSDT', fast_period=20, slow_period=50),
RSIMeanReversionStrategy('BTCUSDT', oversold=30, overbought=70)
]
results = {}
for i, strategy in enumerate(strategies, 1):
print(f"\n{'='*50}")
print(f"測試策略 {i}: {strategy.__class__.__name__}")
print(f"參數: {strategy.params}")
print(f"{'='*50}")
performance = engine.run_backtest(strategy)
results[f"Strategy_{i}"] = performance
# 顯示關鍵指標
basic_metrics = performance['basic_metrics']
trade_analysis = performance['trade_analysis']
print(f"\n績效摘要:")
print(f"總收益率: {basic_metrics['total_return']:.2%}")
print(f"年化收益率: {basic_metrics['annualized_return']:.2%}")
print(f"年化波動率: {basic_metrics['volatility']:.2%}")
print(f"夏普比率: {basic_metrics['sharpe_ratio']:.2f}")
print(f"最大回撤: {basic_metrics['max_drawdown']:.2%}")
if trade_analysis:
print(f"\n交易統計:")
print(f"總交易次數: {trade_analysis['total_paired_trades']}")
print(f"勝率: {trade_analysis['win_rate']:.2%}")
print(f"平均收益: {trade_analysis['avg_pnl']:.2f}")
print(f"盈虧比: {trade_analysis['profit_factor']:.2f}")
return results
# 執行回測
backtest_results = run_comprehensive_backtest()
class AdvancedPerformanceAnalyzer:
"""進階績效分析器"""
def __init__(self, backtest_results: Dict):
self.results = backtest_results
def analyze_risk_adjusted_returns(self) -> Dict:
"""風險調整收益分析"""
equity_curve = self.results['equity_curve']
daily_returns = equity_curve['daily_returns'].dropna()
# 計算各種風險調整收益指標
analysis = {}
# Omega 比率
threshold = 0 # 閾值收益率
gains = daily_returns[daily_returns > threshold]
losses = daily_returns[daily_returns <= threshold]
if len(losses) > 0:
omega_ratio = gains.sum() / abs(losses.sum())
else:
omega_ratio = float('inf')
# 資訊比率 (假設基準為0)
tracking_error = daily_returns.std()
information_ratio = daily_returns.mean() / tracking_error if tracking_error > 0 else 0
# Treynor 比率 (假設 Beta = 1)
beta = 1.0 # 簡化假設
treynor_ratio = (daily_returns.mean() * 252 - 0.02) / beta
analysis.update({
'omega_ratio': omega_ratio,
'information_ratio': information_ratio * np.sqrt(252),
'treynor_ratio': treynor_ratio
})
return analysis
def analyze_drawdown_characteristics(self) -> Dict:
"""回撤特徵分析"""
equity_curve = self.results['equity_curve']
# 計算所有回撤期間
equity_curve['peak'] = equity_curve['portfolio_value'].expanding().max()
equity_curve['drawdown'] = (equity_curve['portfolio_value'] - equity_curve['peak']) / equity_curve['peak']
# 識別回撤期間
in_drawdown = equity_curve['drawdown'] < -0.001 # 0.1% 以上視為回撤
drawdown_periods = []
current_period = {'start': None, 'end': None, 'max_dd': 0, 'duration': 0}
for idx, (timestamp, is_dd) in enumerate(zip(equity_curve.index, in_drawdown)):
if is_dd and current_period['start'] is None:
# 開始新的回撤期
current_period['start'] = timestamp
current_period['max_dd'] = equity_curve.loc[timestamp, 'drawdown']
elif is_dd and current_period['start'] is not None:
# 持續回撤
current_period['max_dd'] = min(current_period['max_dd'],
equity_curve.loc[timestamp, 'drawdown'])
elif not is_dd and current_period['start'] is not None:
# 回撤結束
current_period['end'] = timestamp
current_period['duration'] = (timestamp - current_period['start']).days
drawdown_periods.append(current_period.copy())
current_period = {'start': None, 'end': None, 'max_dd': 0, 'duration': 0}
if not drawdown_periods:
return {'message': '未發現顯著回撤期間'}
# 統計分析
max_drawdowns = [period['max_dd'] for period in drawdown_periods]
durations = [period['duration'] for period in drawdown_periods]
analysis = {
'total_drawdown_periods': len(drawdown_periods),
'avg_drawdown_depth': np.mean(max_drawdowns),
'avg_drawdown_duration': np.mean(durations),
'max_drawdown_depth': min(max_drawdowns),
'max_drawdown_duration': max(durations),
'recovery_factor': abs(self.results['basic_metrics']['total_return'] /
self.results['basic_metrics']['max_drawdown']),
'drawdown_periods': drawdown_periods
}
return analysis
def analyze_monthly_returns(self) -> pd.DataFrame:
"""月度收益分析"""
equity_curve = self.results['equity_curve']
# 計算月度收益
monthly_values = equity_curve['portfolio_value'].resample('M').last()
monthly_returns = monthly_values.pct_change().dropna()
# 創建月度收益矩陣
monthly_df = monthly_returns.to_frame('returns')
monthly_df['year'] = monthly_df.index.year
monthly_df['month'] = monthly_df.index.month
# 透視表
pivot_table = monthly_df.pivot_table(
values='returns',
index='year',
columns='month',
fill_value=0
)
# 添加年度總計
pivot_table['Annual'] = (1 + pivot_table).prod(axis=1) - 1
return pivot_table
def generate_comprehensive_report(self) -> Dict:
"""生成綜合報告"""
report = {
'basic_performance': self.results['basic_metrics'],
'trade_analysis': self.results['trade_analysis'],
'risk_adjusted_returns': self.analyze_risk_adjusted_returns(),
'drawdown_analysis': self.analyze_drawdown_characteristics(),
'monthly_returns': self.analyze_monthly_returns()
}
return report
def create_performance_visualization(backtest_results: Dict) -> None:
"""創建績效可視化圖表"""
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('回測績效分析報告', fontsize=16, fontweight='bold')
equity_curve = backtest_results['equity_curve']
# 1. 權益曲線
axes[0, 0].plot(equity_curve.index, equity_curve['portfolio_value'],
label='投資組合價值', linewidth=2)
axes[0, 0].plot(equity_curve.index, [backtest_results['basic_metrics']['total_return'] *
backtest_results['equity_curve']['portfolio_value'].iloc[0]] * len(equity_curve),
'--', label='買入持有', alpha=0.7)
axes[0, 0].set_title('權益曲線')
axes[0, 0].set_ylabel('投資組合價值')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# 2. 回撤圖
equity_curve['peak'] = equity_curve['portfolio_value'].expanding().max()
equity_curve['drawdown'] = (equity_curve['portfolio_value'] - equity_curve['peak']) / equity_curve['peak']
axes[0, 1].fill_between(equity_curve.index, equity_curve['drawdown'], 0,
alpha=0.3, color='red', label='回撤')
axes[0, 1].plot(equity_curve.index, equity_curve['drawdown'], color='red', linewidth=1)
axes[0, 1].set_title('回撤分析')
axes[0, 1].set_ylabel('回撤比例')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# 3. 日收益率分布
daily_returns = equity_curve['daily_returns'].dropna()
axes[1, 0].hist(daily_returns, bins=50, alpha=0.7, density=True)
axes[1, 0].axvline(daily_returns.mean(), color='red', linestyle='--',
label=f'平均: {daily_returns.mean():.4f}')
axes[1, 0].set_title('日收益率分布')
axes[1, 0].set_xlabel('日收益率')
axes[1, 0].set_ylabel('密度')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 4. 滾動績效指標
rolling_window = 60 # 60天滾動窗口
rolling_returns = daily_returns.rolling(rolling_window).mean() * 252
rolling_volatility = daily_returns.rolling(rolling_window).std() * np.sqrt(252)
rolling_sharpe = rolling_returns / rolling_volatility
ax4_twin = axes[1, 1].twinx()
line1 = axes[1, 1].plot(rolling_sharpe.index, rolling_sharpe,
color='blue', label='滾動夏普比率')
line2 = ax4_twin.plot(rolling_volatility.index, rolling_volatility,
color='orange', label='滾動波動率', alpha=0.7)
axes[1, 1].set_title('滾動績效指標 (60天)')
axes[1, 1].set_ylabel('夏普比率', color='blue')
ax4_twin.set_ylabel('波動率', color='orange')
axes[1, 1].grid(True, alpha=0.3)
# 合併圖例
lines1, labels1 = axes[1, 1].get_legend_handles_labels()
lines2, labels2 = ax4_twin.get_legend_handles_labels()
axes[1, 1].legend(lines1 + lines2, labels1 + labels2, loc='upper left')
plt.tight_layout()
plt.show()
# 執行進階分析
def run_advanced_analysis():
"""執行進階分析"""
# 使用之前的回測結果
if 'backtest_results' in globals():
first_strategy_result = list(backtest_results.values())[0]
# 進階分析
analyzer = AdvancedPerformanceAnalyzer(first_strategy_result)
comprehensive_report = analyzer.generate_comprehensive_report()
print("\n" + "="*60)
print("進階績效分析報告")
print("="*60)
# 風險調整收益
risk_adj = comprehensive_report['risk_adjusted_returns']
print(f"\n風險調整收益指標:")
print(f"Omega 比率: {risk_adj['omega_ratio']:.2f}")
print(f"資訊比率: {risk_adj['information_ratio']:.2f}")
print(f"Treynor 比率: {risk_adj['treynor_ratio']:.2f}")
# 回撤分析
dd_analysis = comprehensive_report['drawdown_analysis']
if 'message' not in dd_analysis:
print(f"\n回撤特徵分析:")
print(f"回撤期間數: {dd_analysis['total_drawdown_periods']}")
print(f"平均回撤深度: {dd_analysis['avg_drawdown_depth']:.2%}")
print(f"平均回撤持續時間: {dd_analysis['avg_drawdown_duration']:.0f} 天")
print(f"恢復因子: {dd_analysis['recovery_factor']:.2f}")
# 月度收益
monthly_returns = comprehensive_report['monthly_returns']
print(f"\n月度收益統計:")
print(f"最佳月份收益: {monthly_returns.max().max():.2%}")
print(f"最差月份收益: {monthly_returns.min().min():.2%}")
# 創建可視化圖表
create_performance_visualization(first_strategy_result)
return comprehensive_report
# 運行進階分析
advanced_analysis_results = run_advanced_analysis()
今天我們建立了一個專業級的回測系統,就像為農場建立了一套完整的歷史數據分析系統。重要概念包括:
回測系統核心:
關鍵功能:
進階分析:
重要注意事項:
記住爸爸研究歷史天氣的話:「過去的經驗很重要,但不能完全依賴,還要結合當前的實際情況」。回測是策略開發的重要工具,但實盤交易時還需要考慮更多現實因素!
明天我們將完成系列的最後幾篇實作文章,學習具體的 API 整合和系統部署。
下一篇:Day 27 - Binance API key for Testnet