iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0

在繪製交易結果跟績效時,只有log沒法還原交易內容,因此我寫了一個抓價格的程式:

import os
import re
import shutil
import zipfile
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

import pandas as pd
import requests

from typing import Tuple, List, Dict

# 需要转换为浮点数的列
_FLOAT_COLS = [
    "open",
    "high",
    "low",
    "close",
    "volume",
    "quote_asset_volume",
    "taker_buy_base",
    "taker_buy_quote",
]

class BinanceKlinesFetcher:
    """
    下載並整理 Binance UM futures K 線(klines)資料的工具類別。
    """
    
    _COLUMNS = [
        "open_time(ms)", "open", "high", "low", "close",
        "volume", "close_time(ms)", "quote_volume",
        "trade_count", "taker_buy_base_volume",
        "taker_buy_quote_volume", "ignore"
    ]

    # Binance 官方允許的頻率(interval)
    _ALLOWED_FREQS = {
        "12h", "15m", "1d", "1h", "1m", "1mo", "1w", "2h",
        "30m", "3d", "3m", "4h", "5m", "6h", "8h"
    }

    _BASE_URL = (
        "https://data.binance.vision/data/futures/um/daily/klines/"
        "{asset}/{freq}/{asset}-{freq}-{date}.zip"
    )

    def __init__(self,
                 session=None,
                 timeout: int = 20,
                 chunk_size: int = 1 << 16):
        """
        :param session: 共用 requests.Session(可重用連線),若為 None 則自建。
        :param timeout: requests 逾時秒數。
        :param chunk_size: 下載時每次讀取的位元組數。
        """
        self.session = session or requests.Session()
        self.timeout = timeout
        self.chunk_size = chunk_size

    def _read_clean_csv(self, path: Path) -> pd.DataFrame:
        """
        讀單一日 csv:
        1. 強制加入欄位名稱 (_COLUMNS)
        2. 若第一列 open_time 不是數字,代表它是標頭列 → 丟掉
        """
        df_tmp = pd.read_csv(path, header=None, names=self._COLUMNS)

        # 判斷第一筆 open_time 是否純數字(毫秒整數);若不是就剃掉那一列
        if not str(df_tmp.iat[0, 0]).isdigit():
            df_tmp = df_tmp.iloc[1:]

        return df_tmp

    # --------------------------------------------------------------------- #
    #  Public API
    # --------------------------------------------------------------------- #
    def fetch(
        self,
        asset: str = "ETHUSDT",
        start_date: str = "2019-12-31",
        end_date: str = "20250506",
        freq: str = "1m",
        cache_dir=None,
    ) -> pd.DataFrame:
        """
        批次抓取日 zip、解壓後拼接為單一 DataFrame。

        :param asset: 幣種(例:BTCUSDT, ETHUSDT)
        :param start_date: 開始日期(YYYY-MM-DD 或 YYYYMMDD)
        :param end_date: 結束日期(YYYY-MM-DD 或 YYYYMMDD)
        :param freq: 週期(必須在 _ALLOWED_FREQS 中)
        :param cache_dir: csv 快取資料夾;預設為「{asset}」資料夾
        :return: 拼接後的 pandas.DataFrame
        :raises ValueError: 參數錯誤
        :raises RuntimeError: 某一天的資料抓取失敗
        """
        self._validate_params(asset, start_date, end_date, freq)

        cache_path = Path(cache_dir or asset)
        cache_path.mkdir(parents=True, exist_ok=True)

        all_paths: List[Path] = []
        for d in self._daterange(start_date, end_date):
            csv_path = cache_path / f"{asset}-{freq}-{d}.csv"
            if not csv_path.exists():
                # 尚未快取 → 下載並解壓
                url = self._BASE_URL.format(asset=asset, freq=freq, date=d)
                self._download_and_extract(url, csv_path)
            all_paths.append(csv_path)

        # 讀取並串接
        dfs = [self._read_clean_csv(p) for p in all_paths]
        df = pd.concat(dfs, ignore_index=True)
        return df

    # --------------------------------------------------------------------- #
    #  Internal helpers
    # --------------------------------------------------------------------- #
    def _validate_params(self, asset: str, start: str, end: str, freq: str) -> None:
        if not re.fullmatch(r"[A-Z0-9]{6,}", asset):
            raise ValueError(f"asset 格式錯誤:{asset!r}")
        if freq not in self._ALLOWED_FREQS:
            raise ValueError(
                f"freq 必須為 {sorted(self._ALLOWED_FREQS)} 之一,目前為 {freq!r}"
            )
        # 日期解析驗證
        self._parse_date(start)
        self._parse_date(end)

    @staticmethod
    def _parse_date(date_str: str) -> str:
        """
        支援 'YYYY-MM-DD' 或 'YYYYMMDD',回傳 'YYYY-MM-DD'。
        """
        try:
            if "-" in date_str:
                dt = datetime.strptime(date_str, "%Y-%m-%d")
            else:
                dt = datetime.strptime(date_str, "%Y%m%d")
        except ValueError:
            raise ValueError(f"日期格式錯誤:{date_str!r}")
        return dt.strftime("%Y-%m-%d")

    def _daterange(self, start: str, end: str):
        start_dt = datetime.strptime(self._parse_date(start), "%Y-%m-%d")
        end_dt = datetime.strptime(self._parse_date(end), "%Y-%m-%d")
        if start_dt > end_dt:
            raise ValueError("start_date 不可晚於 end_date")
        cur = start_dt
        while cur <= end_dt:
            yield cur.strftime("%Y-%m-%d")
            cur += timedelta(days=1)

    def _download_and_extract(self, url: str, target_csv: Path) -> None:
        """
        下載 zip → 解壓縮 → 刪除 zip → 產生 csv。
        :raises RuntimeError: 下載失敗或 zip 格式不符
        """
        tmp_zip = target_csv.with_suffix(".zip")
        try:
            with self.session.get(url, stream=True, timeout=self.timeout) as resp:
                if resp.status_code != 200:
                    raise RuntimeError(f"{url} 回傳 {resp.status_code}")

                with open(tmp_zip, "wb") as fp:
                    for chunk in resp.iter_content(chunk_size=self.chunk_size):
                        if chunk:
                            fp.write(chunk)

            with zipfile.ZipFile(tmp_zip) as zf:
                # Binance 的 zip 通常只有 1 個 csv
                members = [m for m in zf.namelist() if m.endswith(".csv")]
                if not members:
                    raise RuntimeError(f"{url} 內無 csv 檔")
                # 解壓到 cache 目錄;改名為 target_csv
                zf.extract(members[0], target_csv.parent)
                extracted = target_csv.parent / members[0]
                extracted.rename(target_csv)
        finally:
            # 刪除 zip,即使失敗也嘗試清掉
            if tmp_zip.exists():
                tmp_zip.unlink(missing_ok=True)

class DataChecker:
    """
    檢查 K 線 DataFrame 是否在指定 interval 上完整 (missing)、
    或存在重複 (duplicates)。
    """

    # Binance → pandas offset alias 對照
    _FREQ_MAP = {
        "1m": "1T",   "3m": "3T",   "5m": "5T",   "15m": "15T",  "30m": "30T",
        "1h": "1H",   "2h": "2H",   "4h": "4H",   "6h": "6H",
        "8h": "8H",   "12h": "12H",
        "1d": "1D",   "3d": "3D",
        "1w": "1W",   # 周一對齊 (pandas 預設)
        "1mo": "1M",  # 每月期初
    }

    @classmethod
    def check(
        cls,
        df: pd.DataFrame,
        freq: str,
        time_col: str = "open_time",
        inplace_sort: bool = False,
    ) -> Tuple[List[pd.Timestamp], Dict[pd.Timestamp, int]]:
        """
        依據 freq 檢查缺漏與重複。

        Parameters
        ----------
        df : pandas.DataFrame
            至少含有 `time_col` 欄位;型別為 datetime64[ns] 或 timestamp (ms) 皆可。
        freq : str
            Binance 週期字串(1m, 3m, ..., 1d, 1w, 1mo)
        time_col : str, default "open_time"
            指定時間欄位名稱。
        inplace_sort : bool, default False
            是否就地排序;預設為 `False` 會回傳排序後的新 DataFrame。

        Returns
        -------
        missing : List[pandas.Timestamp]
            缺少的時間戳(依 freq 應該要有但沒出現)。
        duplicates : Dict[pandas.Timestamp, int]
            重複出現的時間戳及其次數(次數 ≥ 2)。
        """
        # --- 1. freq 轉換 ---
        alias = cls._FREQ_MAP.get(freq)
        if alias is None:
            raise ValueError(f"不支援的 freq:{freq}")

        # --- 2. 轉換 / 排序 ---
        series = df[time_col]
        if pd.api.types.is_integer_dtype(series):
            # 假設是毫秒 timestamp
            series = pd.to_datetime(series, unit="ms")
        elif not pd.api.types.is_datetime64_any_dtype(series):
            raise TypeError(f"{time_col} 必須為 datetime 或 int(ms)")

        if not inplace_sort:
            series = series.sort_values()

        # --- 3. 找重複 ---
        dup_mask = series.duplicated(keep=False)
        duplicates = (
            series[dup_mask]
            .value_counts()
            .to_dict()
        )

        # --- 4. 找缺漏 ---
        # 以第一筆、最後一筆建立完整區間
        start, end = series.iloc[0], series.iloc[-1]
        expected = pd.date_range(start=start, end=end, freq=alias, inclusive="both")
        missing = expected.difference(series.unique()).to_list()

        # --- 5. 簡易統計輸出 ---
        print(f"[DataChecker] 週期={freq} | 應有={len(expected):,} 筆"
              f" | 實際={series.nunique():,} | 缺漏={len(missing):,} | 重複={len(duplicates):,}")

        return missing, duplicates

# ------------------------------ 使用示例 ------------------------------ #
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='下载并处理 Binance 期货 K线数据')
    parser.add_argument('--asset', type=str, default='ETHUSDT',
                      help='交易对名称 (默认: ETHUSDT)')
    parser.add_argument('--start-date', type=str, default='2019-12-31',
                      help='开始日期 (YYYY-MM-DD 或 YYYYMMDD) (默认: 2019-12-31)')
    parser.add_argument('--end-date', type=str, default='2025-05-06',
                      help='结束日期 (YYYY-MM-DD 或 YYYYMMDD) (默认: 2025-05-06)')
    parser.add_argument('--freq', type=str, default='1m',
                      help='K线周期 (默认: 1m)')
    parser.add_argument('--cache_dir', type=str, default='',
                      help='K线周期 (默认: 1m)')
    parser.add_argument('--out', type=str,
                      help='输出CSV文件名 (默认: {asset}_{freq}.csv)')
    
    args = parser.parse_args()
    
    # 如果没有指定输出文件名,则使用参数组合
    start_date = args.start_date.replace("-", "")
    end_date = args.end_date.replace("-", "")
    
    if args.out is None:
        # 将日期格式统一转换为 YYYYMMDD
        default_out = f"{args.asset}_{args.freq}_{start_date}_{end_date}"
        args.out = f"{default_out}.csv"
    
    if args.cache_dir is None or args.cache_dir == '':
        args.cache_dir = f"_{args.asset}_{args.freq}_cache"
        os.makedirs(args.cache_dir, exist_ok=True)
    
    fetcher = BinanceKlinesFetcher()
    df = fetcher.fetch(
        asset=args.asset,
        start_date=args.start_date,
        end_date=args.end_date,
        freq=args.freq,
        cache_dir=args.cache_dir
    )
    print(df.head())

    df["open_time"] = pd.to_datetime(df["open_time(ms)"], unit="ms")
    df["close_time"] = pd.to_datetime(df["close_time(ms)"], unit="ms")
    df["timestamp"] = df["close_time"]

    missing, duplicates = DataChecker.check(df, freq=args.freq, time_col="open_time")

    # 若要強制資料完整,可在這裡 raise
    if missing or duplicates:
        raise RuntimeError(
            f"缺漏 {len(missing)} 個時間點,重複 {len(duplicates)} 個時間點,請檢查資料完整性"
        )
    
    # 處理DataFrame:根據預定義映射重命名列
    df = df.rename(columns={
        "trade_count": "trades",
        "quote_volume": "quote_asset_volume"
    })
    
    # 將_FLOAT_COLS中的列轉換為浮點數
    for col in _FLOAT_COLS:
        if col in df.columns:
            df[col] = df[col].astype(float)
    
    # 保存处理后的数据到CSV
    df.to_csv(args.out, index=False)
    print(f"数据已保存到: {args.out}")

上一篇
Day 16 - 部屬程式
下一篇
Day 18 - 回測程式
系列文
從零開始:AWS 部署 Python 自動交易程式與交易監測 Dashboard 實戰筆記18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言