經過一段時間的研究,我覺得目前在網頁上畫數據這個功能有點難開發,主要是數據量有點大;目前可能會改成網頁提供下載價格跟訂單的log檔案;然後,想要畫的話,再用本地端得程式去畫log,網頁端還是以盈虧數據跟簡單的資產vs市場價格折線圖為主。
以下是本地端用來畫較大數據量價格圖的程式,一分鐘得數據畫起來還是很卡,仍需想辦法切割數據跟優化。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
高速 K 線繪圖(讀取 data.csv)
- 欄位需求:open_time, open, high, low, close
- 主要優化:
1) 單一 QPainterPath 合併繪製(紅/綠 K、影線)
2) 視窗範圍裁切,只渲染可視區間
3) LOD:當每根寬度 < 1px 時改畫收盤價線
"""
import sys
import numpy as np
import pandas as pd
from PyQt5 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg
# ------------------------- 時間軸顯示 -------------------------
class TimeAxisItem(pg.AxisItem):
def tickStrings(self, values, scale, spacing):
out = []
for v in values:
try:
out.append(QtCore.QDateTime.fromSecsSinceEpoch(int(v)).toString("yyyy-MM-dd\nHH:mm"))
except Exception:
out.append("")
return out
# ------------------------- I/O 與預處理 -------------------------
def to_epoch_seconds(series: pd.Series) -> pd.Series:
s = pd.to_datetime(series, errors="coerce", utc=False)
# naive 直接當本地時間處理
s = s.dt.tz_localize(None)
# 轉 epoch(以本地時區理解後再視作本地 epoch;對單機回放來說足夠)
return (pd.to_datetime(s).astype("int64") // 10**9).astype("int64")
def load_prices_csv(path: str) -> pd.DataFrame:
df = pd.read_csv(path)
req = {"open_time", "open", "high", "low", "close"}
if not req.issubset(df.columns):
raise ValueError(f"data.csv 缺少欄位:{req - set(df.columns)}")
# 時間與型別
df["open_time"] = pd.to_datetime(df["open_time"], errors="coerce")
df = df.dropna(subset=["open_time"]).copy()
for c in ["open", "high", "low", "close"]:
df[c] = pd.to_numeric(df[c], errors="coerce")
df = df.dropna(subset=["open", "high", "low", "close"]).copy()
# 收盤時間 = 開盤 + 1 分鐘
df["close_time"] = df["open_time"] + pd.Timedelta(minutes=1)
df = df.sort_values("close_time").reset_index(drop=True)
# X 軸(epoch 秒)
df["t"] = to_epoch_seconds(df["close_time"]).astype("int64")
return df[["t", "open", "high", "low", "close"]]
# ------------------------- 高速蠟燭圖 Item -------------------------
class FastCandlesItem(pg.GraphicsObject):
"""
高速蠟燭圖:
- 只針對可視 x 範圍做裁切渲染(以二分法找 index)
- 將紅/綠實體與影線「各自合併成單一 QPainterPath」
- LOD:當 (可視範圍像素 / 可視資料點數) < 1 px 時,改以 close 線替代
"""
sigRangeChanged = QtCore.pyqtSignal()
def __init__(self, t: np.ndarray, o: np.ndarray, h: np.ndarray, l: np.ndarray, c: np.ndarray, viewbox: pg.ViewBox):
super().__init__()
self.t = t.astype(float)
self.o = o.astype(float)
self.h = h.astype(float)
self.l = l.astype(float)
self.c = c.astype(float)
self.vb = viewbox
self._cached_range = None # (x1,x2,y1,y2) -> 用來避免重複重算
self._picture = None # 最終合成圖
self._closeCurve = None # LOD 模式下的 close 線
self._pen_wick = QtGui.QPen(QtGui.QColor(0, 0, 0, 180), 1)
self._brush_up = QtGui.QBrush(QtGui.QColor("#2ecc71"))
self._brush_dn = QtGui.QBrush(QtGui.QColor("#e74c3c"))
# 監聽視窗範圍變更
self.vb.sigRangeChanged.connect(self._on_range_changed)
# ---- 範圍 → 可視索引 ----
def _visible_index(self, x1: float, x2: float):
# 二分找左右邊界
left = max(0, int(np.searchsorted(self.t, x1, side="left") - 1))
right = min(len(self.t), int(np.searchsorted(self.t, x2, side="right") + 1))
if right <= left:
left, right = 0, 0
return left, right
# ---- 估算每根寬度的像素大小 ----
def _candle_px_width(self):
view_rect = self.vb.viewRect()
x_span = view_rect.width()
if x_span <= 0:
return 0.0
# 以時間間隔估算
if len(self.t) > 1:
dt = np.median(np.diff(self.t))
else:
dt = 60.0
# 把 dt 映射到像素
p0 = self.vb.mapViewToScene(QtCore.QPointF(self.t[0], 0)).x()
p1 = self.vb.mapViewToScene(QtCore.QPointF(self.t[0] + dt, 0)).x()
return abs(p1 - p0)
# ---- 建立合併 QPicture(蠟燭實體與影線)----
def _build_picture(self, i0: int, i1: int):
# LOD:若每根 < 1px,改畫 close 線
if self._candle_px_width() < 1.0:
self._picture = None
# 建一次 close 曲線
x = self.t[i0:i1]
y = self.c[i0:i1]
path = QtGui.QPainterPath()
if len(x) > 0:
path.moveTo(x[0], y[0])
for k in range(1, len(x)):
path.lineTo(x[k], y[k])
pic = QtGui.QPicture()
p = QtGui.QPainter(pic)
p.setPen(QtGui.QPen(QtGui.QColor(30, 30, 30), 1))
p.setBrush(QtCore.Qt.NoBrush)
p.drawPath(path)
p.end()
self._closeCurve = pic
return
self._closeCurve = None
up_path = QtGui.QPainterPath()
dn_path = QtGui.QPainterPath()
wl_path = QtGui.QPainterPath() # wicks
# 蠟燭實體寬度(用中位數間隔 * 0.6)
if i1 - i0 > 1:
w = np.median(np.diff(self.t[i0:i1])) * 0.6
else:
w = 30.0
# 建構 path
T = self.t[i0:i1]
O = self.o[i0:i1]
H = self.h[i0:i1]
L = self.l[i0:i1]
C = self.c[i0:i1]
# 影線
for x, lo, hi in zip(T, L, H):
wl_path.moveTo(x, lo)
wl_path.lineTo(x, hi)
# 實體
inc = C >= O
dec = ~inc
# 上漲
for x, o, c in zip(T[inc], O[inc], C[inc]):
y = min(o, c); h = max(o, c) - y
if h <= 1e-12: # 最小高度
h = 1e-12
up_path.addRect(QtCore.QRectF(x - w, y, 2*w, h))
# 下跌
for x, o, c in zip(T[dec], O[dec], C[dec]):
y = min(o, c); h = max(o, c) - y
if h <= 1e-12:
h = 1e-12
dn_path.addRect(QtCore.QRectF(x - w, y, 2*w, h))
pic = QtGui.QPicture()
p = QtGui.QPainter(pic)
# 影線
p.setPen(self._pen_wick)
p.setBrush(QtCore.Qt.NoBrush)
p.drawPath(wl_path)
# 上漲實體
p.setPen(QtCore.Qt.NoPen)
p.setBrush(self._brush_up)
p.drawPath(up_path)
# 下跌實體
p.setBrush(self._brush_dn)
p.drawPath(dn_path)
p.end()
self._picture = pic
# ---- 監聽範圍變化並重建圖像 ----
def _on_range_changed(self):
vr = self.vb.viewRect()
xr = (vr.left(), vr.right(), vr.top(), vr.bottom())
if self._cached_range == xr:
return
self._cached_range = xr
# 計算可視索引
i0, i1 = self._visible_index(xr[0], xr[1])
self._build_picture(i0, i1)
self.prepareGeometryChange()
self.update()
def paint(self, p, *args):
# 沒有快取就先建(初始化)
if self._picture is None and self._closeCurve is None:
# 初次以全域視窗估算
vb_rect = self.vb.viewRect()
i0, i1 = self._visible_index(vb_rect.left(), vb_rect.right())
self._build_picture(i0, i1)
if self._closeCurve is not None:
self._closeCurve.play(p)
elif self._picture is not None:
self._picture.play(p)
def boundingRect(self):
# 使用全資料範圍即可
return QtCore.QRectF(self.t.min(), min(self.l.min(), self.o.min(), self.c.min()),
self.t.max() - self.t.min(),
max(self.h.max(), self.o.max(), self.c.max()) - min(self.l.min(), self.o.min(), self.c.min()))
# ------------------------- 主視窗 -------------------------
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, df: pd.DataFrame):
super().__init__()
self.setWindowTitle("高速 K 線圖(data.csv)")
self.resize(1200, 720)
# 中央繪圖
axis_bottom = TimeAxisItem(orientation="bottom")
self.plot = pg.PlotWidget(axisItems={"bottom": axis_bottom})
self.setCentralWidget(self.plot)
# 視覺設定(效能)
self.plot.setBackground("w")
pg.setConfigOptions(antialias=False) # 關閉抗鋸齒
pg.setConfigOption('foreground', 'k')
self.plot.showGrid(x=True, y=True, alpha=0.15)
# 放資料
t = df["t"].to_numpy(dtype=float)
o = df["open"].to_numpy(dtype=float)
h = df["high"].to_numpy(dtype=float)
l = df["low"].to_numpy(dtype=float)
c = df["close"].to_numpy(dtype=float)
self.vb = self.plot.getPlotItem().getViewBox()
self.candles = FastCandlesItem(t, o, h, l, c, self.vb)
self.plot.addItem(self.candles)
# 初始視窗:顯示全部
self.plot.setXRange(float(t.min()), float(t.max()), padding=0)
ymin = float(np.nanmin([l.min(), o.min(), c.min()]))
ymax = float(np.nanmax([h.max(), o.max(), c.max()]))
self.plot.setYRange(ymin, ymax, padding=0.05)
# 提示:滑鼠操作
self._set_help()
def _set_help(self):
msg = ("滑鼠操作:滑輪縮放、拖曳平移。當太密時自動改為收盤價線(LOD)以提速。")
sb = self.statusBar()
sb.showMessage(msg, 8000)
# ------------------------- 進入點 -------------------------
def main():
df = load_prices_csv("data.csv")
app = QtWidgets.QApplication(sys.argv)
win = MainWindow(df)
win.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()