iT邦幫忙

2025 iThome 鐵人賽

DAY 5
0

從過去幾天的探索,我們發現 NautilusTrader 雖然提供的是 Python 使用介面,但核心運算其實是由 Rust 驅動

這種設計的好處,是能讓開發者享受 Python 的易用性,同時又能利用 Rust 的高效能與安全性。


在 NautilusTrader 的架構演進中:

  • v1 架構 使用 Cython 作為 Python 與 Rust 之間的橋樑
  • v2 架構 則逐漸轉向 PyO3,這也是目前 Rust ↔ Python 最受歡迎的整合方式

觀察他們的 GitHub,可以發現 v2 的第一個 commit 才在上個月 18 號,轉移工作才剛開始,因此本次我們將聚焦在 PyO3 的介紹與實作,先從原理與範例掌握概念,再回頭比較 NautilusTrader v1 的做法。

PyO3 套件專門提供 Rust/Python 相互呼叫的函式庫,封裝了底層 FFI 所需的型別轉換與呼叫規則,非常適合用來將 計算密集型的邏輯交給 Rust 處理,而非全交由 Python。


今天會先示範一個例子,之後幾天會做更深度的討論

首先 先安裝 maturin 可以參考底下網站 選擇你想要的方式

https://pyo3.rs/v0.25.1/getting-started.html

安裝完後,建立ㄧ個新的專案夾

進入專案夾

maturin init

選擇 Pyo3

接著安裝 Pyo3套件

cargo add pyo3

修改 Cargo.toml

[package]
name = "vwap_pnl"
version = "0.1.0"
edition = "2021"

[lib]
name = "vwap_pnl"
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.22", features = ["extension-module"] }
numpy = "0.22"

[profile.release]
codegen-units = 1
lto = "fat"
opt-level = 3

撰寫 Rust

use numpy::{PyArray1, PyReadonlyArray1};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};

#[pyfunction]
fn compute_vwap_pnl<'py>(
    py: Python<'py>,
    bid: PyReadonlyArray1<'py, f64>,
    ask: PyReadonlyArray1<'py, f64>,
    side: PyReadonlyArray1<'py, i8>,   // 1=buy, -1=sell
    size: PyReadonlyArray1<'py, f64>,  // >0
) -> PyResult<Bound<'py, PyDict>> {
    // zero-copy slice views
    let bid = bid.as_slice()?;
    let ask = ask.as_slice()?;
    let side = side.as_slice()?;
    let size = size.as_slice()?;

    let n = bid.len();
    if ask.len() != n || side.len() != n || size.len() != n {
        return Err(pyo3::exceptions::PyValueError::new_err("array length mismatch"));
    }

    let mut fill_price = vec![0.0f64; n];
    let mut pnl_per_fill = vec![0.0f64; n];

    let mut notional_sum = 0.0f64;
    let mut qty_sum = 0.0f64;

    use std::collections::VecDeque;
    // (qty, px): qty>0 long lot, qty<0 short lot
    let mut lots: VecDeque<(f64, f64)> = VecDeque::new();
    let mut realized_pnl = 0.0f64;

    for i in 0..n {
        let s = side[i] as i32;
        let q = size[i];
        if q <= 0.0 {
            continue;
        }

        let px = if s > 0 { ask[i] } else { bid[i] };
        fill_price[i] = px;

        notional_sum += px * q;
        qty_sum += q;

        if s > 0 {
            // BUY:先對沖 short
            let mut remain = q;
            while remain > 0.0 {
                if let Some(&(lot_q, lot_px)) = lots.front() {
                    if lot_q < 0.0 {
                        let match_q = remain.min(-lot_q);
                        realized_pnl += (lot_px - px) * match_q; // short pnl
                        pnl_per_fill[i] += (lot_px - px) * match_q;

                        let new_q = lot_q + match_q; // lot_q<0 → toward 0
                        lots.pop_front();
                        if new_q != 0.0 {
                            lots.push_front((new_q, lot_px));
                        }
                        remain -= match_q;
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            if remain > 0.0 {
                lots.push_back((remain, px)); // 新增 long lot
            }
        } else {
            // SELL:先對沖 long
            let mut remain = q;
            while remain > 0.0 {
                if let Some(&(lot_q, lot_px)) = lots.front() {
                    if lot_q > 0.0 {
                        let match_q = remain.min(lot_q);
                        realized_pnl += (px - lot_px) * match_q; // long pnl
                        pnl_per_fill[i] += (px - lot_px) * match_q;

                        let new_q = lot_q - match_q;
                        lots.pop_front();
                        if new_q != 0.0 {
                            lots.push_front((new_q, lot_px));
                        }
                        remain -= match_q;
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            }
            if remain > 0.0 {
                lots.push_back((-remain, px)); // 新增 short lot
            }
        }
    }

    let vwap = if qty_sum > 0.0 { notional_sum / qty_sum } else { 0.0 };

    // 以 *Bound 風格* 建立回傳物件
    let out = PyDict::new_bound(py);
    let fill_price_arr = PyArray1::from_vec_bound(py, fill_price);
    let pnl_per_fill_arr = PyArray1::from_vec_bound(py, pnl_per_fill);

    out.set_item("fill_price", fill_price_arr)?;
    out.set_item("pnl_per_fill", pnl_per_fill_arr)?;
    out.set_item("vwap", vwap)?;
    out.set_item("realized_pnl", realized_pnl)?;
    Ok(out)
}

#[pymodule]
fn vwap_pnl(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(compute_vwap_pnl, m)?)?;
    Ok(())
}

建置專案,並安裝套件以提供Python呼叫

maturin develop

撰寫測試腳本

import time
import numpy as np
import vwap_pnl

# 造一批資料
N = 1_000_000
rng = np.random.default_rng(7)
mid = 30000 + 1000 * rng.standard_normal(N)
spr = np.clip(rng.normal(5, 1, N), 0.1, None)
bid = (mid - spr/2).astype(np.float64)
ask = (mid + spr/2).astype(np.float64)
side = rng.choice([1, -1], size=N, p=[0.5, 0.5]).astype(np.int8)
size = np.abs(rng.normal(0.01, 0.003, N)).astype(np.float64)

# PyO3
t0 = time.perf_counter()
out = vwap_pnl.compute_vwap_pnl(bid, ask, side, size)
t1 = time.perf_counter()
print("[PyO3] vwap:", out["vwap"], "realized_pnl:", out["realized_pnl"], "time:", round(t1-t0, 4), "s")

# 參考:純 Python(慢)
def py_compute(bid, ask, side, size):
    fill_price = np.empty_like(bid)
    realized = 0.0
    pnl_per = np.zeros_like(bid)
    from collections import deque
    lots = deque()
    for i in range(len(bid)):
        s = int(side[i]); q = float(size[i])
        if q <= 0:
            fill_price[i] = np.nan; continue
        px = ask[i] if s > 0 else bid[i]
        fill_price[i] = px
        if s > 0:
            remain = q
            while remain > 0 and lots and lots[0][0] < 0:
                lot_q, lot_px = lots[0]
                match_q = min(remain, -lot_q)
                realized += (lot_px - px) * match_q
                pnl_per[i] += (lot_px - px) * match_q
                lot_q += match_q
                lots.popleft()
                if lot_q != 0: lots.appendleft((lot_q, lot_px))
                remain -= match_q
            if remain > 0: lots.append((remain, px))
        else:
            remain = q
            while remain > 0 and lots and lots[0][0] > 0:
                lot_q, lot_px = lots[0]
                match_q = min(remain, lot_q)
                realized += (px - lot_px) * match_q
                pnl_per[i] += (px - lot_px) * match_q
                lot_q -= match_q
                lots.popleft()
                if lot_q != 0: lots.appendleft((lot_q, lot_px))
                remain -= match_q
            if remain > 0: lots.append((-remain, px))
    vwap = float(np.sum(fill_price * size) / np.sum(size))
    return {"fill_price": fill_price, "pnl_per_fill": pnl_per, "vwap": vwap, "realized_pnl": float(realized)}

t0 = time.perf_counter()
out_py = py_compute(bid, ask, side, size)
t1 = time.perf_counter()
print("[PurePy] vwap:", out_py["vwap"], "realized_pnl:", out_py["realized_pnl"], "time:", round(t1-t0, 4), "s")

底下是腳本輸出結果 可以看到效能明顯提升

https://ithelp.ithome.com.tw/upload/images/20250809/201779996nMuh5uES2.png

透過這次的範例,驗證了 PyO3 在高效能運算場景下的優勢,這種 Python 作為外層接口、Rust 作為核心引擎 的設計思路,能讓交易系統既保有靈活性,又能在毫秒級延遲的市場中保持競爭力。

接下來的幾天,我們會先介紹一下Rust,接著會實作一個簡單的python交易系統,完成後再根據其中可以用Rust優化的地方做加強,等我們足夠了解交易系統的架構後,就會來說明 NautilusTrader 的 V1 架構設計.


上一篇
【Day4】- 如何在 NautilusTrader 運行策略(Live&Sandbox)
下一篇
【Day6】- Rust(Concepts, Ownership)
系列文
NautilusTrader 架構解析:Rust 在高效能量化交易平台中的角色與優勢22
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言