相比前幾版,這次我修改了 Walk-forward 訓練(新增滑動視窗式訓練),並改寫了策略回測,使其包含 ATR 止損、固定獲利、手續費、最終平倉,以及勝率與回撤計算。
# ============================================
# 📊 AI Crypto Strategy Backtest System (Full)
# ============================================
import os
import ccxt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Target number of OHLCV candles to download; used as the default
# `limit_total` of fetch_crypto_data and passed explicitly by the main script.
TOTAL_LIMIT = 1500
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# -------------------------
# Step 1: 抓資料
# -------------------------
def fetch_crypto_data(symbol="BTC/USDT", timeframe="1h", limit_total=TOTAL_LIMIT, force_reload=True):
    """Return the most recent OHLCV candles for ``symbol`` as a DataFrame.

    The data is read from a CSV cache when one exists and ``force_reload`` is
    false; otherwise it is downloaded from Binance through ccxt and the cache
    file is (re)written.

    Args:
        symbol: Market symbol in ccxt notation, e.g. ``"BTC/USDT"``.
        timeframe: Candle timeframe string understood by ccxt, e.g. ``"1h"``.
        limit_total: Number of most-recent candles to download.
        force_reload: When true, ignore any cached CSV and download again.

    Returns:
        DataFrame with columns ``timestamp, open, high, low, close, volume``,
        de-duplicated and sorted by ``timestamp`` (downloaded path), or the
        cached CSV contents with ``timestamp`` parsed to datetimes.
    """
    # NOTE(review): the cache name does not include the timeframe, so a cache
    # written for another timeframe would be reused as-is — confirm intended.
    filename = f"{symbol.replace('/', '_')}_latest.csv"
    if os.path.exists(filename) and not force_reload:
        print(f"✅ 讀取快取檔案 {filename}")
        df = pd.read_csv(filename)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        print(f"📅 Data range: {df['timestamp'].min()} → {df['timestamp'].max()}")
        return df

    print(f"📡 從 Binance 抓取最新 {symbol} {timeframe} 資料中...")
    exchange = ccxt.binance({"enableRateLimit": True})
    exchange.load_markets()

    batch_limit = 1000  # largest page requested per call
    # Start far enough in the past to cover `limit_total` candles and page
    # forward from there.  Starting with since=None returns only the newest
    # batch, and paging forward from its last candle then yields nothing, so
    # more than one batch could never be collected.
    timeframe_ms = exchange.parse_timeframe(timeframe) * 1000
    since = exchange.milliseconds() - limit_total * timeframe_ms

    all_data = []
    fetched = 0
    while fetched < limit_total:
        request_size = min(batch_limit, limit_total - fetched)
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=request_size)
        if not ohlcv:
            break
        all_data += ohlcv
        fetched += len(ohlcv)
        # Continue right after the last candle received.
        since = ohlcv[-1][0] + 1
        print(f" → 已抓取 {fetched} 筆")
        # A short page means the exchange has no newer candles to offer.
        if len(ohlcv) < request_size:
            break

    df = pd.DataFrame(all_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp').reset_index(drop=True)
    df.to_csv(filename, index=False)
    print(f"✅ 資料已儲存至 {filename}")
    print(f"📅 Data range: {df['timestamp'].min()} → {df['timestamp'].max()}")
    return df
# -------------------------
# Step 2: 技術指標
# -------------------------
def add_indicators(df):
    """Return a copy of ``df`` extended with technical-indicator columns.

    The input frame is left untouched.  Rows that contain NaN in any column
    (the warm-up period of the rolling windows) are dropped from the result
    and the index is reset.

    Args:
        df: DataFrame with at least ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        New DataFrame with the original columns plus ``MA20, MA50, RSI, MACD,
        Signal, BB_Mid, BB_Upper, BB_Lower, Vol_MA20, Vol_MA50, ROC, H-L,
        H-PC, L-PC, TR, ATR, Momentum, Price_vs_MA20, Price_vs_MA50``.
    """
    # Work on a copy so the caller's frame is not polluted with new columns.
    df = df.copy()
    close = df['close']
    prev_close = close.shift(1)

    # Simple moving averages.
    df['MA20'] = close.rolling(window=20).mean()
    df['MA50'] = close.rolling(window=50).mean()

    # RSI over 14 bars, using simple rolling means of gains and losses.
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    # MACD (12/26 EMA difference) and its 9-bar signal line.
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    df['MACD'] = ema12 - ema26
    df['Signal'] = df['MACD'].ewm(span=9, adjust=False).mean()

    # Bollinger bands: MA20 +/- two rolling standard deviations (computed once).
    band_width = 2 * close.rolling(window=20).std()
    df['BB_Mid'] = df['MA20']
    df['BB_Upper'] = df['MA20'] + band_width
    df['BB_Lower'] = df['MA20'] - band_width

    # Volume averages and 10-bar rate of change (percent).
    df['Vol_MA20'] = df['volume'].rolling(window=20).mean()
    df['Vol_MA50'] = df['volume'].rolling(window=50).mean()
    df['ROC'] = close.pct_change(periods=10) * 100

    # True range components and their 14-bar average (ATR).
    df['H-L'] = df['high'] - df['low']
    df['H-PC'] = (df['high'] - prev_close).abs()
    df['L-PC'] = (df['low'] - prev_close).abs()
    df['TR'] = df[['H-L', 'H-PC', 'L-PC']].max(axis=1)
    df['ATR'] = df['TR'].rolling(window=14).mean()

    # Momentum and relative distance of price from the moving averages.
    df['Momentum'] = close - close.shift(10)
    df['Price_vs_MA20'] = close / df['MA20'] - 1
    df['Price_vs_MA50'] = close / df['MA50'] - 1

    return df.dropna().reset_index(drop=True)
# -------------------------
# Step 3: 準備 ML 數據集
# -------------------------
def prepare_ml_data(df, return_threshold=0.003):
    """Build the feature matrix and binary target for next-bar prediction.

    A row is labelled ``Target = 1`` when the next bar's close is more than
    ``return_threshold`` (as a fraction) above the current close, else 0.
    Rows containing NaN — including the last row, which has no next close —
    are dropped.  The input frame is not modified.

    Args:
        df: DataFrame holding ``close`` and all indicator feature columns.
        return_threshold: Minimum fractional next-bar return for a positive label.

    Returns:
        Tuple ``(X, y, labelled_df)``: the feature DataFrame, the ``Target``
        Series, and the full cleaned DataFrame they were taken from.
    """
    feature_columns = [
        'MA20', 'MA50', 'RSI', 'MACD', 'Signal',
        'BB_Mid', 'BB_Upper', 'BB_Lower',
        'Vol_MA20', 'Vol_MA50',
        'ROC', 'ATR', 'Momentum',
        'Price_vs_MA20', 'Price_vs_MA50'
    ]

    next_close = df['close'].shift(-1)
    # assign() returns a new frame, so the caller's data stays untouched.
    labelled = df.assign(
        Future_Close=next_close,
        Return=(next_close / df['close']) - 1,
    )
    labelled['Target'] = (labelled['Return'] > return_threshold).astype(int)
    labelled = labelled.dropna().reset_index(drop=True)

    return labelled[feature_columns], labelled['Target'], labelled
# -------------------------
# Step 4: 滑動視窗 Walk-forward 訓練
# -------------------------
def walk_forward_train(X, y, df, train_window=2000, test_window=400, step=400, model_cls=RandomForestClassifier, model_kwargs=None):
    """Run sliding-window walk-forward training and evaluation.

    For each fold a fresh model is fitted on ``train_window`` consecutive rows
    and scored on the ``test_window`` rows that immediately follow; the window
    then advances by ``step`` rows.  The predictions of the last fold are
    plotted with ``plot_predictions``.

    Args:
        X: Feature DataFrame, row-aligned with ``y`` and ``df``.
        y: Target Series.
        df: Full DataFrame the features came from (used for plotting).
        train_window: Number of rows used to fit each model.
        test_window: Number of rows used to evaluate each model.
        step: Number of rows the window advances per fold (usually equal to
            ``test_window``).
        model_cls: Estimator class exposing ``fit`` and ``predict``.
        model_kwargs: Keyword arguments for ``model_cls``; defaults to
            ``{"n_estimators": 200, "random_state": 42}``.

    Returns:
        ``(last_model, last_test_index, last_y_true, last_y_pred, folds)`` for
        the final fold plus the list of per-fold result dicts, or five
        ``None`` values when the data is too short for a single fold.

    Raises:
        ValueError: If ``step`` is not positive (the window would never move).
    """
    if step <= 0:
        raise ValueError("step must be a positive integer")
    if model_kwargs is None:
        model_kwargs = {"n_estimators": 200, "random_state": 42}

    n = len(X)
    folds = []
    scores = []

    # Every start position that leaves room for a full train + test window.
    for start in range(0, n - train_window - test_window + 1, step):
        split = start + train_window
        train_idx = list(range(start, split))
        test_idx = list(range(split, split + test_window))
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        model = model_cls(**model_kwargs)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)
        scores.append(acc)

        folds.append({
            "train_index": train_idx,
            "test_index": test_idx,
            "model": model,
            "y_true": y_test,
            "y_pred": y_pred,
            "acc": acc
        })

    # Check for "no folds" before summarising: averaging an empty score list
    # would only print a meaningless nan (and emit a NumPy warning).
    if not folds:
        print("❌ 錯誤:Walk-forward 沒有執行任何 Fold。請檢查資料量是否足夠。")
        return None, None, None, None, None

    print(f"\nWalk-forward 訓練完成。共 {len(scores)} 個 Fold。")
    print(f"📊 平均準確率: {np.mean(scores):.4f}")

    # The most recent fold is the one reported and plotted.
    last_fold = folds[-1]
    last_model = last_fold["model"]
    last_test_index = last_fold["test_index"]
    last_y_true = last_fold["y_true"]
    last_y_pred = last_fold["y_pred"]

    plot_predictions(df, last_y_true, last_y_pred, last_test_index, f"Predictions vs Actual (Latest {test_window} bars)")
    return last_model, last_test_index, last_y_true, last_y_pred, folds
# -------------------------
# Step 4B: 輔助繪圖函式 (與回測共用)
# -------------------------
def plot_predictions(df, y_true, y_pred, test_index, title="Predictions vs Actual"):
    """Plot the close price of a test slice with the predicted classes marked.

    Bars predicted as class 1 are drawn as green up-triangles and bars
    predicted as class 0 as red down-triangles, on top of the close-price line.

    Args:
        df: DataFrame with ``timestamp`` and ``close`` columns.
        y_true: True labels for the slice (stored in the working copy only).
        y_pred: Predicted labels for the slice.
        test_index: Positional row indices of the slice within ``df``.
        title: Figure title.
    """
    window = df.iloc[test_index].copy().reset_index(drop=True)
    window["True"] = pd.Series(y_true).reset_index(drop=True)
    window["Pred"] = pd.Series(y_pred).reset_index(drop=True)

    times = window["timestamp"]
    closes = window["close"]

    plt.figure(figsize=(12,6))
    plt.plot(times, closes, label="Close Price", color="black", alpha=0.6)

    # One scatter layer per predicted class, drawn in the same order as before.
    marker_styles = (
        (1, "Pred Up", "green", "^"),
        (0, "Pred Down", "red", "v"),
    )
    for predicted_class, label, color, marker in marker_styles:
        selected = window["Pred"] == predicted_class
        plt.scatter(times[selected], closes[selected], label=label, color=color, marker=marker, s=50)

    plt.title(title)
    plt.xlabel("Time")
    plt.ylabel("Price")
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
# -------------------------
# Step 5: 進階回測 (使用 test2.py 的 robust 邏輯)
# -------------------------
def backtest_strategy(df, y_true, y_pred, test_index,
                      initial_capital=10000,
                      position_size_ratio=0.1,
                      fee_rate=0.001,
                      atr_multiplier=1.5,
                      take_profit_ratio=0.02,
                      debug=False):
    """Simulate a long/short strategy driven by model predictions.

    At most one position is open at a time.  A long is opened when the
    previous bar's prediction is 1 and RSI > 55; a short when it is 0 and
    RSI < 45.  A position is closed when its fractional move reaches
    ``take_profit_ratio`` or falls to the ATR-based stop
    (``-atr_multiplier * ATR / entry_price``).  Any position still open at the
    end is closed at the last close.  Fees are charged on the traded notional
    at both entry and exit.  The equity curve is plotted and summary
    statistics are printed.

    Args:
        df: DataFrame with ``close``, ``RSI`` and ``ATR`` columns.
        y_true: True labels for the test slice.
        y_pred: Predicted labels for the test slice.
        test_index: Positional row indices of the test slice within ``df``.
        initial_capital: Starting account balance.
        position_size_ratio: Fraction of the balance committed per trade.
        fee_rate: Fee charged on traded notional, per side.
        atr_multiplier: Stop-loss distance expressed in ATR multiples.
        take_profit_ratio: Fractional gain at which a position is closed.
        debug: When true, print every entry and exit.

    Returns:
        ``(df_test, trades)``: the test slice with ``True``, ``Pred`` and
        ``Equity`` columns added, and the list of per-trade fractional
        returns (before fees).
    """
    df_test = df.iloc[test_index].copy().reset_index(drop=True)
    df_test["True"] = pd.Series(y_true).reset_index(drop=True)
    df_test["Pred"] = pd.Series(y_pred).reset_index(drop=True)

    # `balance` is total account equity excluding unrealised PnL: committed
    # capital is never subtracted from it, so only PnL and fees may move it.
    balance = initial_capital
    equity_curve = [balance]
    trades = []
    position, entry_price, entry_capital, entry_units = None, 0, 0, 0

    for i in range(1, len(df_test)):
        price_now = df_test["close"].iloc[i]
        rsi = np.nan_to_num(df_test["RSI"].iloc[i], nan=50)
        # NOTE(review): the signal is the prediction made on the previous bar,
        # acted on at this bar's close — confirm this alignment is intended.
        pred = df_test["Pred"].iloc[i - 1]
        atr = np.nan_to_num(df_test["ATR"].iloc[i], nan=0)

        if position is None:
            # Entry: prediction and RSI must agree on direction.
            if pred == 1 and rsi > 55:
                position, entry_tag = "long", "[BUY]"
            elif pred == 0 and rsi < 45:
                position, entry_tag = "short", "[SELL]"

            if position is not None:
                entry_price = price_now
                entry_capital = balance * position_size_ratio
                entry_units = entry_capital / entry_price
                balance -= entry_capital * fee_rate  # entry fee
                if debug:
                    print(f"{entry_tag} @ {price_now:.2f}")
        else:
            # Exit: the move is measured in the position's favourable direction.
            direction = 1 if position == "long" else -1
            change = direction * (price_now - entry_price) / entry_price
            stop_loss = -atr_multiplier * atr / entry_price
            if change <= stop_loss or change >= take_profit_ratio:
                pnl = entry_capital * change
                # Only PnL and the exit fee touch the balance.  Adding
                # entry_capital back (it was never deducted) would credit the
                # whole position size as profit on every trade.  The fee is
                # charged on the notional actually traded at exit.
                balance += pnl - entry_units * price_now * fee_rate
                trades.append(pnl / entry_capital)
                if debug:
                    print(f"[EXIT {position.upper()}] @ {price_now:.2f}, PnL={pnl/entry_capital:.2%}")
                position, entry_capital, entry_units = None, 0, 0

        # Mark-to-market equity: realised balance plus unrealised PnL.
        current_equity = balance
        if position == "long":
            current_equity += entry_capital * ((price_now - entry_price) / entry_price)
        elif position == "short":
            current_equity += entry_capital * ((entry_price - price_now) / entry_price)
        equity_curve.append(current_equity)

    # Force-close whatever is still open at the last available close.
    if position is not None:
        final_price = df_test["close"].iloc[-1]
        if position == "long":
            pnl = entry_capital * ((final_price - entry_price) / entry_price)
        else:
            pnl = entry_capital * ((entry_price - final_price) / entry_price)
        balance += pnl - entry_units * final_price * fee_rate
        trades.append(pnl / entry_capital)
        # Make the curve end on the realised balance, exit fee included.
        equity_curve[-1] = balance
        if debug:
            print(f"[FORCED EXIT] {position.upper()} @ {final_price:.2f}, Final PnL={pnl/entry_capital:.2%}")

    # The curve holds one point per bar; the slice only matters for an empty
    # test set, where the initial point has no row to attach to.
    df_test["Equity"] = equity_curve[:len(df_test)]
    total_return = (balance / initial_capital - 1) * 100
    running_peak = df_test["Equity"].cummax()
    max_drawdown = ((running_peak - df_test["Equity"]) / running_peak).max() * 100
    win_rate = (sum(1 for t in trades if t > 0) / len(trades)) * 100 if trades else 0

    plt.figure(figsize=(12, 6))
    plt.plot(df.iloc[test_index].index, df_test["Equity"], label="Equity Curve", color="blue")
    plt.axhline(initial_capital, linestyle="--", color="gray", alpha=0.7)
    plt.title("Backtest Equity Curve (v2 Improved)")
    plt.xlabel("Time")
    plt.ylabel("Capital (USDT)")
    plt.legend()
    plt.tight_layout()
    plt.show()

    print(f"💰 最終資金: {balance:.2f} USDT")
    print(f"📈 總報酬率: {total_return:.2f}%")
    print(f"📉 最大回撤: {max_drawdown:.2f}%")
    print(f"✅ 勝率: {win_rate:.2f}%")
    print(f"📊 交易次數: {len(trades)}")
    return df_test, trades
# -------------------------
# Step 6: 主程式執行流程
# -------------------------
if __name__ == "__main__":
    # Walk-forward window sizes, in bars: fit on 500, evaluate on the next
    # 100, then slide the window forward by 100.
    TRAIN_WINDOW = 500
    TEST_WINDOW = 100
    STEP = 100
    # A bar is labelled Target=1 only when the next close is more than 0.3% higher.
    RETURN_THRESHOLD = 0.003

    print(f"===== 抓取與處理資料 (總筆數: {TOTAL_LIMIT}) =====")
    # force_reload=True bypasses the CSV cache so fresh data is downloaded.
    df_raw = fetch_crypto_data(
        "BTC/USDT",
        timeframe="1h",
        limit_total=TOTAL_LIMIT,
        force_reload=True,
    )
    df_ind = add_indicators(df_raw)
    X, y, df = prepare_ml_data(df_ind, return_threshold=RETURN_THRESHOLD)

    print("\n===== 開始 Sliding-Window Walk-forward 訓練 =====")
    model, last_test_index, y_true, y_pred, folds = walk_forward_train(
        X, y, df, train_window=TRAIN_WINDOW, test_window=TEST_WINDOW, step=STEP
    )

    # walk_forward_train returns None values when no fold could be run, in
    # which case there is nothing to backtest.
    if model is None:
        print("\n⚠️ 因數據量不足,無法執行回測。")
    else:
        print("\n===== 回測最後一個 Walk-forward 區段的績效 =====")
        df_test, trades = backtest_strategy(df, y_true.astype(int), y_pred, last_test_index)
        print(f"\n✅ 系統已使用 {len(folds)} 個 Fold 訓練並完成回測。")