iT邦幫忙

0

Day20繼續改進

d20
  • 分享至 

  • xImage
  •  

相比前幾版,這次主要有兩項改動:一是把訓練流程改成滑動視窗式的 Walk-forward 訓練;二是改寫策略回測,納入 ATR 止損、固定比例止盈、手續費、最終平倉,以及勝率與最大回撤的計算。

# ============================================
# 📊 AI Crypto Strategy Backtest System (Full)
# ============================================

import os
import ccxt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
TOTAL_LIMIT = 1500
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# -------------------------
# Step 1: 抓資料
# -------------------------
def fetch_crypto_data(symbol="BTC/USDT", timeframe="1h", limit_total=TOTAL_LIMIT, force_reload=True):
    """Return the most recent OHLCV candles for *symbol* as a DataFrame.

    The result is cached in ``<SYMBOL>_latest.csv`` in the working directory
    (``/`` in the symbol replaced by ``_``). When that file exists and
    ``force_reload`` is false the cache is returned without any network call.

    Args:
        symbol: Market symbol passed to ccxt, e.g. ``"BTC/USDT"``.
        timeframe: ccxt timeframe string, e.g. ``"1h"``.
        limit_total: Number of most-recent candles wanted.
        force_reload: When true, ignore the cache and download again.

    Returns:
        DataFrame with columns ``timestamp, open, high, low, close, volume``,
        de-duplicated and sorted by ``timestamp``, holding at most
        ``limit_total`` rows when freshly downloaded.
    """
    filename = f"{symbol.replace('/', '_')}_latest.csv"

    if os.path.exists(filename) and not force_reload:
        print(f"✅ 讀取快取檔案 {filename}")
        df = pd.read_csv(filename)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        print(f"📅 Data range: {df['timestamp'].min()} → {df['timestamp'].max()}")
        return df

    print(f"📡 從 Binance 抓取最新 {symbol} {timeframe} 資料中...")
    exchange = ccxt.binance({"enableRateLimit": True})
    exchange.load_markets()

    page_size = 1000  # candles requested per call

    # Start far enough in the past that paging *forward* yields limit_total
    # candles. Starting with since=None returns only the newest page, and
    # paging forward from its last timestamp finds nothing older, which
    # silently capped the download at a single page.
    timeframe_ms = exchange.parse_timeframe(timeframe) * 1000
    since = exchange.milliseconds() - limit_total * timeframe_ms

    all_data = []
    while len(all_data) < limit_total:
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=page_size)
        if not ohlcv:
            break
        all_data += ohlcv
        since = ohlcv[-1][0] + 1  # resume just after the last candle received
        print(f" → 已抓取 {len(all_data)} 筆")
        if len(ohlcv) < page_size:
            # A short page means we have caught up with the present.
            break

    df = pd.DataFrame(all_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df = df.drop_duplicates(subset='timestamp').sort_values('timestamp')
    # Keep only the newest limit_total candles in case the last page overshot.
    df = df.tail(max(limit_total, 0)).reset_index(drop=True)

    df.to_csv(filename, index=False)
    print(f"✅ 資料已儲存至 {filename}")
    print(f"📅 Data range: {df['timestamp'].min()} → {df['timestamp'].max()}")
    return df

# -------------------------
# Step 2: 技術指標
# -------------------------
def add_indicators(df):
    """Return a copy of *df* with technical-indicator columns appended.

    The input frame is left untouched. It must provide ``close``, ``high``,
    ``low`` and ``volume`` columns.

    Columns added, in order: ``MA20``, ``MA50``, ``RSI`` (14-bar, simple
    rolling means of gains/losses), ``MACD`` (EMA12 - EMA26), ``Signal``
    (EMA9 of MACD), ``BB_Mid``/``BB_Upper``/``BB_Lower`` (MA20 +/- 2 rolling
    std), ``Vol_MA20``, ``Vol_MA50``, ``ROC`` (10-bar percent change), the
    true-range helpers ``H-L``, ``H-PC``, ``L-PC``, ``TR``, ``ATR`` (14-bar
    simple mean of TR), ``Momentum`` (10-bar price difference),
    ``Price_vs_MA20`` and ``Price_vs_MA50``.

    Returns:
        A new DataFrame with every row containing a NaN removed (this drops
        the indicator warm-up rows) and the index reset to 0..n-1.
    """
    # Work on a copy so the caller's frame is not polluted with new columns.
    df = df.copy()
    close = df['close']
    prev_close = close.shift(1)

    # Moving averages.
    df['MA20'] = close.rolling(window=20).mean()
    df['MA50'] = close.rolling(window=50).mean()

    # RSI from simple rolling averages of up-moves and down-moves.
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))

    # MACD and its signal line.
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    df['MACD'] = ema12 - ema26
    df['Signal'] = df['MACD'].ewm(span=9, adjust=False).mean()

    # Bollinger bands; the 20-bar std is computed once and reused.
    band_width = 2 * close.rolling(window=20).std()
    df['BB_Mid'] = df['MA20']
    df['BB_Upper'] = df['MA20'] + band_width
    df['BB_Lower'] = df['MA20'] - band_width

    # Volume averages.
    df['Vol_MA20'] = df['volume'].rolling(window=20).mean()
    df['Vol_MA50'] = df['volume'].rolling(window=50).mean()

    # Rate of change over 10 bars, in percent.
    df['ROC'] = close.pct_change(periods=10) * 100

    # True range and its 14-bar average.
    df['H-L'] = df['high'] - df['low']
    df['H-PC'] = (df['high'] - prev_close).abs()
    df['L-PC'] = (df['low'] - prev_close).abs()
    df['TR'] = df[['H-L', 'H-PC', 'L-PC']].max(axis=1)
    df['ATR'] = df['TR'].rolling(window=14).mean()

    # Momentum and relative distance from the moving averages.
    df['Momentum'] = close - close.shift(10)
    df['Price_vs_MA20'] = close / df['MA20'] - 1
    df['Price_vs_MA50'] = close / df['MA50'] - 1

    return df.dropna().reset_index(drop=True)

# -------------------------
# Step 3: 準備 ML 數據集
# -------------------------
def prepare_ml_data(df, return_threshold=0.003):
    """Build the feature matrix and binary target for the classifier.

    The target is 1 when the next bar's close exceeds the current close by
    more than ``return_threshold`` (as a fraction), otherwise 0. Rows holding
    any NaN, including the final row which has no next close, are dropped.
    The input frame is not modified.

    Returns:
        ``(X, y, labelled)``: the feature columns, the ``Target`` series, and
        the full cleaned frame carrying the extra ``Future_Close``, ``Return``
        and ``Target`` columns. All three share the same 0..n-1 index.
    """
    feature_columns = [
        'MA20', 'MA50', 'RSI', 'MACD', 'Signal',
        'BB_Mid', 'BB_Upper', 'BB_Lower',
        'Vol_MA20', 'Vol_MA50',
        'ROC', 'ATR', 'Momentum',
        'Price_vs_MA20', 'Price_vs_MA50'
    ]

    next_close = df['close'].shift(-1)
    forward_return = (next_close / df['close']) - 1

    # assign() returns a new frame, so the caller's data stays untouched.
    labelled = df.assign(
        Future_Close=next_close,
        Return=forward_return,
        Target=(forward_return > return_threshold).astype(int),
    )
    labelled = labelled.dropna().reset_index(drop=True)

    return labelled[feature_columns], labelled['Target'], labelled

# -------------------------
# Step 4: 滑動視窗 Walk-forward 訓練
# -------------------------
def walk_forward_train(X, y, df, train_window=2000, test_window=400, step=400, model_cls=RandomForestClassifier, model_kwargs=None):
    """Run sliding-window walk-forward training and plot the last fold.

    For each fold a fresh model is fitted on ``train_window`` consecutive
    rows and evaluated on the ``test_window`` rows that follow; the window
    then moves forward by ``step`` rows.

    Args:
        X: Feature frame, indexed positionally.
        y: Target series aligned with ``X``.
        df: Frame aligned with ``X``, used only for plotting the last fold.
        train_window: Number of rows used to fit each fold's model.
        test_window: Number of rows each fold is evaluated on.
        step: Rows to advance between folds (usually equal to ``test_window``).
        model_cls: Estimator class providing ``fit`` and ``predict``.
        model_kwargs: Constructor kwargs for ``model_cls``; defaults to
            ``{"n_estimators": 200, "random_state": 42}``.

    Returns:
        ``(model, test_index, y_true, y_pred, folds)`` for the last fold,
        where ``folds`` is a list of per-fold dicts with keys
        ``train_index``, ``test_index``, ``model``, ``y_true``, ``y_pred``
        and ``acc``. Returns five ``None`` values when there are too few
        rows for even one fold.

    Raises:
        ValueError: If ``step`` is not positive (the window would never
            advance and the loop would not terminate).
    """
    if step < 1:
        raise ValueError("step must be a positive integer")
    if model_kwargs is None:
        model_kwargs = {"n_estimators": 200, "random_state": 42}

    n = len(X)
    folds = []

    start = 0
    # Keep going while a full train window plus a full test window still fits.
    while start + train_window + test_window <= n:
        split = start + train_window
        train_idx = list(range(start, split))
        test_idx = list(range(split, split + test_window))

        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

        model = model_cls(**model_kwargs)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        folds.append({
            "train_index": train_idx,
            "test_index": test_idx,
            "model": model,
            "y_true": y_test,
            "y_pred": y_pred,
            "acc": accuracy_score(y_test, y_pred),
        })

        start += step

    # Check for "no folds" before reporting, so an empty score list is never
    # averaged (that would print nan and emit a RuntimeWarning).
    if not folds:
        print("❌ 錯誤:Walk-forward 沒有執行任何 Fold。請檢查資料量是否足夠。")
        return None, None, None, None, None

    scores = [fold["acc"] for fold in folds]
    print(f"\nWalk-forward 訓練完成。共 {len(scores)} 個 Fold。")
    print(f"📊 平均準確率: {np.mean(scores):.4f}")

    # The most recent fold is the one plotted and handed back to the caller.
    last = folds[-1]
    plot_predictions(df, last["y_true"], last["y_pred"], last["test_index"], f"Predictions vs Actual (Latest {test_window} bars)")

    return last["model"], last["test_index"], last["y_true"], last["y_pred"], folds
# -------------------------
# Step 4B: 輔助繪圖函式 (與回測共用)
# -------------------------
def plot_predictions(df, y_true, y_pred, test_index, title="Predictions vs Actual"):
    """Plot the close price of the test window with the predicted classes.

    Rows of ``df`` at positions ``test_index`` are selected; bars predicted
    as 1 are marked with green up-triangles and bars predicted as 0 with red
    down-triangles on top of the close-price line. The figure is shown
    immediately and nothing is returned.
    """
    window = df.iloc[test_index].copy().reset_index(drop=True)
    # Re-index labels and predictions positionally so they line up with the window.
    window["True"] = pd.Series(y_true).reset_index(drop=True)
    window["Pred"] = pd.Series(y_pred).reset_index(drop=True)

    predicted_up = window[window["Pred"] == 1]
    predicted_down = window[window["Pred"] == 0]

    plt.figure(figsize=(12,6))
    plt.plot(window["timestamp"], window["close"], label="Close Price", color="black", alpha=0.6)
    for subset, label, color, marker in (
        (predicted_up, "Pred Up", "green", "^"),
        (predicted_down, "Pred Down", "red", "v"),
    ):
        plt.scatter(subset["timestamp"], subset["close"], label=label, color=color, marker=marker, s=50)
    plt.title(title)
    plt.xlabel("Time")
    plt.ylabel("Price")
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# -------------------------
# Step 5: 進階回測 (使用 test2.py 的 robust 邏輯)
# -------------------------
def _position_return(position, entry_price, price):
    """Return the fractional gain of a long/short position marked at *price*."""
    if position == "long":
        return (price - entry_price) / entry_price
    return (entry_price - price) / entry_price


def _settle_position(balance, entry_capital, change, fee_rate):
    """Return ``(new_balance, pnl)`` after closing a position.

    The stake is never removed from ``balance`` on entry, so only the profit
    or loss, minus the exit fee charged on the closing notional, is applied.
    """
    pnl = entry_capital * change
    exit_fee = (entry_capital + pnl) * fee_rate
    return balance + pnl - exit_fee, pnl


def backtest_strategy(df, y_true, y_pred, test_index,
                         initial_capital=10000,
                         position_size_ratio=0.1,
                         fee_rate=0.001,
                         atr_multiplier=1.5,
                         take_profit_ratio=0.02,
                         debug=False):
    """Backtest a long/short strategy driven by the model's predictions.

    Rules, evaluated bar by bar on the rows of ``df`` at ``test_index``:

    - Flat: go long when the previous bar's prediction is 1 and RSI > 55,
      go short when it is 0 and RSI < 45. The stake is
      ``balance * position_size_ratio`` and an entry fee is charged on it.
    - In a position: exit when the return since entry reaches
      ``take_profit_ratio`` or falls to ``-atr_multiplier * ATR / entry_price``
      (ATR taken from the current bar). An exit fee is charged on the
      closing notional.
    - Any position still open after the last bar is closed at the last close.

    An equity curve is plotted and summary statistics are printed.

    Args:
        df: Frame with at least ``close``, ``RSI`` and ``ATR`` columns.
        y_true: True labels for the test rows (stored in the ``True`` column).
        y_pred: Predicted labels for the test rows.
        test_index: Positional row indices of the test window in ``df``.
        initial_capital: Starting balance.
        position_size_ratio: Fraction of the balance staked per trade.
        fee_rate: Proportional fee applied on entry and on exit.
        atr_multiplier: Stop-loss distance in multiples of ATR.
        take_profit_ratio: Fractional gain at which profit is taken.
        debug: When true, print every entry and exit.

    Returns:
        ``(df_test, trades)``: the test rows with ``True``, ``Pred`` and
        ``Equity`` columns added, and the list of per-trade fractional
        returns (before fees).
    """
    df_test = df.iloc[test_index].copy().reset_index(drop=True)
    df_test["True"] = pd.Series(y_true).reset_index(drop=True)
    df_test["Pred"] = pd.Series(y_pred).reset_index(drop=True)

    balance = initial_capital
    equity_curve = [balance]
    trades = []
    position, entry_price, entry_capital = None, 0, 0

    for i in range(1, len(df_test)):
        price_now = df_test["close"].iloc[i]
        rsi = np.nan_to_num(df_test["RSI"].iloc[i], nan=50)
        # NOTE(review): the signal comes from bar i-1 but is executed at bar
        # i's close; confirm this one-bar lag is the intended timing.
        pred = df_test["Pred"].iloc[i - 1]
        atr = np.nan_to_num(df_test["ATR"].iloc[i], nan=0)

        if position is None:
            # 1) Entry: the prediction must be confirmed by RSI.
            if pred == 1 and rsi > 55:
                position = "long"
            elif pred == 0 and rsi < 45:
                position = "short"

            if position is not None:
                entry_price = price_now
                entry_capital = balance * position_size_ratio
                balance -= entry_capital * fee_rate  # entry fee
                if debug:
                    print(f"[{'BUY' if position == 'long' else 'SELL'}] @ {price_now:.2f}")
        else:
            # 2) Exit: ATR-based stop-loss or fixed take-profit.
            change = _position_return(position, entry_price, price_now)
            stop_loss = -atr_multiplier * atr / entry_price
            if change <= stop_loss or change >= take_profit_ratio:
                # Only PnL and fee touch the balance: the stake was never
                # deducted on entry, so adding it back here would inflate
                # the balance by the full stake on every closed trade.
                balance, pnl = _settle_position(balance, entry_capital, change, fee_rate)
                trades.append(pnl / entry_capital)
                if debug:
                    print(f"[EXIT {position.upper()}] @ {price_now:.2f}, PnL={pnl/entry_capital:.2%}")
                position, entry_capital = None, 0

        # 3) Mark-to-market equity: cash plus unrealised PnL of any open position.
        current_equity = balance
        if position is not None:
            current_equity += entry_capital * _position_return(position, entry_price, price_now)
        equity_curve.append(current_equity)

    # 4) Final close-out of a position still open after the last bar.
    if position is not None:
        final_price = df_test["close"].iloc[-1]
        change = _position_return(position, entry_price, final_price)
        balance, pnl = _settle_position(balance, entry_capital, change, fee_rate)
        trades.append(pnl / entry_capital)
        # Make the curve end at the realised balance (includes the exit fee).
        equity_curve[-1] = balance
        if debug:
            print(f"[FORCED EXIT] {position.upper()} @ {final_price:.2f}, Final PnL={pnl/entry_capital:.2%}")

    # 5) Results. The slice only matters for an empty test window, where the
    # seed value would otherwise make the curve one element too long.
    df_test["Equity"] = equity_curve[:len(df_test)]

    total_return = (balance / initial_capital - 1) * 100
    running_peak = df_test["Equity"].cummax()
    max_drawdown = ((running_peak - df_test["Equity"]) / running_peak).max() * 100
    win_rate = (sum(1 for t in trades if t > 0) / len(trades)) * 100 if trades else 0

    # 6) Equity curve plot.
    plt.figure(figsize=(12, 6))
    plt.plot(df.iloc[test_index].index, df_test["Equity"], label="Equity Curve", color="blue")
    plt.axhline(initial_capital, linestyle="--", color="gray", alpha=0.7)
    plt.title("Backtest Equity Curve (v2 Improved)")
    plt.xlabel("Time")
    plt.ylabel("Capital (USDT)")
    plt.legend()
    plt.tight_layout()
    plt.show()

    # 7) Summary statistics.
    print(f"💰 最終資金: {balance:.2f} USDT")
    print(f"📈 總報酬率: {total_return:.2f}%")
    print(f"📉 最大回撤: {max_drawdown:.2f}%")
    print(f"✅ 勝率: {win_rate:.2f}%")
    print(f"📊 交易次數: {len(trades)}")

    return df_test, trades
# -------------------------
# Step 6: 主程式執行流程
# -------------------------
if __name__ == "__main__":
    # Label threshold: the next bar must rise by more than 0.3% for Target=1.
    RETURN_THRESHOLD = 0.003
    # Walk-forward configuration: fit on 500 bars, evaluate on the next 100,
    # then slide the window forward by 100 bars.
    TRAIN_WINDOW, TEST_WINDOW, STEP = 500, 100, 100

    print(f"===== 抓取與處理資料 (總筆數: {TOTAL_LIMIT}) =====")
    # force_reload=True bypasses the CSV cache so fresh candles are downloaded.
    df_raw = fetch_crypto_data(
        symbol="BTC/USDT",
        timeframe="1h",
        limit_total=TOTAL_LIMIT,
        force_reload=True,
    )
    df_ind = add_indicators(df_raw)
    X, y, df = prepare_ml_data(df_ind, return_threshold=RETURN_THRESHOLD)

    print("\n===== 開始 Sliding-Window Walk-forward 訓練 =====")
    walk_forward_result = walk_forward_train(
        X, y, df,
        train_window=TRAIN_WINDOW,
        test_window=TEST_WINDOW,
        step=STEP,
    )
    model, last_test_index, y_true, y_pred, folds = walk_forward_result

    # walk_forward_train returns None values when no fold could be built,
    # so only backtest when a model actually came back.
    if model is None:
        print("\n⚠️ 因數據量不足,無法執行回測。")
    else:
        print("\n===== 回測最後一個 Walk-forward 區段的績效 =====")
        df_test, trades = backtest_strategy(df, y_true.astype(int), y_pred, last_test_index)

        print(f"\n✅ 系統已使用 {len(folds)} 個 Fold 訓練並完成回測。")


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言