StockTradingEnvCashpenalty
While starting to write a cryptocurrency auto-trading bot, I have been trying out the different Envs in FinRL for training DRL models; today I began with the StockTradingEnvCashpenalty in finrl.meta.env_stock_trading.env_stocktrading_cashpenalty.py.
I wanted to use StockTradingEnvCashpenalty to see whether a cash penalty gives better results: cryptocurrencies are riskier, so constraining how much cash the agent keeps on hand during training seemed like it would be more practical.
However, the environment apparently is no longer being maintained, so parts of StockTradingEnvCashpenalty have not been updated and cause compatibility problems when training DRL models with the stable_baselines3-based DRLAgent. To be able to try this Env, I patched the incompatible parts of its code:
After the recent OpenAI Gym update, reset() is expected to return two objects, but the implementation in StockTradingEnvCashpenalty only returns the state:
def reset(
        self,
        *,
        seed=None,
        options=None,
    ):
    ...
    info = {}  # more information can be added here as needed
    return init_state, info
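With that change, resetting the environment follows the Gymnasium convention of returning both the observation and an info dict. A minimal usage sketch (env here stands for any instantiated StockTradingEnvCashpenalty):
# Gymnasium-style reset: initial state plus an info dict
state, info = env.reset()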
The latest stable-baselines3 expects five return values when it calls env.step(), while the current environment (StockTradingEnvCashpenalty) only returns four, so the existing step() has to be adjusted to meet the new stable-baselines3 requirement.
The new step() is expected to return five values: observation, reward, terminated, truncated, and info.
In the old Gym API, step() returned only four values: observation, reward, done, and info. The done flag marked whether the current episode should end: done=True means the episode is over (for example, the last step was reached, or the model made an irrecoverable mistake such as losing all of its assets), and the environment is reset afterwards.
That single done flag covered both "the episode terminated" and "the episode was truncated", so one boolean was enough to say an episode had ended. In the new API, terminated means the episode ended naturally (the task was completed or failed), while truncated means it was cut off by a limit such as a maximum number of time steps. This makes the reason an episode ends explicit and gives RL environments more flexibility to handle different termination conditions.
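A minimal sketch of the difference (env stands for any Gymnasium-style environment; the names are only illustrative):
# sample an arbitrary action just for illustration
action = env.action_space.sample()

# Old Gym API: four return values
# state, reward, done, info = env.step(action)

# New Gymnasium API: five return values
state, reward, terminated, truncated, info = env.step(action)

# code that still needs a single end-of-episode flag can recombine the two
done = terminated or truncated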
The step() implementation:
def step(self, actions):
    ...
    # if we're at the end
    if self.date_index == len(self.dates) - 1:
        # if we hit the end, set reward to total gains (or losses)
        state, reward, done, info = self.return_terminal(reward=self.get_reward())
        return state, reward, done, False, info  # terminated, not truncated
    else:
        """
        First, we need to compute values of holdings, save these, and log everything.
        Then we can reward our model for its earnings.
        """
        ...
        # if we run out of cash...
        if (spend + costs) > coh:
            if self.patient:
                # ... just don't buy anything until we get additional cash
                self.log_step(reason="CASH SHORTAGE")
                transactions = np.where(transactions > 0, 0, transactions)
                spend = 0
                costs = 0
            else:
                # ... end the cycle and penalize
                state, reward, done, info = self.return_terminal(
                    reason="CASH SHORTAGE", reward=self.get_reward()
                )
                return state, reward, done, False, info  # terminated, not truncated
        ...
        # truncated is False here because the episode was not cut off by a max-step limit
        return state, reward, False, False, {}  # not terminated, not truncated
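After these patches, a quick way to sanity-check that reset() and step() now match what stable-baselines3 expects is its built-in environment checker. This is only a sketch, assuming e_train_gym is the environment instance created in the full script further below:
from stable_baselines3.common.env_checker import check_env

# warns (or raises) if the spaces or the reset()/step() return values
# do not follow the API that stable-baselines3 expects
check_env(e_train_gym, warn=True)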
The original code imported the logger directly with from stable_baselines3.common import logger, but the way the logger is obtained has changed in the current stable_baselines3, so at the very beginning of the program I now create a global logger with the configure function instead.
from stable_baselines3.common.logger import configure
# use the configure function to set up and instantiate the logger
logger = configure(folder="logs", format_strings=["stdout", "log", "csv"])
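The object returned by configure exposes the usual record()/dump() interface, so the environment's logging calls keep working. A small usage sketch (the key name is just an example):
# record a value under a key, then flush it to the stdout / log / csv outputs
logger.record("environment/episode_reward", 0.0)
logger.dump(step=0)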
patient: with StockTradingEnvCashpenalty(..., patient=False, ...), the environment sends a termination signal as soon as the DRL agent spends all of its cash during training. That felt strange to me, so I figured patient should be set to True for training; but once patient is True, running out of cash leads to something like an infinite loop and the reported profit starts climbing endlessly, so for now I am stuck with patient=False.
DDPG frequently fails to converge with this setup: it never spends a cent and just holds cash doing nothing until the end of the episode. That is probably because of the cash penalty, but I do not yet know how to overcome it.
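For reference, the general shape of a cash-penalty reward looks roughly like the sketch below. This is an illustration of the idea only, with made-up names (shaped_reward, total_assets, cash_penalty_proportion); it is not the exact formula inside StockTradingEnvCashpenalty.
# Illustrative only: penalize portfolio value when the cash buffer drops
# below a target fraction of total assets, then reward relative growth.
def shaped_reward(total_assets, cash, initial_amount, step, cash_penalty_proportion=0.1):
    # how far the cash position falls short of the desired buffer
    cash_shortfall = max(0.0, total_assets * cash_penalty_proportion - cash)
    penalized_assets = total_assets - cash_shortfall
    # growth of the penalized portfolio value, averaged over elapsed steps
    return ((penalized_assets / initial_amount) - 1.0) / max(step, 1)
The complete training and backtesting script I ended up with is below.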
import os
import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import A2C, DDPG, PPO, SAC, TD3
from stable_baselines3.common.logger import configure
from finrl.agents.stablebaselines3.models import DRLAgent
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader
from finrl.meta.preprocessor.preprocessors import data_split
from finrl.meta.env_stock_trading.env_stocktrading_cashpenalty import (
    StockTradingEnvCashpenalty,
)
from finrl import config_tickers
from PolygonIO.PolygonIODownloader import PolygonIODownloader
from pypfopt.efficient_frontier import EfficientFrontier
from pypfopt import risk_models
import plotly.graph_objs as go
import talib
FEATURE_COLUMNS = [
    "macd",
    "macd_signal",
    "macd_hist",
    "boll_ub",
    "boll_lb",
    "rsi_30",
    "dx_30",
    "close_30_sma",
    "close_60_sma",
]
# custom feature extraction function
def extract_custom_features(df):
    # MACD
    df["macd"], df["macd_signal"], df["macd_hist"] = talib.MACD(
        df["close"], fastperiod=12, slowperiod=26, signalperiod=9
    )
    # Bollinger Bands
    df["boll_ub"], df["boll_lb"] = talib.BBANDS(
        df["close"], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0
    )[:2]
    # RSI (30 period)
    df["rsi_30"] = talib.RSI(df["close"], timeperiod=30)
    # Directional Movement Index (DX)
    df["dx_30"] = talib.DX(df["high"], df["low"], df["close"], timeperiod=30)
    # SMA (Simple Moving Averages)
    df["close_30_sma"] = talib.SMA(df["close"], timeperiod=30)
    df["close_60_sma"] = talib.SMA(df["close"], timeperiod=60)
    # replace any remaining NaN values with 0
    df = df.fillna(0)
    return df
def train_drl(e_trade_gym, models_info):
    """
    Function to train deep reinforcement learning (DRL) models.
    Parameters:
    - e_trade_gym: The trading environment used to build the training env.
    - models_info: A dictionary containing the model class as keys and corresponding training parameters and paths as values.
    Returns:
    - trained_models: A dictionary containing the trained models.
    """
    env_train, _ = e_trade_gym.get_sb_env()
    # Initialize DRLAgent
    agent = DRLAgent(env=env_train)
    # Dictionary to store the trained models
    trained_models = {}
    # Loop through each model class and its associated information
    for model_class, info in models_info.items():
        model_path = info["save_path"]
        if os.path.exists(model_path):
            print(f"Loading existing {model_class.__name__} model from {model_path}")
            # Load the model using stable-baselines3
            try:
                model = model_class.load(model_path, env=env_train)
                trained_models[model_class.__name__] = model
                print(f"{model_class.__name__} model loaded successfully.")
            except Exception as e:
                print(f"Failed to load the {model_class.__name__} model: {e}")
                print(f"Proceeding to train the {model_class.__name__} model.")
                # Train the model if loading fails
                model = agent.get_model(
                    model_name=model_class.__name__.lower(), model_kwargs=info["params"]
                )
                trained_model = agent.train_model(
                    model=model,
                    tb_log_name=model_class.__name__.lower(),
                    total_timesteps=info["total_timesteps"],
                )
                trained_model.save(model_path)
                trained_models[model_class.__name__] = trained_model
                print(f"{model_class.__name__} model trained and saved to {model_path}")
        else:
            print(f"\u6b63\u5728\u8a13\u7df4 {model_class.__name__} \u6a21\u578b...")
            model = agent.get_model(
                model_name=model_class.__name__.lower(), model_kwargs=info["params"]
            )
            trained_model = agent.train_model(
                model=model,
                tb_log_name=model_class.__name__.lower(),
                total_timesteps=info["total_timesteps"],
            )
            trained_model.save(model_path)
            trained_models[model_class.__name__] = trained_model
            print(f"{model_class.__name__} model trained and saved to {model_path}")
    return trained_models
def backtest_drl(e_trade_gym, trained_models):
    """
    Function to backtest all trained DRL models.
    Parameters:
    - e_trade_gym: The trading environment for backtesting.
    - trained_models: Dictionary of trained models.
    Returns:
    - backtest_results: Dictionary containing daily returns and actions for each model.
    """
    # Initialize backtest results dictionary
    backtest_results = {}
    # Iterate through each trained model for backtesting
    for model_name, model in trained_models.items():
        print(f"Backtesting the {model_name} model...")
        # Perform DRL prediction using the model
        df_account_value, df_actions = DRLAgent.DRL_prediction(
            model=model, environment=e_trade_gym
        )
        # Calculate daily returns for the model
        df_account_value["daily_return"] = (
            df_account_value["account_value"].pct_change().fillna(0)
        )
        # Store backtest results
        backtest_results[model_name] = {
            "account_value": df_account_value,
            "actions": df_actions,
            "daily_return": df_account_value["daily_return"],
        }
        # Output the first few rows of backtest results for verification
        print(f"First rows of account value for the {model_name} model:")
        print(df_account_value.head())
        print(f"First rows of trading actions for the {model_name} model:")
        print(df_actions.head())
    return backtest_results
def main():
    INIT_AMOUNT = 1000000
    TRAIN_START_DATE = "2017-01-01"
    TRAIN_END_DATE = "2023-10-01"
    TRADE_START_DATE = "2023-10-01"
    TRADE_END_DATE = "2024-10-09"
    # TRAIN_START_DATE = "2024-01-01"
    # TRAIN_END_DATE = "2024-07-01"
    # TRADE_START_DATE = "2024-07-01"
    # TRADE_END_DATE = "2024-09-21"
    # train = pd.read_csv("sp500_1hour_2019-09-21_2024-01-01_train.csv")
    # trade = pd.read_csv("sp500_1hour_2024-01-01_2024-09-21_trade.csv")
    # processed_full = pd.concat([train, trade], ignore_index=True)
    # # rename columns to match what FinRL expects
    TRAINED_MODEL_DIR = f"BTCUSD_15_minute_{TRAIN_START_DATE}_{TRAIN_END_DATE}_StockTradingEnvCashpenalty"
    TRAINED_MODEL_DIR = os.path.join("trained_models", TRAINED_MODEL_DIR)
    os.makedirs(TRAINED_MODEL_DIR, exist_ok=True)
    df_ohlcv = PolygonIODownloader().fetch_ohlcv(
        ["X:BTCUSD"], TRAIN_START_DATE, TRADE_END_DATE, "minute", 15
    )
    df_ohlcv = df_ohlcv.rename(columns={"timestamp": "date", "ticker": "tic"})
    # sort by date
    df = df_ohlcv.sort_values(["date", "tic"]).reset_index(drop=True)
    # Adding custom features to the dataset
    df = extract_custom_features(df)
    train = data_split(df, TRAIN_START_DATE, TRAIN_END_DATE)
    trade = data_split(df, TRADE_START_DATE, TRADE_END_DATE)
    print(f"Training Data Length: {len(train)}")
    print(f"Trading Data Length: {len(trade)}")
    # Step 2: Define Model Configurations
    models_info = {
        A2C: {
            "params": {"n_steps": 5, "ent_coef": 0.005, "learning_rate": 0.0002},
            "total_timesteps": 50000,
            "save_path": os.path.join(TRAINED_MODEL_DIR, "agent_a2c.zip"),
        },
        PPO: {
            "params": {
                "n_steps": 2048,
                "ent_coef": 0.005,
                "learning_rate": 0.0001,
                "batch_size": 128,
            },
            "total_timesteps": 80000,
            "save_path": os.path.join(TRAINED_MODEL_DIR, "agent_ppo.zip"),
        },
        DDPG: {
            "params": {
                "batch_size": 128,
                "buffer_size": 200000,
                "learning_rate": 0.0001,
            },
            "total_timesteps": 100000,
            "save_path": os.path.join(TRAINED_MODEL_DIR, "agent_ddpg.zip"),
        },
        SAC: {
            "params": {
                "batch_size": 128,
                "buffer_size": 100000,
                "learning_rate": 0.0003,
                "learning_starts": 100,
                "ent_coef": "auto_0.1",
            },
            "total_timesteps": 70000,
            "save_path": os.path.join(TRAINED_MODEL_DIR, "agent_sac.zip"),
        },
        TD3: {
            "params": {
                "batch_size": 100,
                "buffer_size": 1000000,
                "learning_rate": 0.001,
            },
            "total_timesteps": 30000,
            "save_path": os.path.join(TRAINED_MODEL_DIR, "agent_td3.zip"),
        },
    }
    # Step 3: Train DRL Models
    # Initialize StockTradingEnvCashpenalty for training
    stock_dimension = len(train.tic.unique())
    state_space = 1 + len(train.tic.unique()) + len(FEATURE_COLUMNS) * stock_dimension
    print(f"Stock Dimension: {stock_dimension}, State Space: {state_space}")
    buy_cost_list = sell_cost_list = [0.001] * stock_dimension
    env_kwargs = {
        "df": train,
        "buy_cost_pct": 0.001,
        "sell_cost_pct": 0.001,
        "hmax": 100,
        "initial_amount": INIT_AMOUNT,
        "daily_information_cols": FEATURE_COLUMNS,
        "cash_penalty_proportion": 0.1,
        "print_verbosity": 100,  # 10
        "patient": False,
    }
    e_train_gym = StockTradingEnvCashpenalty(**env_kwargs)
    # Train models
    trained_models = train_drl(e_train_gym, models_info)
    # Step 4: Backtest Models
    # Initialize trading environment
    env_kwargs["df"] = trade
    e_trade_gym = StockTradingEnvCashpenalty(**env_kwargs)
    # Backtest trained models
    backtest_results = backtest_drl(e_trade_gym, trained_models)
    trade_dates = pd.to_datetime(trade["date"].unique()).sort_values()
    # Optional: Save backtest results
    print("Backtest Results:")
    for model_name, result in backtest_results.items():
        print(f"{model_name}:")
        print(result["daily_return"].head())
if __name__ == "__main__":
    main()