iT邦幫忙

2024 iThome 鐵人賽

DAY 13
0
AI/ ML & Data

自動交易程式探索系列 第 13

Day 13 - 使用 Polygon.io 抓大量日內數據 (2/3)

  • 分享至 

  • xImage
  •  

為了簡化使用 Polygon.io API 抓取數據的過程,我將核心功能封裝成一個名為 PolygonIODownloader 的 Python 類。這個類能夠輕鬆地從 Polygon.io 抓取數據,並將其轉換為與 FinRL 兼容的 OHLCV 格式。

安裝 PolygonIODownloader 所需的依賴

首先,確保已安裝 Polygon.io 的官方 Python 客戶端:

pip install polygon-api-client

此外,還需要安裝 FinRL 及其依賴:

pip install finrl

PolygonIODownloader程式碼

from polygon import RESTClient
import os
import pandas as pd
from datetime import datetime, timedelta


class FinCSVStorage:

    def __init__(self, root_dir: str = './'):
        """
        初始化存儲路徑, 文件將依據指定的命名規則保存。
        文件名格式為:'{ticker}_{start_date_str}_{end_date_str}_{multiplier}_{timespan}_{format}.csv'。
        文件將被存儲在 '{root_dir}/{cls}/' 目錄中, 其中 cls 是呼叫者的類別名。
        若目錄不存在, 系統會自動創建該路徑以確保文件正確保存。
        """
        self.root_dir = root_dir

    def get_file_path(self, cls, ticker: str, start_date_str: str,
                      end_date_str: str, timespan: str, multiplier: int,
                      format: str) -> str:
        """
        構建CSV文件的完整文件路徑, 根據指定的參數來生成。
        """
        ### 加密貨幣會使用到特殊字符`:`, 因此生成檔案路徑時要額外處理一下
        ticker = ticker.replace(':', '__')
        class_name = cls.__name__  # 只使用類名,避免特殊字符
        filename = f'{ticker}_{start_date_str}_{end_date_str}_{multiplier}_{timespan}_{format}.csv'
        file_path = os.path.join(self.root_dir, class_name, filename)
        return file_path

    def save_csv(self, cls, ticker: str, start_date_str: str,
                 end_date_str: str, timespan: str, multiplier: int,
                 format: str, df: pd.DataFrame):
        """
        保存DataFrame為CSV文件, 路徑通過_get_file_path函數獲取。
        """
        file_path = self.get_file_path(cls, ticker, start_date_str,
                                       end_date_str, timespan, multiplier,
                                       format)

        # 如果目錄不存在, 則創建它
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # 保存DataFrame為CSV
        df.to_csv(file_path, index=False)
        print(f"Data saved to {file_path}")

    def read_csv(self, cls, ticker: str, start_date_str: str,
                 end_date_str: str, timespan: str, multiplier: int,
                 format: str) -> pd.DataFrame:
        """
        檢查指定的CSV文件是否存在, 並通過_get_file_path函數獲取文件路徑。
        """
        file_path = self.get_file_path(cls, ticker, start_date_str,
                                       end_date_str, timespan, multiplier,
                                       format)

        # 如果文件存在, 則讀取CSV
        if os.path.exists(file_path):
            print(f"Loading data from {file_path}...")
            return pd.read_csv(file_path)
        else:
            return None


class PolygonIOCore:

    def __init__(self, root_dir: str = './datasets'):
        """
        初始化 PolygonIODownloader 類別。

        :param root_dir: 指定存儲股票數據的根目錄路徑, 默認為 './datasets'。
                         數據將以CSV文件形式存儲, 路徑格式為 '{root_dir}/{cls}/'。
                         如果 root_dir 設為 None, 將不啟用本地存儲功能, 數據僅從API抓取並返回, 不會保存。
                         預設情況下會自動創建所需的目錄以保存數據。
        """
        # 在此處寫死你的Polygon.io API金鑰
        from PolygonIO.config import API_KEY
        self.api_key = API_KEY
        self.client = RESTClient(self.api_key)  # 創建一個REST客戶端
        self.root_dir = None  # 保存的目錄
        self.save_csv_enabled = False
        ### 在此處啟用/關閉自動保存CSV文件的功能 ###
        if root_dir is not None:
            self.storage = FinCSVStorage(root_dir)
        else:
            self.storage = None

    def fetch(self,
              ticker: str,
              start_date_str: str,
              end_date_str: str,
              timespan: str,
              multiplier: int = 1) -> pd.DataFrame:
        """
        從 Polygon.io 獲取指定股票在給定時間範圍內的聚合數據,並支援自動保存數據到 CSV。
        支援的時間間隔包括 'second'、'minute'、'hour'、'day' 和 'week',並自動處理大數據量的分頁請求。

        :param ticker: 股票代碼,如 'AAPL'
        :param start_date_str: 開始日期,格式為 'YYYY-MM-DD'
        :param end_date_str: 結束日期,格式為 'YYYY-MM-DD'
        :param timespan: 請求的時間間隔,支援 'second'、'minute'、'hour'、'day' 和 'week'
        :param multiplier: 時間間隔的倍數,默認為 1。例如,若 multiplier=15,且 timespan='minute',表示每 15 分鐘為一個單位
        :return: 回傳抓取到的原始數據,並自動合併所有區段為一個 DataFrame
        """

        # 檢查是否已有相同的數據保存到CSV文件
        if self.storage is not None:
            df = self.storage.read_csv(self.__class__, ticker, start_date_str,
                                       end_date_str, timespan, multiplier,
                                       'raw')
            if df is not None:
                return df  # 如果找到匹配的文件, 則直接返回數據

        # 使用 list_aggs 自動處理大數據的分頁
        aggs = []
        for a in self.client.list_aggs(ticker=ticker,
                                       multiplier=multiplier,
                                       timespan=timespan,
                                       from_=start_date_str,
                                       to=end_date_str,
                                       limit=5000):
            aggs.append(a)

        # 將數據轉換為Pandas DataFrame
        df = pd.DataFrame([agg.__dict__ for agg in aggs])

        # 如果啟用了CSV保存, 則調用save_csv保存原始數據
        if self.storage is not None and len(df) > 0:
            self.storage.save_csv(self.__class__, ticker, start_date_str,
                                  end_date_str, timespan, multiplier, 'raw',
                                  df)

        return df

    def raw2utc(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        將原始數據中的時間戳轉換為UTC格式。

        :param df: 原始數據的DataFrame
        :return: 時間戳轉換為UTC的DataFrame
        """
        # 轉換時間戳為可讀格式 (UTC)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
        cols = ['timestamp'] + [col for col in df if col != 'timestamp'
                                ]  # 將timestamp設為第一列
        df = df[cols]
        return df

    def raw2ohlcv(self, df: pd.DataFrame, ticker: str) -> pd.DataFrame:
        """
        將原始數據轉換為OHLCV格式, 並加入 ticker 標識符。
        """
        # 轉換時間戳為可讀格式 (UTC)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)

        # 選擇並重命名需要的欄位,並加入'tic'欄位
        df = df[['timestamp', 'open', 'high', 'low', 'close',
                 'volume']].copy()  # 使用 .copy() 避免副本警告
        df.loc[:, 'ticker'] = ticker  # 使用 .loc 設定'tic'欄位來標識資產

        return df


class PolygonIODownloader(PolygonIOCore):

    def __init__(self, root_dir: str = './datasets'):
        super().__init__(root_dir)

    def combine_df(self, df_list: list) -> pd.DataFrame:
        # 合併所有股票的數據
        df_combined = pd.concat(df_list, ignore_index=True)

        # 根據 'timestamp' 進行排序
        df_combined = df_combined.sort_values(by='timestamp').reset_index(
            drop=True)

        return df_combined

    def fetch_raw(self,
                  ticker_list: list,
                  start_date_str: str,
                  end_date_str: str,
                  timespan: str,
                  multiplier: int = 1) -> pd.DataFrame:
        """
        從Polygon.io獲取多支股票在給定時間範圍內的原始數據。
        
        :param ticker_list: 股票代碼列表, 例如 ['AAPL', 'MSFT']
        :param start_date_str: 開始日期 (YYYY-MM-DD)
        :param end_date_str: 結束日期 (YYYY-MM-DD)
        :param timespan: 時間間隔類型 ('minute', 'hour', 'day', 'week')
        :param multiplier: 時間間隔的倍數(例如 15 表示 15 分鐘, 默認為 1)
        :return: 合併後的DataFrame, 包含多支股票的原始數據
        """
        all_data = []

        # 依次處理每個ticker
        for ticker in ticker_list:
            df = self.fetch(ticker, start_date_str, end_date_str, timespan,
                            multiplier)

            # 加入 ticker 欄位標識每支股票
            df['ticker'] = ticker
            all_data.append(df)

        return self.combine_df(all_data)

    def fetch_utc(self,
                  ticker_list: list,
                  start_date_str: str,
                  end_date_str: str,
                  timespan: str,
                  multiplier: int = 1) -> pd.DataFrame:
        """
        從Polygon.io獲取指定股票列表在給定時間範圍內的數據,並將時間戳轉換為UTC格式。

        :param ticker_list: 股票代碼列表, 例如 ['AAPL', 'MSFT']
        :param start_date_str: 開始日期 (YYYY-MM-DD)
        :param end_date_str: 結束日期 (YYYY-MM-DD)
        :param timespan: 時間間隔類型 ('minute', 'hour', 'day', 'week')
        :param multiplier: 時間間隔的倍數(例如 15 表示 15 分鐘, 默認為 1)
        :return: 合併後的DataFrame, 包含多支股票的UTC時間格式數據
        """
        all_data = []

        # 依次處理每個ticker
        for ticker in ticker_list:
            df = self.fetch(ticker, start_date_str, end_date_str, timespan,
                            multiplier)

            # 將時間戳轉換為UTC格式
            df['timestamp'] = pd.to_datetime(df['timestamp'],
                                             unit='ms',
                                             utc=True)
            df.loc[:, 'ticker'] = ticker  # 加入ticker欄位

            all_data.append(df)

        return self.combine_df(all_data)

    def fetch_ohlcv(self,
                    ticker_list: list,
                    start_date_str: str,
                    end_date_str: str,
                    timespan: str,
                    multiplier: int = 1) -> pd.DataFrame:
        """
        從Polygon.io獲取指定股票列表在給定時間範圍內的OHLCV數據。

        :param ticker_list: 股票代碼列表, 例如 ['AAPL', 'MSFT']
        :param start_date_str: 開始日期 (YYYY-MM-DD)
        :param end_date_str: 結束日期 (YYYY-MM-DD)
        :param timespan: 時間間隔類型 ('minute', 'hour', 'day', 'week')
        :param multiplier: 時間間隔的倍數(例如 15 表示 15 分鐘, 默認為 1)
        :return: 合併後的DataFrame, 包含多支股票的OHLCV數據
        """
        all_data = []

        # 依次處理每個ticker
        for ticker in ticker_list:
            df = self.fetch(ticker, start_date_str, end_date_str, timespan,
                            multiplier)

            # 將時間戳轉換為UTC格式
            df['timestamp'] = pd.to_datetime(df['timestamp'],
                                             unit='ms',
                                             utc=True)

            # 將原始數據轉換為OHLCV格式,並加入ticker欄位
            df_ohlcv = df[[
                'timestamp', 'open', 'high', 'low', 'close', 'volume'
            ]].copy()
            df_ohlcv.loc[:, 'ticker'] = ticker  # 加入ticker欄位

            all_data.append(df_ohlcv)

        return self.combine_df(all_data)


def main():
    # 初始化 PolygonIODownloader
    polygon_wrapper = PolygonIODownloader()

    # 設定股票代碼和日期範圍
    ticker_list = ['AAPL']
    start_date_str = '2020-01-01'
    end_date_str = '2023-09-21'

    # 獲取1小時的OHLCV數據
    print("Fetching 1-hour OHLCV data...")
    df_1h_ohlcv = polygon_wrapper.fetch_ohlcv(ticker_list,
                                              start_date_str,
                                              end_date_str,
                                              timespan='hour')
    print(df_1h_ohlcv.head())  # 顯示OHLCV數據

    # 使用 FinRL 提取特徵
    from finrl.meta.preprocessor.preprocessors import FeatureEngineer

    # 初始化特徵工程器
    fe = FeatureEngineer(use_technical_indicator=True,
                         tech_indicator_list=['macd', 'rsi', 'cci', 'adx'],
                         use_turbulence=False)

    # 對抓取的數據進行特徵提取
    # FinRL 需要的特徵數據格式為:'timestamp'->'date', 'ticker'->'tic'
    df_1h_ohlcv = df_1h_ohlcv.rename(columns={
        'timestamp': 'date',
        'ticker': 'tic'
    })
    df_features = fe.preprocess_data(df_1h_ohlcv)
    print(df_features.head())  # 顯示提取後的特徵數據


if __name__ == "__main__":
    import sys
    # 將當前腳本所在目錄的父目錄添加到 sys.path
    sys.path.append(os.path.dirname(os.path.dirname(
        os.path.abspath(__file__))))
    main()

如何使用 PolygonIODownloader 進行數據抓取

PolygonIODownloader 範例

以下是一個使用 PolygonIODownloader 抓取 AAPLMSFT 股票 1 小時級別數據的範例:

# 初始化 PolygonIODownloader
polygon_wrapper = PolygonIODownloader()

# 設定股票代碼和日期範圍
ticker_list = ['AAPL', 'MSFT']
start_date_str = '2019-07-01'
end_date_str = '2024-09-01'

# 獲取1小時級別的OHLCV數據
df_1h_ohlcv = polygon_wrapper.fetch_ohlcv(ticker_list, start_date_str, end_date_str, timespan='hour')

# 修改timestamp欄位名稱為FinRL預期的 'date', 並將ticker修改為 'tic'
df_1h_ohlcv = df_1h_ohlcv.rename(columns={'timestamp': 'date', 'ticker': 'tic'})

print(df_1h_ohlcv.head())  # 顯示OHLCV數據

PolygonIODownloader 的核心功能

PolygonIODownloader 類封裝了從 Polygon.io 抓取數據的核心功能,具體包括:

  • 初始化客戶端:自動使用環境變數中的 API 金鑰進行初始化。
  • 抓取 OHLCV 數據:支持多個股票代碼和多種時間框架(如每小時、每分鐘)。
  • 自動處理分頁:確保抓取大量數據時不會遺漏。
  • 轉換數據格式:將抓取的數據轉換為與 FinRL 兼容的 OHLCV 格式。

fetch_ohlcv 回傳的數據格式

抓取的數據包含以下欄位:

  • timestamp 或修改後的 date: 每筆交易的時間戳(UTC格式)
  • open: 開盤價
  • high: 最高價
  • low: 最低價
  • close: 收盤價
  • volume: 交易量
  • ticker 或修改後的 tic: 股票代碼(例如 AAPL, MSFT

特徵提取與 FinRL 整合

FinRL 的特徵提取功能要求數據欄位名稱為 date 而非 timestamp。在處理非日線級別數據時,只需簡單修改欄位名稱即可使其兼容。

此外,技術指標(如 MACDRSI)依賴於參數設置。如果將這些指標應用於小時或分鐘級別數據,而不調整參數,可能會導致不準確的特徵提取結果。因此,對於短時間框架的數據,在使用 FinRL 提取特徵時,需特別注意指標參數的調整,以確保結果準確。

以下是使用 FinRL 的特徵提取工具進行數據處理的範例:

# 初始化 FinRL 的特徵提取工具
from finrl.meta.preprocessor.preprocessors import FeatureEngineer

fe = FeatureEngineer(
    use_technical_indicator=True, 
    tech_indicator_list=["macd", "rsi", "cci", "adx"]
)

# 將抓取的OHLCV數據進行特徵提取
df_features = fe.preprocess_data(df_1h_ohlcv)
print(df_features.head())  # 顯示提取後的特徵數據

結論

通過封裝 PolygonIODownloader 類,簡化了從 Polygon.io 抓取數據的流程,並確保數據格式與 FinRL 的兼容性。這使得用戶能夠更輕鬆地進行高頻或短線交易策略的數據抓取與分析。結合 FinRL 的特徵提取工具,可以進行更高級的量化分析,但需謹慎調整技術指標參數,以適應不同時間框架的數據需求。


上一篇
Day 12 - 使用 Polygon.io 抓大量日內數據 (1/3)
系列文
自動交易程式探索13
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言