iT邦幫忙

2025 iThome 鐵人賽

DAY 12
0

經過一段時間的研究,我覺得目前在網頁上畫數據這個功能有點難開發,主要是數據量有點大;目前可能會改成網頁提供下載價格跟訂單的log檔案;然後,想要畫的話,再用本地端得程式去畫log,網頁端還是以盈虧數據跟簡單的資產vs市場價格折線圖為主。

以下是本地端用來畫較大數據量價格圖的程式,一分鐘得數據畫起來還是很卡,仍需想辦法切割數據跟優化。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
高速 K 線繪圖(讀取 data.csv)
- 欄位需求:open_time, open, high, low, close
- 主要優化:
  1) 單一 QPainterPath 合併繪製(紅/綠 K、影線)
  2) 視窗範圍裁切,只渲染可視區間
  3) LOD:當每根寬度 < 1px 時改畫收盤價線
"""

import sys
import numpy as np
import pandas as pd
from PyQt5 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg


# ------------------------- 時間軸顯示 -------------------------
class TimeAxisItem(pg.AxisItem):
    def tickStrings(self, values, scale, spacing):
        out = []
        for v in values:
            try:
                out.append(QtCore.QDateTime.fromSecsSinceEpoch(int(v)).toString("yyyy-MM-dd\nHH:mm"))
            except Exception:
                out.append("")
        return out


# ------------------------- I/O 與預處理 -------------------------
def to_epoch_seconds(series: pd.Series) -> pd.Series:
    s = pd.to_datetime(series, errors="coerce", utc=False)
    # naive 直接當本地時間處理
    s = s.dt.tz_localize(None)
    # 轉 epoch(以本地時區理解後再視作本地 epoch;對單機回放來說足夠)
    return (pd.to_datetime(s).astype("int64") // 10**9).astype("int64")


def load_prices_csv(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    req = {"open_time", "open", "high", "low", "close"}
    if not req.issubset(df.columns):
        raise ValueError(f"data.csv 缺少欄位:{req - set(df.columns)}")

    # 時間與型別
    df["open_time"] = pd.to_datetime(df["open_time"], errors="coerce")
    df = df.dropna(subset=["open_time"]).copy()
    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["open", "high", "low", "close"]).copy()

    # 收盤時間 = 開盤 + 1 分鐘
    df["close_time"] = df["open_time"] + pd.Timedelta(minutes=1)
    df = df.sort_values("close_time").reset_index(drop=True)

    # X 軸(epoch 秒)
    df["t"] = to_epoch_seconds(df["close_time"]).astype("int64")
    return df[["t", "open", "high", "low", "close"]]


# ------------------------- 高速蠟燭圖 Item -------------------------
class FastCandlesItem(pg.GraphicsObject):
    """
    高速蠟燭圖:
    - 只針對可視 x 範圍做裁切渲染(以二分法找 index)
    - 將紅/綠實體與影線「各自合併成單一 QPainterPath」
    - LOD:當 (可視範圍像素 / 可視資料點數) < 1 px 時,改以 close 線替代
    """
    sigRangeChanged = QtCore.pyqtSignal()

    def __init__(self, t: np.ndarray, o: np.ndarray, h: np.ndarray, l: np.ndarray, c: np.ndarray, viewbox: pg.ViewBox):
        super().__init__()
        self.t = t.astype(float)
        self.o = o.astype(float)
        self.h = h.astype(float)
        self.l = l.astype(float)
        self.c = c.astype(float)
        self.vb = viewbox

        self._cached_range = None   # (x1,x2,y1,y2) -> 用來避免重複重算
        self._picture = None        # 最終合成圖
        self._closeCurve = None     # LOD 模式下的 close 線
        self._pen_wick = QtGui.QPen(QtGui.QColor(0, 0, 0, 180), 1)
        self._brush_up = QtGui.QBrush(QtGui.QColor("#2ecc71"))
        self._brush_dn = QtGui.QBrush(QtGui.QColor("#e74c3c"))

        # 監聽視窗範圍變更
        self.vb.sigRangeChanged.connect(self._on_range_changed)

    # ---- 範圍 → 可視索引 ----
    def _visible_index(self, x1: float, x2: float):
        # 二分找左右邊界
        left = max(0, int(np.searchsorted(self.t, x1, side="left") - 1))
        right = min(len(self.t), int(np.searchsorted(self.t, x2, side="right") + 1))
        if right <= left:
            left, right = 0, 0
        return left, right

    # ---- 估算每根寬度的像素大小 ----
    def _candle_px_width(self):
        view_rect = self.vb.viewRect()
        x_span = view_rect.width()
        if x_span <= 0:
            return 0.0
        # 以時間間隔估算
        if len(self.t) > 1:
            dt = np.median(np.diff(self.t))
        else:
            dt = 60.0
        # 把 dt 映射到像素
        p0 = self.vb.mapViewToScene(QtCore.QPointF(self.t[0], 0)).x()
        p1 = self.vb.mapViewToScene(QtCore.QPointF(self.t[0] + dt, 0)).x()
        return abs(p1 - p0)

    # ---- 建立合併 QPicture(蠟燭實體與影線)----
    def _build_picture(self, i0: int, i1: int):
        # LOD:若每根 < 1px,改畫 close 線
        if self._candle_px_width() < 1.0:
            self._picture = None
            # 建一次 close 曲線
            x = self.t[i0:i1]
            y = self.c[i0:i1]
            path = QtGui.QPainterPath()
            if len(x) > 0:
                path.moveTo(x[0], y[0])
                for k in range(1, len(x)):
                    path.lineTo(x[k], y[k])
            pic = QtGui.QPicture()
            p = QtGui.QPainter(pic)
            p.setPen(QtGui.QPen(QtGui.QColor(30, 30, 30), 1))
            p.setBrush(QtCore.Qt.NoBrush)
            p.drawPath(path)
            p.end()
            self._closeCurve = pic
            return

        self._closeCurve = None

        up_path = QtGui.QPainterPath()
        dn_path = QtGui.QPainterPath()
        wl_path = QtGui.QPainterPath()  # wicks

        # 蠟燭實體寬度(用中位數間隔 * 0.6)
        if i1 - i0 > 1:
            w = np.median(np.diff(self.t[i0:i1])) * 0.6
        else:
            w = 30.0

        # 建構 path
        T = self.t[i0:i1]
        O = self.o[i0:i1]
        H = self.h[i0:i1]
        L = self.l[i0:i1]
        C = self.c[i0:i1]

        # 影線
        for x, lo, hi in zip(T, L, H):
            wl_path.moveTo(x, lo)
            wl_path.lineTo(x, hi)

        # 實體
        inc = C >= O
        dec = ~inc

        # 上漲
        for x, o, c in zip(T[inc], O[inc], C[inc]):
            y = min(o, c); h = max(o, c) - y
            if h <= 1e-12:  # 最小高度
                h = 1e-12
            up_path.addRect(QtCore.QRectF(x - w, y, 2*w, h))

        # 下跌
        for x, o, c in zip(T[dec], O[dec], C[dec]):
            y = min(o, c); h = max(o, c) - y
            if h <= 1e-12:
                h = 1e-12
            dn_path.addRect(QtCore.QRectF(x - w, y, 2*w, h))

        pic = QtGui.QPicture()
        p = QtGui.QPainter(pic)
        # 影線
        p.setPen(self._pen_wick)
        p.setBrush(QtCore.Qt.NoBrush)
        p.drawPath(wl_path)
        # 上漲實體
        p.setPen(QtCore.Qt.NoPen)
        p.setBrush(self._brush_up)
        p.drawPath(up_path)
        # 下跌實體
        p.setBrush(self._brush_dn)
        p.drawPath(dn_path)
        p.end()

        self._picture = pic

    # ---- 監聽範圍變化並重建圖像 ----
    def _on_range_changed(self):
        vr = self.vb.viewRect()
        xr = (vr.left(), vr.right(), vr.top(), vr.bottom())
        if self._cached_range == xr:
            return
        self._cached_range = xr

        # 計算可視索引
        i0, i1 = self._visible_index(xr[0], xr[1])
        self._build_picture(i0, i1)
        self.prepareGeometryChange()
        self.update()

    def paint(self, p, *args):
        # 沒有快取就先建(初始化)
        if self._picture is None and self._closeCurve is None:
            # 初次以全域視窗估算
            vb_rect = self.vb.viewRect()
            i0, i1 = self._visible_index(vb_rect.left(), vb_rect.right())
            self._build_picture(i0, i1)

        if self._closeCurve is not None:
            self._closeCurve.play(p)
        elif self._picture is not None:
            self._picture.play(p)

    def boundingRect(self):
        # 使用全資料範圍即可
        return QtCore.QRectF(self.t.min(), min(self.l.min(), self.o.min(), self.c.min()),
                             self.t.max() - self.t.min(),
                             max(self.h.max(), self.o.max(), self.c.max()) - min(self.l.min(), self.o.min(), self.c.min()))


# ------------------------- 主視窗 -------------------------
class MainWindow(QtWidgets.QMainWindow):
    def __init__(self, df: pd.DataFrame):
        super().__init__()
        self.setWindowTitle("高速 K 線圖(data.csv)")
        self.resize(1200, 720)

        # 中央繪圖
        axis_bottom = TimeAxisItem(orientation="bottom")
        self.plot = pg.PlotWidget(axisItems={"bottom": axis_bottom})
        self.setCentralWidget(self.plot)

        # 視覺設定(效能)
        self.plot.setBackground("w")
        pg.setConfigOptions(antialias=False)          # 關閉抗鋸齒
        pg.setConfigOption('foreground', 'k')
        self.plot.showGrid(x=True, y=True, alpha=0.15)

        # 放資料
        t = df["t"].to_numpy(dtype=float)
        o = df["open"].to_numpy(dtype=float)
        h = df["high"].to_numpy(dtype=float)
        l = df["low"].to_numpy(dtype=float)
        c = df["close"].to_numpy(dtype=float)

        self.vb = self.plot.getPlotItem().getViewBox()
        self.candles = FastCandlesItem(t, o, h, l, c, self.vb)
        self.plot.addItem(self.candles)

        # 初始視窗:顯示全部
        self.plot.setXRange(float(t.min()), float(t.max()), padding=0)
        ymin = float(np.nanmin([l.min(), o.min(), c.min()]))
        ymax = float(np.nanmax([h.max(), o.max(), c.max()]))
        self.plot.setYRange(ymin, ymax, padding=0.05)

        # 提示:滑鼠操作
        self._set_help()

    def _set_help(self):
        msg = ("滑鼠操作:滑輪縮放、拖曳平移。當太密時自動改為收盤價線(LOD)以提速。")
        sb = self.statusBar()
        sb.showMessage(msg, 8000)


# ------------------------- 進入點 -------------------------
def main():
    df = load_prices_csv("data.csv")

    app = QtWidgets.QApplication(sys.argv)
    win = MainWindow(df)
    win.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()

上一篇
Day11 - 本地開發價格繪製網頁,仍在Debug中
系列文
從零開始:AWS 部署 Python 自動交易程式與交易監測 Dashboard 實戰筆記12
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言