目前仍然還在構思中,先寫一個價格切割器:
from __future__ import annotations
import sys, os, argparse, datetime as _dt
from typing import List, Tuple, Optional
import numpy as np
import pandas as pd
from PyQt5 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg
# ---------------------------------------------------------------------
# 視覺化(PyQtGraph)— 與題主給的實作風格一致
# ---------------------------------------------------------------------
class TimeAxisItem(pg.AxisItem):
    """Axis whose tick values are Unix seconds, shown as date/time labels.

    Labels are rendered in UTC. The x values plotted in this file are derived
    from naive timestamps interpreted as epoch seconds (see
    ``load_ohlcv_csv``), and the status-bar text is produced with
    ``pd.to_datetime(..., unit='s')``; formatting ticks in the machine's local
    time zone would shift the axis relative to both.
    """

    # Reference instant; adding a timedelta avoids the platform-dependent
    # limits of datetime.fromtimestamp (e.g. negative values).
    _EPOCH = _dt.datetime(1970, 1, 1)

    def tickStrings(self, values, scale, spacing):
        """Return one ``"YYYY-MM-DD\\nHH:MM"`` label per tick value.

        Values that cannot be represented as a datetime (NaN, out of range)
        yield an empty label instead of raising, so the axis can still be
        drawn.
        """
        labels = []
        for v in values:
            try:
                stamp = self._EPOCH + _dt.timedelta(seconds=float(v))
                labels.append(stamp.strftime("%Y-%m-%d\n%H:%M"))
            except (OverflowError, ValueError, OSError):
                labels.append("")
        return labels
class CandlestickItem(pg.GraphicsObject):
    """Candlestick series drawn once into a cached QPicture.

    ``data`` is an ndarray of shape (N, 5) with columns
    [t, open, high, low, close]. All bars are recorded into a single
    QPicture at construction time; ``paint`` only replays that picture.
    """

    def __init__(self, data: np.ndarray):
        super().__init__()
        self.data = data
        self.picture = None
        self._bounds = QtCore.QRectF()
        self._generate()

    def _estimate_width(self) -> float:
        """Return the body half-width: 30% of the median bar spacing, at least 1."""
        if len(self.data) >= 2:
            spacing = float(np.median(np.diff(self.data[:, 0])))
            return max(1.0, 0.3 * spacing)
        # A single bar (or none) has no spacing to measure.
        return 30.0

    def _generate(self):
        """Record every bar into a fresh QPicture and refresh the bounds."""
        picture = QtGui.QPicture()
        painter = QtGui.QPainter(picture)
        rising_brush = QtGui.QBrush(QtGui.QColor("#2ecc71"))
        falling_brush = QtGui.QBrush(QtGui.QColor("#e74c3c"))
        wick_pen = pg.mkPen(0.0)
        half = self._estimate_width()
        for row in self.data:
            ts, open_, high, low, close_ = row
            # Wick: a vertical line from low to high.
            painter.setPen(wick_pen)
            painter.setBrush(QtCore.Qt.NoBrush)
            painter.drawLine(QtCore.QPointF(ts, low), QtCore.QPointF(ts, high))
            # Body: a filled rectangle between open and close; the tiny
            # minimum height keeps flat bars from collapsing to nothing.
            rising = close_ >= open_
            if rising:
                bottom, top = open_, close_
            else:
                bottom, top = close_, open_
            painter.setBrush(rising_brush if rising else falling_brush)
            body = QtCore.QRectF(ts - half, bottom, 2*half, max(top - bottom, 1e-12))
            painter.setPen(QtCore.Qt.NoPen)
            painter.drawRect(body)
        painter.end()
        self.picture = picture
        self._bounds = QtCore.QRectF(self.picture.boundingRect())

    def paint(self, p, *args):
        """Replay the cached picture, if one has been generated."""
        if self.picture is None:
            return
        self.picture.play(p)

    def boundingRect(self):
        """Return the bounds captured when the picture was generated."""
        return self._bounds
# ---------------------------------------------------------------------
# LOD (level-of-detail) candlestick item for very large datasets
# ---------------------------------------------------------------------
class LODCandlestickItem(pg.GraphicsObject):
    """Level-of-detail candlestick renderer for large series (100k+ bars).

    - A *level* is chosen from the current zoom: it is the number of raw bars
      merged into one drawn bar (always a power of two so that cached
      pictures can be reused across nearby zoom factors).
    - Each level is split into *tiles* of ``tile_size`` drawn bars; every tile
      is recorded once into a QPicture and cached under ``(level, tile_idx)``.
    - ``paint`` replays only the tiles that intersect the visible x range.

    Parameters
    ----------
    t, o, h, l, c:
        1-D arrays of equal length: x position (seconds), open, high, low and
        close. ``t`` is searched with ``np.searchsorted`` and must therefore
        be sorted ascending.
    vb:
        The ViewBox whose x range and pixel width drive level selection. May
        be ``None``, in which case level 1 and the full data range are used.
    tile_size:
        Drawn bars per tile; values below 256 are raised to 256.

    Raises
    ------
    ValueError
        If any array is not 1-D or the arrays differ in length.
    """

    def __init__(self, t: np.ndarray, o: np.ndarray, h: np.ndarray, l: np.ndarray, c: np.ndarray,
                 vb: pg.ViewBox, tile_size: int = 2000):
        super().__init__()
        series = (t, o, h, l, c)
        # Explicit checks instead of assert: asserts vanish under ``python -O``.
        if any(a.ndim != 1 for a in series):
            raise ValueError("t, o, h, l and c must all be 1-D arrays")
        if len({len(a) for a in series}) != 1:
            raise ValueError("t, o, h, l and c must all have the same length")
        self.t = t.astype(float)
        self.o = o.astype(float)
        self.h = h.astype(float)
        self.l = l.astype(float)
        self.c = c.astype(float)
        self.vb = vb
        # Number of drawn (aggregated) bars per tile, at every level.
        self.tile_size = int(max(256, tile_size))
        # Typical spacing between raw bars; 60 s when it cannot be measured.
        self.base_dt = float(np.median(np.diff(self.t))) if len(self.t) > 1 else 60.0
        # Cache: (level, tile_idx) -> QPicture
        self._cache = {}
        if len(self.t):
            x_min = float(self.t.min())
            y_min = float(np.nanmin(self.l))
            self._bounds = QtCore.QRectF(
                x_min,
                y_min,
                float(self.t.max()) - x_min,
                float(np.nanmax(self.h)) - y_min,
            )
        else:
            self._bounds = QtCore.QRectF(0.0, 0.0, 1.0, 1.0)
        # Level used by the most recent paint() call.
        self._last_level = None
        # Repaint whenever the view changes so level and tiles are re-evaluated.
        if self.vb is not None:
            self.vb.sigXRangeChanged.connect(self._on_view_changed)
            try:
                self.vb.sigResized.connect(self._on_view_changed)
            except AttributeError:
                # Best effort: tolerate a view box that lacks this signal.
                pass

    # ---- public ----
    def boundingRect(self):
        """Return the data extent computed at construction time."""
        return self._bounds

    # ---- events ----
    def _on_view_changed(self, *args):
        """Schedule a repaint after a range or size change of the view box."""
        self.update()

    # ---- view helpers ----
    def _view_x_range(self):
        """Return the visible ``(x0, x1)`` as floats, or ``None`` if unavailable."""
        try:
            x0, x1 = self.vb.viewRange()[0]
            return float(x0), float(x1)
        except Exception:
            # Deliberately broad: this runs on the paint path, where skipping
            # to a fallback is preferable to an exception escaping paint().
            return None

    # ---- LOD selection ----
    def _choose_level(self) -> int:
        """Pick how many raw bars to merge so that one drawn bar spans ~1 px."""
        view = self._view_x_range()
        if view is None:
            return 1
        try:
            px = max(1.0, float(self.vb.width()))
        except Exception:
            return 1
        span = max(1e-9, view[1] - view[0])
        sec_per_px = span / px
        raw = max(1.0, sec_per_px / max(1e-9, self.base_dt))
        # Clamp before the log so an infinite span cannot overflow int().
        raw = min(raw, float(1 << 16))
        # Round up to a power of two so cached tiles are reused.
        lvl = int(2 ** np.ceil(np.log2(raw)))
        return max(1, min(lvl, 1 << 16))

    def _visible_tile_range(self, level: int):
        """Return the inclusive ``(first_tile, last_tile)`` covering the view.

        One raw bar of padding is added on each side so that a bar whose body
        straddles the view edge is still drawn. Returns ``(0, -1)`` (an empty
        range) when there is no data.
        """
        n = len(self.t)
        if n == 0:
            return 0, -1
        view = self._view_x_range()
        if view is None:
            x0, x1 = float(self.t[0]), float(self.t[-1])
        else:
            x0, x1 = view
        # Raw-index window [i0, i1) of bars whose x lies inside the view.
        i0 = int(np.searchsorted(self.t, x0, side='left'))
        i1 = int(np.searchsorted(self.t, x1, side='right'))
        first_raw = max(0, i0 - 1)
        last_raw = max(first_raw, min(n - 1, i1))
        # raw index -> aggregated index -> tile index
        tile0 = (first_raw // level) // self.tile_size
        tile1 = (last_raw // level) // self.tile_size
        return tile0, tile1

    # ---- tile construction ----
    def _build_tile_picture(self, level: int, tile_idx: int) -> QtGui.QPicture:
        """Return the cached picture for a tile, recording it on first use."""
        key = (level, tile_idx)
        cached = self._cache.get(key)
        if cached is not None:
            return cached
        # Raw-index range covered by this tile.
        base_i0 = tile_idx * self.tile_size * level
        base_i1 = min(len(self.t), base_i0 + self.tile_size * level)
        pic = QtGui.QPicture()
        if base_i0 >= base_i1:
            # Cache an empty picture so the lookup is not repeated.
            self._cache[key] = pic
            return pic

        # Start offset of every aggregated bar inside the tile; the last bar
        # may hold fewer than ``level`` raw bars.
        offsets = np.arange(0, base_i1 - base_i0, level)
        first = base_i0 + offsets
        last = np.minimum(first + level, base_i1) - 1
        times = self.t[first]
        opens = self.o[first]
        closes = self.c[last]
        # fmax/fmin ignore NaN like nanmax/nanmin, one call for the whole tile.
        highs = np.fmax.reduceat(self.h[base_i0:base_i1], offsets)
        lows = np.fmin.reduceat(self.l[base_i0:base_i1], offsets)
        # Bars with any non-finite price cannot be drawn meaningfully.
        drawable = (np.isfinite(opens) & np.isfinite(closes)
                    & np.isfinite(highs) & np.isfinite(lows))

        up_brush = QtGui.QBrush(QtGui.QColor("#2ecc71"))
        dn_brush = QtGui.QBrush(QtGui.QColor("#e74c3c"))
        wick_pen = pg.mkPen(0.0)
        w = max(1.0, 0.3 * self.base_dt * level)

        qp = QtGui.QPainter(pic)
        try:
            bars = zip(times[drawable].tolist(), opens[drawable].tolist(),
                       highs[drawable].tolist(), lows[drawable].tolist(),
                       closes[drawable].tolist())
            for x, op, hi, lo, cl in bars:
                # wick
                qp.setPen(wick_pen)
                qp.setBrush(QtCore.Qt.NoBrush)
                qp.drawLine(QtCore.QPointF(x, lo), QtCore.QPointF(x, hi))
                # body
                if cl >= op:
                    qp.setBrush(up_brush)
                    rect = QtCore.QRectF(x - w, op, 2 * w, max(cl - op, 1e-12))
                else:
                    qp.setBrush(dn_brush)
                    rect = QtCore.QRectF(x - w, cl, 2 * w, max(op - cl, 1e-12))
                qp.setPen(QtCore.Qt.NoPen)
                qp.drawRect(rect)
        finally:
            # Always close the painter so the picture is left in a valid state.
            qp.end()
        self._cache[key] = pic
        return pic

    # ---- painting ----
    def paint(self, p, *args):
        """Replay the cached pictures of every tile that intersects the view."""
        level = self._choose_level()
        self._last_level = level
        first_tile, last_tile = self._visible_tile_range(level)
        for tile_idx in range(first_tile, last_tile + 1):
            self._build_tile_picture(level, tile_idx).play(p)
class MainWindow(QtWidgets.QMainWindow):
    """Main window: load an OHLCV CSV, plot it, pick a 1-year slice and save it.

    A left click on the chart selects the nearest candle as the start of a
    one-year window; the window is highlighted and can be written to a new
    CSV with the save button.
    """

    def __init__(self, csv_path: Optional[str] = None):
        """Build the UI and, when ``csv_path`` is given, load it immediately."""
        super().__init__()
        self.setWindowTitle("CSV Candle Picker (1Y 範圍選取/另存)")
        self.resize(1280, 760)
        # State
        self.df: Optional[pd.DataFrame] = None  # loaded data (incl. open_time/close_time/t)
        self._t: np.ndarray = np.array([], dtype=float)  # x axis (seconds)
        self._candles: Optional[LODCandlestickItem] = None
        self._sel_region: Optional[pg.LinearRegionItem] = None
        self._sel_start_ts: Optional[float] = None
        self._sel_end_ts: Optional[float] = None
        # Two vertical lines act as the selection border for pyqtgraph
        # versions whose LinearRegionItem has no setPen.
        self._sel_left_line: Optional[pg.InfiniteLine] = None
        self._sel_right_line: Optional[pg.InfiniteLine] = None
        # Layout
        root = QtWidgets.QWidget(self)
        v = QtWidgets.QVBoxLayout(root)
        self.setCentralWidget(root)
        ctrl = QtWidgets.QHBoxLayout()
        self.btn_load = QtWidgets.QPushButton("載入 CSV…")
        self.btn_save = QtWidgets.QPushButton("另存選取…")
        self.btn_save.setEnabled(False)
        self.lab_file = QtWidgets.QLabel("狀態:尚未載入檔案")
        self.lab_file.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)
        ctrl.addWidget(self.btn_load)
        ctrl.addWidget(self.btn_save)
        ctrl.addStretch(1)
        ctrl.addWidget(self.lab_file)
        v.addLayout(ctrl)
        self.time_axis = TimeAxisItem(orientation="bottom")
        self.pw = pg.PlotWidget(axisItems={"bottom": self.time_axis})
        self.pw.setBackground("w")
        self.pw.showGrid(x=True, y=True, alpha=0.2)
        self.pw.setLabel('left', "Price")
        v.addWidget(self.pw, 1)
        self.status = self.statusBar()
        # Signals
        self.btn_load.clicked.connect(self._on_load_clicked)
        self.btn_save.clicked.connect(self._on_save_clicked)
        self.pw.scene().sigMouseClicked.connect(self._on_mouse_clicked)
        # Optional initial load
        if csv_path:
            self.load_csv(csv_path)

    # ------------------------- I/O -------------------------
    def _on_load_clicked(self):
        """Ask for a CSV file and load it if one was chosen."""
        path, _ = QtWidgets.QFileDialog.getOpenFileName(
            self, "選擇 CSV 檔", os.getcwd(), "CSV Files (*.csv);;All Files (*)"
        )
        if path:
            self.load_csv(path)

    def load_csv(self, path: str):
        """Load ``path``, redraw the chart and reset any selection.

        Read errors are reported in a message box; the previous data stays.
        """
        try:
            self.df = load_ohlcv_csv(path)
        except Exception as e:
            # UI boundary: report whatever went wrong instead of crashing.
            QtWidgets.QMessageBox.critical(self, "讀取失敗", f"讀取 CSV 發生錯誤\n{e}")
            return
        self.lab_file.setText(f"已載入:{os.path.abspath(path)} ({len(self.df)} rows)")
        self.status.showMessage("CSV 載入完成,正在繪圖…", 3000)
        self._plot_all()
        self._clear_selection_visual()

    # ------------------------- plotting -------------------------
    def _plot_all(self):
        """Clear the plot and draw all loaded candles."""
        self.pw.clear()
        self._candles = None
        if self.df is None or self.df.empty:
            return
        t = self.df['t'].to_numpy(dtype=float)
        o = self.df['open'].to_numpy(dtype=float)
        h = self.df['high'].to_numpy(dtype=float)
        l = self.df['low'].to_numpy(dtype=float)
        c = self.df['close'].to_numpy(dtype=float)
        # LOD + tiled item that copes with large datasets.
        vb = self.pw.plotItem.vb
        self._candles = LODCandlestickItem(t, o, h, l, c, vb=vb, tile_size=2000)
        self.pw.addItem(self._candles)
        self._t = t
        # Show the whole series.
        self.pw.setXRange(float(t.min()), float(t.max()), padding=0)
        self.status.showMessage(f"已繪製 K 線:{len(t)} 根", 5000)

    # ------------------------- selection (click) -------------------------
    def _on_mouse_clicked(self, ev):
        """Select one year of data starting at the candle nearest the click."""
        if self.df is None or self._t.size == 0:
            return
        try:
            if ev.button() != QtCore.Qt.LeftButton:
                return
        except AttributeError:
            # The event object has no button() (e.g. a tuple); keep going.
            pass
        if hasattr(ev, 'scenePos'):
            pos = ev.scenePos()
        elif isinstance(ev, (list, tuple)) and ev:
            pos = ev[0].scenePos()
        else:
            pos = None
        if pos is None:
            return
        view_box = self.pw.plotItem.vb
        if not view_box.sceneBoundingRect().contains(pos):
            return
        x = float(view_box.mapSceneToView(pos).x())
        # Snap to the nearest candle.
        idx = int(np.argmin(np.abs(self._t - x)))
        start_ts = float(self._t[idx])
        start_dt = pd.to_datetime(start_ts, unit='s')
        # DateOffset(years=1) adds a calendar year, so leap years are handled.
        end_dt = start_dt + pd.DateOffset(years=1)
        end_ts = float(end_dt.timestamp())
        # The highlight is clipped to the data; the stored end is not, so the
        # half-open filter still includes the final bar of a short series.
        end_ts_clipped = min(end_ts, float(self._t.max()))
        self._apply_selection_visual(start_ts, end_ts_clipped)
        self._sel_start_ts = start_ts
        self._sel_end_ts = end_ts
        sel_df = self._current_selection_df()
        nrows = 0 if sel_df is None else len(sel_df)
        self.btn_save.setEnabled(nrows > 0)
        self.status.showMessage(
            f"已選取自 {start_dt:%Y-%m-%d %H:%M} 起 1 年區間(實際切到 {pd.to_datetime(end_ts_clipped, unit='s'):%Y-%m-%d %H:%M})筆數 {nrows}",
            6000,
        )

    def _remove_selection_items(self):
        """Remove the region and border lines from the plot and forget them."""
        for item in (self._sel_region, self._sel_left_line, self._sel_right_line):
            if item is None:
                continue
            try:
                self.pw.removeItem(item)
            except Exception:
                # Best effort: the item may already be gone after pw.clear().
                pass
        self._sel_region = None
        self._sel_left_line = None
        self._sel_right_line = None

    def _apply_selection_visual(self, x0: float, x1: float):
        """Replace the current highlight with one spanning ``[x0, x1]``."""
        self._remove_selection_items()
        # Translucent fill.
        region = pg.LinearRegionItem(values=(x0, x1))
        region.setZValue(10)
        region.setMovable(False)
        region.setBrush(pg.mkBrush(52, 152, 219, 40))  # translucent blue
        # Older pyqtgraph LinearRegionItem has no setPen; use it when present.
        pen = pg.mkPen('#3498db', width=2)
        if hasattr(region, 'setPen'):
            try:
                region.setPen(pen)
            except Exception:
                pass
        self.pw.addItem(region)
        self._sel_region = region
        # Border lines drawn above the fill (InfiniteLine accepts a pen).
        self._sel_left_line = pg.InfiniteLine(pos=x0, angle=90, movable=False, pen=pen)
        self._sel_right_line = pg.InfiniteLine(pos=x1, angle=90, movable=False, pen=pen)
        self._sel_left_line.setZValue(11)
        self._sel_right_line.setZValue(11)
        self.pw.addItem(self._sel_left_line)
        self.pw.addItem(self._sel_right_line)

    def _clear_selection_visual(self):
        """Drop the highlight, forget the selection and disable saving."""
        self._remove_selection_items()
        self._sel_start_ts = None
        self._sel_end_ts = None
        self.btn_save.setEnabled(False)

    def _current_selection_df(self) -> Optional[pd.DataFrame]:
        """Return a copy of the rows in the half-open range ``[start, end)``.

        Returns ``None`` when no data is loaded or nothing is selected.
        """
        if self.df is None or self._sel_start_ts is None or self._sel_end_ts is None:
            return None
        # Filter on the float-seconds column the selection was taken from.
        # Converting the float back to a datetime can land a few nanoseconds
        # past the clicked bar and exclude it.
        t_col = self.df['t']
        mask = (t_col >= self._sel_start_ts) & (t_col < self._sel_end_ts)
        return self.df.loc[mask].copy()

    # ------------------------- save -------------------------
    def _on_save_clicked(self):
        """Write the selected rows to a CSV chosen by the user."""
        sel = self._current_selection_df()
        if sel is None or sel.empty:
            QtWidgets.QMessageBox.information(self, "沒有資料", "目前沒有可另存的選取資料。請先在圖上點擊一根 K 線。")
            return
        s0 = sel['open_time'].iloc[0]
        s1 = sel['open_time'].iloc[-1]
        default_name = f"slice_{s0:%Y%m%d_%H%M}_{s1:%Y%m%d_%H%M}.csv"
        path, _ = QtWidgets.QFileDialog.getSaveFileName(
            self, "另存選取為 CSV", os.path.join(os.getcwd(), default_name), "CSV Files (*.csv)"
        )
        if not path:
            return
        # Keep the file's own columns in order; 't' is a helper column added
        # for plotting and does not belong in the output.
        cols = [col for col in self.df.columns if col != 't']
        try:
            sel.to_csv(path, index=False, columns=cols)
        except Exception as e:
            # UI boundary: surface the failure to the user.
            QtWidgets.QMessageBox.critical(self, "寫入失敗", f"儲存 CSV 發生錯誤\n{e}")
            return
        self.status.showMessage(f"已另存:{path}", 6000)
        # The slice has been written; remove the highlight.
        self._clear_selection_visual()
# ---------------------------------------------------------------------
# CSV 讀取
# ---------------------------------------------------------------------
def load_ohlcv_csv(path: str) -> pd.DataFrame:
    """Read an OHLCV CSV and return it cleaned, sorted and ready to plot.

    The returned frame has:
      - ``open_time`` / ``close_time`` parsed as datetimes (unparsable values
        become NaT),
      - ``t``: ``open_time`` as Unix seconds (float), appended as the last
        column for use on the x axis,
      - price/volume columns coerced to numeric (unparsable values become NaN),
      - ``number_of_trades`` as nullable ``Int64``.

    Rows whose ``open_time`` cannot be parsed are dropped; the rest are sorted
    by ``open_time`` (stable, so equal timestamps keep file order) and given a
    fresh 0..N-1 index.

    Raises
    ------
    ValueError
        If any required column is missing from the file.
    """
    df = pd.read_csv(path)
    required = [
        'open_time', 'open', 'high', 'low', 'close', 'volume',
        'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base', 'taker_buy_quote',
    ]
    miss = [c for c in required if c not in df.columns]
    if miss:
        raise ValueError(f"CSV 欄位缺少:{miss}")
    # Parse timestamps.
    df['open_time'] = pd.to_datetime(df['open_time'], errors='coerce')
    df['close_time'] = pd.to_datetime(df['close_time'], errors='coerce')
    # Drop unusable rows and order chronologically.
    df = (
        df.dropna(subset=['open_time'])
        .sort_values('open_time', kind='mergesort')
        .reset_index(drop=True)
    )
    # x axis in seconds. Dividing a timedelta by one second is independent of
    # the datetime storage resolution, unlike casting to int64 and assuming
    # nanoseconds.
    open_time = df['open_time']
    epoch = pd.Timestamp(0, tz='UTC') if open_time.dt.tz is not None else pd.Timestamp(0)
    df['t'] = (open_time - epoch) / pd.Timedelta(seconds=1)
    # Make sure numeric columns are numeric rather than object.
    for col in ['open', 'high', 'low', 'close', 'volume', 'quote_asset_volume', 'taker_buy_base', 'taker_buy_quote']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    df['number_of_trades'] = pd.to_numeric(df['number_of_trades'], errors='coerce').astype('Int64')
    return df
# ---------------------------------------------------------------------
# 進入點
# ---------------------------------------------------------------------
def parse_args():
    """Parse the command line and return the resulting namespace.

    The only option is ``--csv``: an optional path to a CSV file that is
    loaded as soon as the window is created.
    """
    parser = argparse.ArgumentParser(description="CSV Candle Picker")
    parser.add_argument(
        '--csv',
        default=None,
        type=str,
        help='啟動時直接載入的 CSV 路徑',
    )
    namespace = parser.parse_args()
    return namespace
def main():
    """Configure pyqtgraph, open the main window and run the Qt event loop.

    The process exits with the event loop's return code.
    """
    cli = parse_args()
    # Plot defaults: no antialiasing, white background, black foreground.
    pg.setConfigOptions(antialias=False, background='w', foreground='k')
    application = QtWidgets.QApplication(sys.argv)
    window = MainWindow(csv_path=cli.csv)
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)


if __name__ == '__main__':
    main()