iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0

什麼是原子交易?

在開始前,先把 「原子交易(Atomic Transaction)」 說清楚:

「它是一個把多筆狀態更新包起來的保護殼,保證這段操作要嘛全部成功、要嘛完全不生效。」

在原子交易中,你可以跨多個 set()、甚至穿過多個 await 做事。只有在整段成功時才一次提交並觸發效應重跑(單次 flush), 一旦中途失敗 就把所有被動到的 signal 還原成「進入交易前」的值,並且不把錯誤狀態推送出去。

它和一般的 batch/transaction(只負責合併重跑)不同,原子交易多了「回滾 rollback」語意:

「事成一次算、出錯全數撤。」

同時不改變我們核心的心智(computed 持續 lazy、依賴圖不被破壞)。

行為定義

有了上面的概念之後,我們來訂清楚接下來要實作的行為:

  • 成功:交易內所有 signal.set() 提交,最外層退出時一次 flushJobs(),our effects 重跑一次。
  • 失敗(throw/reject):把受影響的 signals 還原到進入本交易層時的值不 flush(避免把錯誤狀態推送到效應);其下游 computed 會標記為 stale,下一次讀取時 Lazy 重算。
  • 巢狀:每一層有自己的寫入日誌;
    • 內層成功 → 日誌合併進外層(保留外層第一次的舊值)。
    • 內層失敗 → 只回滾內層,外層可選擇 catch 後繼續或讓錯往上拋。
  • 等值:與核心一致,Object.is(prev, next) 視為無變更:不記錄、也不排程。

API 與檔案結構

  • scheduler.ts 新增:
    • atomic(fn):原子交易(支援同步/非同步)
    • inAtomic():是否在任何原子交易層
    • recordAtomicWrite(node, prev):於第一次寫入該層交易時,記錄舊值
  • scheduleJob:沿用我們原先的批次/交易 gating(用 batchDepth 決定是否延後 microtask)
  • signal.set():在「等值檢查通過、確定要寫」的那一刻,插一行 if (inAtomic()) recordAtomicWrite(node, prev)

實作 schduler.ts 功能擴充

import { markStale } from "./computed.js";
import type { Node } from "./graph.js";

export interface Schedulable { run(): void; disposed?: boolean }

// signal/computed 內部節點
export type InternalNode<T = unknown> = { value: T };

// 原子交易寫入日誌
type WriteLog = Map<(Node & InternalNode<unknown>), unknown>;

const queue = new Set<Schedulable>();
let scheduled = false;

// >0 代表在批次/交易中(延後 microtask)
let batchDepth = 0;

// 原子交易層級與日誌堆疊
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];

// 回滾時暫停排程,避免 scheduleJob 產生新的工作 
let muted = 0;

export function scheduleJob(job: Schedulable) {
  if (job.disposed) return;
  queue.add(job);
  if (!scheduled && batchDepth === 0) {
    scheduled = true;
    queueMicrotask(flushJobs);
  }
}

export function batch<T>(fn: () => T): T {
  batchDepth++;
  try {
    return fn();
  } finally {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  }
}

// Promise 判斷
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
  return v != null && typeof v.then === "function";
}

export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
  batchDepth++;
  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).finally(() => {
        batchDepth--;
        if (batchDepth === 0) flushJobs();
      });
    }
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    return out as T;
  } catch (e) {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    throw e;
  }
}

// 原子交易(帶 rollback)
export function inAtomic() {
  return atomicDepth > 0;
}

// 記錄「本層第一次寫入」的舊值;由 signal.set() 在確定要寫入時呼叫
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
  const log = atomicLogs[atomicLogs.length - 1];
  if (!log) return; // 防呆:沒有 active atomic 層
  if (!log.has(node)) log.set(node, prevValue);
}

function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
  if ("value" in node) (node as { value: T }).value = v;
}

function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [node, prev] of child) {
    if (!parent.has(node)) parent.set(node, prev);
  }
}

export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
  // 進入原子層:抑制 flush(共用 batchDepth),開始記錄寫入
  batchDepth++;
  atomicDepth++;
  atomicLogs.push(new Map<(Node & InternalNode<unknown>), unknown>());

  const exitCommit = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // 內層成功 → 合併「首見舊值」到父層
    if (atomicDepth > 0) {
      mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
    }
    // 最外層退出才 flush
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };

  const exitRollback = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // 靜音回寫:避免在回滾過程再排程
    muted++;
    try {
      for (const [node, prev] of log) {
        writeNodeValue(node, prev);
        if ((node as Node).kind === "signal") {
          for (const sub of (node as Node).subs) {
            if (sub.kind === "computed") markStale(sub);
            // sub.kind === "effect" 不必 schedule(muted 會擋 & 稍後也不 flush)
          }
        }
      }
      queue.clear(); // 清掉這層期間的任務
      scheduled = false;
    } finally {
      muted--;
    }
    // 失敗不 flush;僅退出 batch/atomic 層級
    batchDepth--;
  };

  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).then(
        (v) => { exitCommit(); return v; },
        (err) => { exitRollback(); throw err; }
      );
    }
    // 同步成功
    exitCommit();
    return out as T;
  } catch (e) {
    // 同步失敗 → 回滾
    exitRollback();
    throw e;
  }
}

export function flushSync() {
  if (!scheduled && queue.size === 0) return;
  flushJobs();
}

function flushJobs() {
  scheduled = false;
  let guard = 0;
  while (queue.size) {
    const list = Array.from(queue);
    queue.clear();
    for (const job of list) job.run();
    if (++guard > 10000) throw new Error("Infinite update loop");
  }
}

一致性:

  • 原子交易失敗時,所有受影響的 computed 都會被標記為 stale
  • 回滾後第一次被讀取時,會按最新 signal 值懶重算。
  • UI 不會看到錯誤快照,也不會在回滾當下 flush。

實作 signal.ts 調整

import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";

type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;

export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
  const node: Node & InternalNode<T> &{ kind: 'signal'; equals: Comparator<T> } = {
    kind: 'signal',
    deps: new Set(),
    subs: new Set(),
    value: initial,
    equals,
  };

  const get = () => {
    track(node);
    return node.value;
  };

  const set = (next: T | ((prev: T) => T)) => {
    const prev = node.value;
    const nxtVal = typeof next === 'function' ? (next as (p: T) => T)(node.value) : next;
    if (node.equals(node.value, nxtVal)) return;
    // 原子交易掛鉤:只在「確定要寫入」前、且本層第一次觸及時記錄舊值
    if (inAtomic()) recordAtomicWrite(node, prev);
    // 原本的寫入
    node.value = nxtVal;

    // 無下游訂閱:可直接結束(避免空跑)
    if (node.subs.size === 0) return;
    
    // 有下游訂閱:依原本邏輯同步更新
    for (const sub of node.subs) {
      if (sub.kind === 'effect') {
        Effects.get(sub)?.schedule();
      } else if (sub.kind === 'computed') {
        markStale(sub);
      }
    }
  };

  const subscribe = (observer: Node) => {
    if (observer.kind === 'signal') {
      throw new Error('A signal cannot subscribe to another node');
    }
    link(observer, node);
    return () => unlink(observer, node);
  };

  return { get, set, subscribe, peek: () => node.value };
}

使用情境對照

內層失敗、外層接續(只回滾內層)

const a = signal(0);
const b = signal(0);

await atomic(async () => { // 外層
  a.set(1); // OK
  try {
    await atomic(async () => { // 內層
      b.set(1);
      throw new Error("boom"); // 內層失敗 → 還原 b=0
    });
  } catch {}
  // 走到這裡:a=1, b=0
}); // 外層成功 → 一次 flush

上層失敗(全部回滾)

const a = signal(0);
const b = signal(0);
try {
  await atomic(async () => {
    a.set(1);
    await Promise.resolve();
    b.set(2);
    throw new Error("oops"); // 整個交易失敗 → a,b 都還原
  });
} catch {}
// a=0, b=0;且本次沒有 flush

React / Vue 範例

在 React 中怎麼用(原子交易 + 回滾)

情境:編輯標題。點「Save」時用 atomic 做樂觀寫入;成功才提交,失敗就回滾。

import { useState, useEffect } from "react";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import {
  useSignalValue,
  useSignalState,
  useComputed,
} from "../hook/react_adapter.js";

// ---- 模擬 API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300)); // 模擬延遲
  if (shouldFail) throw new Error("server says no");
  return true;
}

// ---- 狀態 ----
const titleSig = signal("Hello");

// for unit test
export type EditorTestProps = { __sig?: ReturnType<typeof signal<string>> };

export function Editor({ __sig }: EditorTestProps = {}) {
  const sig = __sig ?? titleSig; // ← 預設用 module-scope 的;測試可覆蓋

  const committed = useSignalValue(sig); // 讀外部 signal 的快照
  const [draft, setDraft] = useSignalState(committed); // 本地草稿
  useEffect(() => setDraft(committed), [committed]);

  const len = useComputed(() => titleSig.get().length); // ✅ hook 回傳「值」

  const [saving, setSaving] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [shouldFail, setShouldFail] = useState(false);

  const save = async () => {
    setSaving(true);
    setError(null);
    try {
      await atomic(async () => {
        sig.set(draft); // 樂觀寫入(不立刻 flush)
        await postTitle(draft, shouldFail); // 可能 throw → 回滾 + 不 flush
      });
      // 成功:退出 atomic 才 flush,一次更新 committed/len
    } catch (e: any) {
      setError(e?.message ?? "save failed");
    } finally {
      setSaving(false);
    }
  };

  return (
    <section>
      <input
        value={draft}
        onChange={(e) => setDraft(e.target.value)}
        disabled={saving}
      />
      <button onClick={save} disabled={saving}>
        {saving ? "Saving..." : "Save"}
      </button>
      <label style={{ marginLeft: 8 }}>
        <input
          type="checkbox"
          checked={shouldFail}
          onChange={(e) => setShouldFail(e.target.checked)}
        />
        simulate failure
      </label>

      <hr />
      <p>
        Committed title: <b>{committed}</b>
      </p>
      <p>
        Derived length (computed): <b>{len}</b>
      </p>
      {error && <p style={{ color: "crimson" }}>Error: {error}</p>}
    </section>
  );
}

  • 成功:點 Save → 等 300ms → committedlen 同一輪更新(一次 flush)。
  • 失敗:點 Save(勾選 simulate failure)→ 仍顯示舊值;computed 被標髒標記,下次讀取時(例如下一次成功或其它更新驅動)會重算到一致的快照。

在 Vue 中怎麼用(原子交易 + 回滾)

同樣情境:SFC 版本;透過 useSignalRef / useComputedRef 橋接。

<script setup lang="ts">
import { ref, watch } from "vue";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import { useSignalRef, useComputedRef } from "../hook/vue_adapter.js";

// ---- 模擬 API ----
async function postTitle(v: string, shouldFail = false) {
  await new Promise((r) => setTimeout(r, 300));
  if (shouldFail) throw new Error("server says no");
  return true;
}

// ---- 狀態 ----
const titleSig = signal("Hello");
const committed = useSignalRef(titleSig);
const titleLen = useComputedRef(() => titleSig.get().length);

const draft = ref(committed.value);
watch(committed, (v) => (draft.value = v)); // 外部變更時同步草稿

const saving = ref(false);
const error = ref<string | null>(null);
const shouldFail = ref(false);

async function save() {
  saving.value = true;
  error.value = null;
  try {
    await atomic(async () => {
      // 樂觀寫入,不會馬上 flush
      titleSig.set(draft.value);
      await postTitle(draft.value, shouldFail.value); // 可能 throw
    });
    // 成功:退出 atomic 才 flush → 模板一次更新
  } catch (e: any) {
    // 失敗:回滾、不 flush → 模板維持舊值
    error.value = e?.message ?? "save failed";
  } finally {
    saving.value = false;
  }
}
</script>

<template>
  <section>
    <div>
      <label>
        Draft:
        <input v-model="draft" :disabled="saving" />
      </label>
      <button @click="save" :disabled="saving">
        {{ saving ? "Saving..." : "Save" }}
      </button>
      <label style="margin-left: 8px">
        <input type="checkbox" v-model="shouldFail" :disabled="saving" />
        simulate failure
      </label>
    </div>

    <hr />

    <p>
      Committed title: <b>{{ committed }}</b>
    </p>
    <p>
      Derived length (computed): <b>{{ titleLen }}</b>
    </p>
    <p v-if="error" style="color: crimson">Error: {{ error }}</p>
  </section>
</template>
  • 成功:Save 後,committedtitleLen 同一輪 patch 更新。
  • 失敗:UI 保持舊值;由於回滾時已把受影響的 computed 標髒標記,之後任何一次讀取都會 Lazy 重算到正確狀態。

執行的時序圖(成功 vs 失敗)

https://ithelp.ithome.com.tw/upload/images/20250828/20129020gDJvHcKy4W.png

結語

透過 atomic 可以讓我們的狀態更新機制更完整,就算 async 失敗仍能保持之前的狀態,成功還原一開始講的「它是一個把多筆狀態更新包起來的保護殼,保證這段操作要嘛全部成功、要嘛完全不生效。」

下一篇,我們來聊 Scheduler 進階的議題。


上一篇
進階內核(I):Async Transaction 實作
系列文
Reactivity 小技巧大變革:掌握 Signals 就這麼簡單!24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言