iT邦幫忙

2025 iThome 鐵人賽

DAY 14
0

目前仍在構思中,先寫一個價格切割器:


from __future__ import annotations
import sys, os, argparse, datetime as _dt
from typing import List, Tuple, Optional

import numpy as np
import pandas as pd

from PyQt5 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg

# ---------------------------------------------------------------------
# 視覺化(PyQtGraph)— 與題主給的實作風格一致
# ---------------------------------------------------------------------
class TimeAxisItem(pg.AxisItem):
    """Axis whose tick values are Unix seconds, rendered as UTC date/time labels.

    The x values plotted in this file are derived from naive ``open_time``
    datetimes interpreted as UTC epoch seconds, and the status-bar messages
    format them with ``pd.to_datetime(..., unit='s')``.  Formatting ticks in
    UTC keeps the axis labels identical to the timestamps in the CSV instead
    of shifting them by the machine's local UTC offset.
    """

    def tickStrings(self, values, scale, spacing):
        """Return one label per tick value.

        Args:
            values: Tick positions in Unix seconds.
            scale: Unused; part of the ``AxisItem.tickStrings`` signature.
            spacing: Unused; part of the ``AxisItem.tickStrings`` signature.

        Returns:
            A list of ``"YYYY-MM-DD\\nHH:MM"`` strings; a tick whose value
            cannot be represented as a datetime yields an empty string.
        """
        labels = []
        for v in values:
            try:
                stamp = _dt.datetime.fromtimestamp(v, tz=_dt.timezone.utc)
                labels.append(stamp.strftime("%Y-%m-%d\n%H:%M"))
            except (OverflowError, OSError, ValueError, TypeError):
                # Out-of-range or non-finite tick (e.g. when zoomed far out):
                # show a blank label rather than failing the axis repaint.
                labels.append("")
        return labels


class CandlestickItem(pg.GraphicsObject):
    """
    Simple candlestick graphics item.

    data: ndarray of shape (N, 5) with columns [t, open, high, low, close].
    All candles are painted once into a cached QPicture, so repaints only
    replay the recording instead of redrawing every bar.
    """
    def __init__(self, data: np.ndarray):
        super().__init__()
        self.data = data
        self.picture = None
        self._bounds = QtCore.QRectF()
        self._generate()

    def _estimate_width(self) -> float:
        """Return the candle half-width: 30% of the median x spacing, at least 1."""
        if len(self.data) >= 2:
            spacing = float(np.median(np.diff(self.data[:, 0])))
            return max(1.0, 0.3 * spacing)
        # A single bar (or no data) has no spacing to measure.
        return 30.0

    def _generate(self):
        """Record every candle into a fresh QPicture and cache its bounds."""
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        rising_brush = QtGui.QBrush(QtGui.QColor("#2ecc71"))
        falling_brush = QtGui.QBrush(QtGui.QColor("#e74c3c"))
        wick_pen = pg.mkPen(0.0)
        half_w = self._estimate_width()
        for row in self.data:
            ts, op, hi, lo, cl = row
            # Wick: vertical line from low to high.
            painter.setPen(wick_pen)
            painter.setBrush(QtCore.Qt.NoBrush)
            painter.drawLine(QtCore.QPointF(ts, lo), QtCore.QPointF(ts, hi))
            # Body: filled rectangle between open and close; the tiny minimum
            # height keeps a doji (open == close) from collapsing to nothing.
            if cl >= op:
                brush, bottom, top = rising_brush, op, cl
            else:
                brush, bottom, top = falling_brush, cl, op
            painter.setBrush(brush)
            body = QtCore.QRectF(ts - half_w, bottom, 2*half_w, max(top - bottom, 1e-12))
            painter.setPen(QtCore.Qt.NoPen)
            painter.drawRect(body)
        painter.end()
        self.picture = picture
        self._bounds = QtCore.QRectF(self.picture.boundingRect())

    def paint(self, p, *args):
        """Replay the cached picture onto painter ``p``."""
        if self.picture is None:
            return
        self.picture.play(p)

    def boundingRect(self):
        """Return the bounding rectangle captured when the picture was built."""
        return self._bounds


# ---------------------------------------------------------------------
# LOD + tiled candlestick item for large datasets (the main window class is defined after it)
# ---------------------------------------------------------------------
class LODCandlestickItem(pg.GraphicsObject):
    """
    Candlestick item tuned for very large series (100k+ bars).

    - A level of detail ("level") is chosen from the current zoom; it is the
      number of raw bars merged into one drawn candle.
    - Drawn candles are grouped into tiles; each (level, tile) pair is
      recorded once into a QPicture and cached, and only the tiles that
      intersect the visible x range are replayed on paint.

    The drawing itself is still plain QPicture + QPainter; the data is simply
    cut into smaller pieces.
    """
    def __init__(self, t: np.ndarray, o: np.ndarray, h: np.ndarray, l: np.ndarray, c: np.ndarray,
                 vb: pg.ViewBox, tile_size: int = 2000):
        """
        Args:
            t: x positions of the bars; searched with ``np.searchsorted``, so
                they must be in ascending order.
            o, h, l, c: open / high / low / close values, one per bar.
            vb: ViewBox whose range and width drive level and tile selection;
                may be None, in which case level 1 and the full x range are used.
            tile_size: drawn candles per tile (values below 256 are raised to 256).

        Raises:
            ValueError: if any array is not 1-D or the arrays differ in length.
        """
        super().__init__()
        series = (t, o, h, l, c)
        # Explicit checks instead of `assert`, which is stripped under -O.
        if any(a.ndim != 1 for a in series):
            raise ValueError("t, o, h, l and c must all be 1-D arrays")
        if len({a.shape[0] for a in series}) != 1:
            raise ValueError("t, o, h, l and c must all have the same length")
        self.t = t.astype(float)
        self.o = o.astype(float)
        self.h = h.astype(float)
        self.l = l.astype(float)
        self.c = c.astype(float)
        self.vb = vb
        self.tile_size = int(max(256, tile_size))  # drawn candles per tile, at every level
        self.base_dt = float(np.median(np.diff(self.t))) if len(self.t) > 1 else 60.0

        # Cache: (level, tile_idx) -> QPicture
        self._cache = {}
        if len(self.t):
            x_min = float(self.t.min())
            y_min = float(np.nanmin(self.l))
            self._bounds = QtCore.QRectF(
                x_min,
                y_min,
                float(self.t.max()) - x_min,
                float(np.nanmax(self.h)) - y_min,
            )
        else:
            # No data: use a unit rectangle as the bounds.
            self._bounds = QtCore.QRectF(0.0, 0.0, 1.0, 1.0)
        self._last_level = None  # not read anywhere in this class; kept for compatibility

        # Repaint whenever the view changes (level and visible tiles depend on it).
        if self.vb is not None:
            self.vb.sigXRangeChanged.connect(self._on_view_changed)
            # sigResized may be missing on the view box; connect only if present.
            resized = getattr(self.vb, 'sigResized', None)
            if resized is not None:
                resized.connect(self._on_view_changed)

    # ---- public ----
    def boundingRect(self):
        """Return the data-space rectangle covering all bars."""
        return self._bounds

    # ---- events ----
    def _on_view_changed(self, *args):
        """Schedule a repaint after the view range or size changed."""
        self.update()

    # ---- LOD selection ----
    def _half_width(self, level: int) -> float:
        """Half-width (in x units) of one drawn candle at ``level``."""
        return max(1.0, 0.3 * self.base_dt * level)

    def _choose_level(self) -> int:
        """Pick how many raw bars to merge so one drawn candle spans about >= 1 px.

        Returns:
            A power of two in [1, 65536]; 1 when the view cannot be queried.
        """
        try:
            x0, x1 = self.vb.viewRange()[0]
            span = max(1e-9, float(x1 - x0))
            px = max(1.0, float(self.vb.width()))
        except (AttributeError, RuntimeError, TypeError, ValueError):
            # No usable view (vb is None, not laid out yet, or already
            # destroyed): fall back to full detail.
            return 1
        sec_per_px = span / px
        raw = max(1.0, sec_per_px / max(1e-9, self.base_dt))
        # Round up to a power of two so cached levels get reused while zooming.
        lvl = int(2 ** np.ceil(np.log2(raw)))
        return max(1, min(lvl, 1 << 16))

    def _visible_tile_range(self, level: int):
        """Return ``(first_tile, last_tile)``, inclusive, covering the visible x range.

        The range is widened by one candle half-width on each side so a candle
        whose centre lies just outside the view, but whose body reaches into
        it, is still drawn.
        """
        if len(self.t) == 0:
            return 0, 0
        try:
            x0, x1 = self.vb.viewRange()[0]
        except (AttributeError, RuntimeError, TypeError, ValueError):
            x0 = float(self.t[0]); x1 = float(self.t[-1])
        pad = self._half_width(level)
        # Raw-array index range [i0, i1) of bars inside the padded view.
        i0 = int(np.searchsorted(self.t, x0 - pad, side='left'))
        i1 = int(np.searchsorted(self.t, x1 + pad, side='right'))
        # Index of the first / last visible drawn candle at this level.
        first = i0 // level
        last = max(first, (i1 - 1) // level)
        # Tile indices; `last` is already inclusive, so no rounding up is
        # needed (rounding up here and adding 1 in paint() would replay
        # off-screen tiles).
        tile0 = max(0, first // self.tile_size)
        tile1 = max(tile0, last // self.tile_size)
        return tile0, tile1

    # ---- tile construction ----
    def _build_tile_picture(self, level: int, tile_idx: int) -> QtGui.QPicture:
        """Return the cached QPicture for ``(level, tile_idx)``, recording it if needed."""
        key = (level, tile_idx)
        pic = self._cache.get(key)
        if pic is not None:
            return pic

        # Raw-index span [base_i0, base_i1) covered by this tile.
        base_i0 = tile_idx * self.tile_size * level
        base_i1 = min(len(self.t), base_i0 + self.tile_size * level)
        if base_i0 >= base_i1:
            # Past the end of the data: cache an empty picture so the lookup
            # above short-circuits next time.
            empty = QtGui.QPicture()
            self._cache[key] = empty
            return empty

        pic = QtGui.QPicture()
        qp = QtGui.QPainter(pic)
        up_brush = QtGui.QBrush(QtGui.QColor("#2ecc71"))
        dn_brush = QtGui.QBrush(QtGui.QColor("#e74c3c"))
        wick_pen = pg.mkPen(0.0)
        w = self._half_width(level)

        # One iteration per drawn candle (each merges up to `level` raw bars);
        # the span above bounds this to at most `tile_size` iterations.
        for i0 in range(base_i0, base_i1, level):
            i1 = min(i0 + level, base_i1)
            t0 = self.t[i0]
            o0 = self.o[i0]          # open of the first raw bar
            c0 = self.c[i1 - 1]      # close of the last raw bar
            # High / low must look at the whole merged span.
            h0 = float(np.nanmax(self.h[i0:i1]))
            l0 = float(np.nanmin(self.l[i0:i1]))

            # wick
            qp.setPen(wick_pen)
            qp.setBrush(QtCore.Qt.NoBrush)
            qp.drawLine(QtCore.QPointF(t0, l0), QtCore.QPointF(t0, h0))

            # body (minimum height keeps open == close visible)
            if c0 >= o0:
                qp.setBrush(up_brush)
                rect = QtCore.QRectF(t0 - w, o0, 2*w, max(c0 - o0, 1e-12))
            else:
                qp.setBrush(dn_brush)
                rect = QtCore.QRectF(t0 - w, c0, 2*w, max(o0 - c0, 1e-12))
            qp.setPen(QtCore.Qt.NoPen)
            qp.drawRect(rect)

        qp.end()
        self._cache[key] = pic
        return pic

    # ---- painting ----
    def paint(self, p, *args):
        """Replay the cached pictures of every tile that intersects the view."""
        level = self._choose_level()
        first_tile, last_tile = self._visible_tile_range(level)
        # Tiles are recorded on first use; afterwards painting is only a replay.
        for tile_idx in range(first_tile, last_tile + 1):
            pic = self._build_tile_picture(level, tile_idx)
            if pic is not None:
                pic.play(p)


class MainWindow(QtWidgets.QMainWindow):
    """Main window: load an OHLCV CSV, plot it, pick a one-year slice, save it.

    A left click on the chart selects the candle nearest to the click as the
    start of a one-year window; "save" writes the rows inside that window to
    a new CSV.
    """

    def __init__(self, csv_path: Optional[str] = None):
        """Build the UI and, when ``csv_path`` is given, load it immediately."""
        super().__init__()
        self.setWindowTitle("CSV Candle Picker (1Y 範圍選取/另存)")
        self.resize(1280, 760)

        # State
        self.df: Optional[pd.DataFrame] = None          # loaded data (open_time / close_time / t ...)
        self._t: np.ndarray = np.array([], dtype=float)  # x axis values (seconds)
        self._candles: Optional[LODCandlestickItem] = None
        self._sel_region: Optional[pg.LinearRegionItem] = None
        self._sel_start_ts: Optional[float] = None
        self._sel_end_ts: Optional[float] = None
        # Two extra vertical lines act as the selection border, for pyqtgraph
        # versions whose LinearRegionItem has no setPen.
        self._sel_left_line: Optional[pg.InfiniteLine] = None
        self._sel_right_line: Optional[pg.InfiniteLine] = None

        # Layout
        root = QtWidgets.QWidget(self)
        v = QtWidgets.QVBoxLayout(root)
        self.setCentralWidget(root)

        ctrl = QtWidgets.QHBoxLayout()
        self.btn_load = QtWidgets.QPushButton("載入 CSV…")
        self.btn_save = QtWidgets.QPushButton("另存選取…")
        self.btn_save.setEnabled(False)
        self.lab_file = QtWidgets.QLabel("狀態:尚未載入檔案")
        self.lab_file.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)
        ctrl.addWidget(self.btn_load)
        ctrl.addWidget(self.btn_save)
        ctrl.addStretch(1)
        ctrl.addWidget(self.lab_file)
        v.addLayout(ctrl)

        self.time_axis = TimeAxisItem(orientation="bottom")
        self.pw = pg.PlotWidget(axisItems={"bottom": self.time_axis})
        self.pw.setBackground("w")
        self.pw.showGrid(x=True, y=True, alpha=0.2)
        self.pw.setLabel('left', "Price")
        v.addWidget(self.pw, 1)

        self.status = self.statusBar()

        # Signals
        self.btn_load.clicked.connect(self._on_load_clicked)
        self.btn_save.clicked.connect(self._on_save_clicked)
        self.pw.scene().sigMouseClicked.connect(self._on_mouse_clicked)

        # Optional initial load
        if csv_path:
            self.load_csv(csv_path)

    # ------------------------- I/O -------------------------
    def _on_load_clicked(self):
        """Ask for a CSV path and load it if the dialog was not cancelled."""
        path, _ = QtWidgets.QFileDialog.getOpenFileName(
            self, "選擇 CSV 檔", os.getcwd(), "CSV Files (*.csv);;All Files (*)"
        )
        if path:
            self.load_csv(path)

    def load_csv(self, path: str):
        """Load ``path`` via ``load_ohlcv_csv``, replot, and reset the selection.

        On failure an error dialog is shown and the previously loaded data
        (if any) is left untouched.
        """
        try:
            self.df = load_ohlcv_csv(path)
        except Exception as e:
            # UI boundary: any read/parse failure is reported to the user.
            QtWidgets.QMessageBox.critical(self, "讀取失敗", f"讀取 CSV 發生錯誤\n{e}")
            return
        self.lab_file.setText(f"已載入:{os.path.abspath(path)}  ({len(self.df)} rows)")
        self.status.showMessage("CSV 載入完成,正在繪圖…", 3000)
        self._plot_all()
        self._clear_selection_visual()

    # ------------------------- Plotting -------------------------
    def _plot_all(self):
        """Clear the plot and draw every row of ``self.df`` as candles."""
        self.pw.clear()
        self._candles = None
        if self.df is None or self.df.empty:
            return
        t = self.df['t'].to_numpy(dtype=float)
        o = self.df['open'].to_numpy(dtype=float)
        h = self.df['high'].to_numpy(dtype=float)
        l = self.df['low'].to_numpy(dtype=float)
        c = self.df['close'].to_numpy(dtype=float)
        # LOD + tiled item so large files stay responsive.
        vb = self.pw.plotItem.vb
        self._candles = LODCandlestickItem(t, o, h, l, c, vb=vb, tile_size=2000)
        self.pw.addItem(self._candles)
        self._t = t
        # Show the full x range.
        self.pw.setXRange(float(t.min()), float(t.max()), padding=0)
        self.status.showMessage(f"已繪製 K 線:{len(t)} 根", 5000)

    # ------------------------- Selection (click) -------------------------
    def _on_mouse_clicked(self, ev):
        """Select one year of data starting at the candle nearest to a left click."""
        if self.df is None or self._t.size == 0:
            return
        # Events without a button() accessor (e.g. a tuple-wrapped event) are
        # accepted as-is; otherwise only the left button selects.
        button = getattr(ev, 'button', None)
        if callable(button) and button() != QtCore.Qt.LeftButton:
            return
        pos = ev.scenePos() if hasattr(ev, 'scenePos') else (ev[0].scenePos() if isinstance(ev, (list, tuple)) else None)
        if pos is None:
            return
        view_box = self.pw.plotItem.vb
        if not view_box.sceneBoundingRect().contains(pos):
            return
        x = float(view_box.mapSceneToView(pos).x())
        # Snap to the nearest candle.
        idx = int(np.argmin(np.abs(self._t - x)))
        start_ts = float(self._t[idx])
        start_dt = pd.to_datetime(start_ts, unit='s')
        end_dt = start_dt + pd.DateOffset(years=1)  # calendar year; DateOffset handles leap years
        end_ts = float(end_dt.timestamp())
        # The drawn region stops at the last candle; the logical end below stays
        # unclipped so the half-open filter still includes that last candle.
        end_ts_clipped = min(end_ts, float(self._t.max()))
        self._apply_selection_visual(start_ts, end_ts_clipped)
        # Remember the logical selection for saving.
        self._sel_start_ts = start_ts
        self._sel_end_ts = end_ts
        sel_df = self._current_selection_df()
        nrows = 0 if sel_df is None else len(sel_df)
        self.btn_save.setEnabled(nrows > 0)
        self.status.showMessage(
            f"已選取自 {start_dt:%Y-%m-%d %H:%M} 起 1 年區間(實際切到 {pd.to_datetime(end_ts_clipped, unit='s'):%Y-%m-%d %H:%M})筆數 {nrows}",
            6000,
        )

    def _remove_selection_items(self):
        """Remove the selection region and border lines from the plot, if present."""
        for item in (self._sel_region, self._sel_left_line, self._sel_right_line):
            if item is None:
                continue
            try:
                self.pw.removeItem(item)
            except Exception:
                # Best effort: the item may already have been dropped by
                # self.pw.clear() in _plot_all().
                pass
        self._sel_region = None
        self._sel_left_line = None
        self._sel_right_line = None

    def _apply_selection_visual(self, x0: float, x1: float):
        """Replace any existing selection overlay with one spanning ``[x0, x1]``."""
        self._remove_selection_items()

        # Translucent fill.
        region = pg.LinearRegionItem(values=(x0, x1))
        region.setZValue(10)
        region.setMovable(False)
        region.setBrush(pg.mkBrush(52, 152, 219, 40))  # translucent blue
        # Some older pyqtgraph LinearRegionItem versions lack setPen: set it
        # when available, otherwise rely on the border lines below.
        pen = pg.mkPen('#3498db', width=2)
        if hasattr(region, 'setPen'):
            try:
                region.setPen(pen)
            except Exception:
                pass
        self.pw.addItem(region)
        self._sel_region = region

        # Border lines (InfiniteLine accepts a pen in its constructor).
        self._sel_left_line = pg.InfiniteLine(pos=x0, angle=90, movable=False, pen=pen)
        self._sel_right_line = pg.InfiniteLine(pos=x1, angle=90, movable=False, pen=pen)
        self._sel_left_line.setZValue(11)
        self._sel_right_line.setZValue(11)
        self.pw.addItem(self._sel_left_line)
        self.pw.addItem(self._sel_right_line)

    def _clear_selection_visual(self):
        """Remove the overlay, forget the selection, and disable the save button."""
        self._remove_selection_items()
        self._sel_start_ts = None
        self._sel_end_ts = None
        self.btn_save.setEnabled(False)

    def _current_selection_df(self) -> Optional[pd.DataFrame]:
        """Return a copy of the rows inside the current selection, or None.

        The selection is the half-open interval ``[start, end)``.  It is
        evaluated on the float ``t`` column, the same values the click handler
        reads the start from, so the clicked candle is always included; a
        round trip through datetimes could lose it to float rounding.
        """
        if self.df is None or self._sel_start_ts is None or self._sel_end_ts is None:
            return None
        t = self.df['t']
        mask = (t >= self._sel_start_ts) & (t < self._sel_end_ts)
        return self.df.loc[mask].copy()

    # ------------------------- Save -------------------------
    def _on_save_clicked(self):
        """Write the selected rows to a user-chosen CSV, then clear the selection."""
        sel = self._current_selection_df()
        if sel is None or sel.empty:
            QtWidgets.QMessageBox.information(self, "沒有資料", "目前沒有可另存的選取資料。請先在圖上點擊一根 K 線。")
            return
        s0 = sel['open_time'].iloc[0]
        s1 = sel['open_time'].iloc[-1]
        default_name = f"slice_{s0:%Y%m%d_%H%M}_{s1:%Y%m%d_%H%M}.csv"
        path, _ = QtWidgets.QFileDialog.getSaveFileName(
            self, "另存選取為 CSV", os.path.join(os.getcwd(), default_name), "CSV Files (*.csv)"
        )
        if not path:
            return
        # Columns are written in the loaded frame's order.
        # NOTE: this is every column of self.df, including the `t` column
        # that _plot_all() uses for the x axis.
        cols = list(self.df.columns)
        try:
            sel.to_csv(path, index=False, columns=cols)
        except Exception as e:
            # UI boundary: any write failure is reported to the user.
            QtWidgets.QMessageBox.critical(self, "寫入失敗", f"儲存 CSV 發生錯誤\n{e}")
            return
        self.status.showMessage(f"已另存:{path}", 6000)
        # Drop the selection overlay after a successful save.
        self._clear_selection_visual()


# ---------------------------------------------------------------------
# CSV 讀取
# ---------------------------------------------------------------------

def load_ohlcv_csv(path: str) -> pd.DataFrame:
    """
    Read an OHLCV CSV and return it as a cleaned DataFrame.

    The frame keeps every CSV column and additionally has:
    - open_time / close_time parsed to datetimes (unparseable values -> NaT)
    - t: ``open_time`` as Unix seconds (float), used for the x axis

    Rows whose ``open_time`` cannot be parsed are dropped, the rest are
    sorted by ``open_time`` (stable, so rows with equal timestamps keep
    their file order) and re-indexed from 0.

    Args:
        path: Path of the CSV file.

    Returns:
        The cleaned DataFrame described above.

    Raises:
        ValueError: if any required column is missing.
    """
    df = pd.read_csv(path)
    required = [
        'open_time','open','high','low','close','volume',
        'close_time','quote_asset_volume','number_of_trades','taker_buy_base','taker_buy_quote'
    ]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"CSV 欄位缺少:{missing}")
    # Parse timestamps; bad values become NaT instead of raising.
    df['open_time'] = pd.to_datetime(df['open_time'], errors='coerce')
    df['close_time'] = pd.to_datetime(df['close_time'], errors='coerce')
    # Clean up and order.
    df = df.dropna(subset=['open_time'])
    df = df.sort_values('open_time', kind='mergesort').reset_index(drop=True)
    # x axis in seconds.  Convert through an explicit datetime64[ns] array so
    # the result does not depend on the storage unit (ns/us/ms/s) pandas
    # picked for the parsed column; a plain astype('int64') would return
    # counts in that unit.
    epoch_ns = df['open_time'].to_numpy(dtype='datetime64[ns]').astype('int64')
    df['t'] = epoch_ns / 1e9
    # Force numeric columns to numbers; unparseable cells become NaN / <NA>.
    for col in ['open','high','low','close','volume','quote_asset_volume','taker_buy_base','taker_buy_quote']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df['number_of_trades'] = pd.to_numeric(df['number_of_trades'], errors='coerce').astype('Int64')
    return df


# ---------------------------------------------------------------------
# 進入點
# ---------------------------------------------------------------------

def parse_args():
    """Parse the command line and return the resulting namespace.

    The only option is ``--csv``: a CSV path to load at startup
    (``None`` when omitted).
    """
    parser = argparse.ArgumentParser(description="CSV Candle Picker")
    parser.add_argument(
        '--csv',
        default=None,
        type=str,
        help='啟動時直接載入的 CSV 路徑',
    )
    namespace = parser.parse_args()
    return namespace


def main():
    """Entry point: parse arguments, configure pyqtgraph, and run the Qt app.

    Exits the process with the event loop's return code.
    """
    cli = parse_args()

    # Global plot settings: no antialiasing, white background, black foreground.
    pg.setConfigOptions(antialias=False, background='w', foreground='k')

    qt_app = QtWidgets.QApplication(sys.argv)
    window = MainWindow(csv_path=cli.csv)
    window.show()
    exit_code = qt_app.exec_()
    sys.exit(exit_code)


# Start the application only when this file is executed as a script.
if __name__ == '__main__':
    main()

上一篇
Day13 - 本地開發價格繪製網頁,Debug3
下一篇
Day15 - Offline Debug
系列文
從零開始:AWS 部署 Python 自動交易程式與交易監測 Dashboard 實戰筆記18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言