在開始前,先把 「原子交易(Atomic Transaction)」 說清楚:
「它是一個把多筆狀態更新包起來的保護殼,保證這段操作要嘛全部成功、要嘛完全不生效。」
在原子交易中,你可以跨多個 set()
、甚至穿過多個 await
做事。只有在整段成功時才一次提交並觸發效應重跑(單次 flush), 一旦中途失敗 就把所有被動到的 signal 還原成「進入交易前」的值,並且不把錯誤狀態推送出去。
它和一般的 batch/transaction
(只負責合併重跑)不同,原子交易多了「回滾 rollback」語意:
「事成一次算、出錯全數撤。」
同時不改變我們核心的心智(computed 持續 lazy、依賴圖不被破壞)。
有了上面的概念之後,我們來訂清楚接下來要實作的行為:
signal.set()
提交,最外層退出時一次 flushJobs()
,our effects 重跑一次。computed
會標記為 stale
,下一次讀取時 Lazy 重算。Object.is(prev, next)
視為無變更:不記錄、也不排程。scheduler.ts
新增:
atomic(fn)
:原子交易(支援同步/非同步)inAtomic()
:是否在任何原子交易層recordAtomicWrite(node, prev)
:於第一次寫入該層交易時,記錄舊值scheduleJob
:沿用我們原先的批次/交易 gating(用 batchDepth
決定是否延後 microtask)signal.set()
:在「等值檢查通過、確定要寫」的那一刻,插一行 if (inAtomic()) recordAtomicWrite(node, prev)
schduler.ts
功能擴充import { markStale } from "./computed.js";
import type { Node } from "./graph.js";
export interface Schedulable { run(): void; disposed?: boolean }
// signal/computed 內部節點
export type InternalNode<T = unknown> = { value: T };
// 原子交易寫入日誌
type WriteLog = Map<(Node & InternalNode<unknown>), unknown>;
const queue = new Set<Schedulable>();
let scheduled = false;
// >0 代表在批次/交易中(延後 microtask)
let batchDepth = 0;
// 原子交易層級與日誌堆疊
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];
// 回滾時暫停排程,避免 scheduleJob 產生新的工作
let muted = 0;
export function scheduleJob(job: Schedulable) {
if (job.disposed) return;
queue.add(job);
if (!scheduled && batchDepth === 0) {
scheduled = true;
queueMicrotask(flushJobs);
}
}
export function batch<T>(fn: () => T): T {
batchDepth++;
try {
return fn();
} finally {
batchDepth--;
if (batchDepth === 0) flushJobs();
}
}
// Promise 判斷
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
return v != null && typeof v.then === "function";
}
export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
batchDepth++;
try {
const out = fn();
if (isPromiseLike<T>(out)) {
return Promise.resolve(out).finally(() => {
batchDepth--;
if (batchDepth === 0) flushJobs();
});
}
batchDepth--;
if (batchDepth === 0) flushJobs();
return out as T;
} catch (e) {
batchDepth--;
if (batchDepth === 0) flushJobs();
throw e;
}
}
// 原子交易(帶 rollback)
export function inAtomic() {
return atomicDepth > 0;
}
// 記錄「本層第一次寫入」的舊值;由 signal.set() 在確定要寫入時呼叫
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
const log = atomicLogs[atomicLogs.length - 1];
if (!log) return; // 防呆:沒有 active atomic 層
if (!log.has(node)) log.set(node, prevValue);
}
function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
if ("value" in node) (node as { value: T }).value = v;
}
function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
for (const [node, prev] of child) {
if (!parent.has(node)) parent.set(node, prev);
}
}
export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
// 進入原子層:抑制 flush(共用 batchDepth),開始記錄寫入
batchDepth++;
atomicDepth++;
atomicLogs.push(new Map<(Node & InternalNode<unknown>), unknown>());
const exitCommit = () => {
const log = atomicLogs.pop()!;
atomicDepth--;
// 內層成功 → 合併「首見舊值」到父層
if (atomicDepth > 0) {
mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
}
// 最外層退出才 flush
batchDepth--;
if (batchDepth === 0) flushJobs();
};
const exitRollback = () => {
const log = atomicLogs.pop()!;
atomicDepth--;
// 靜音回寫:避免在回滾過程再排程
muted++;
try {
for (const [node, prev] of log) {
writeNodeValue(node, prev);
if ((node as Node).kind === "signal") {
for (const sub of (node as Node).subs) {
if (sub.kind === "computed") markStale(sub);
// sub.kind === "effect" 不必 schedule(muted 會擋 & 稍後也不 flush)
}
}
}
queue.clear(); // 清掉這層期間的任務
scheduled = false;
} finally {
muted--;
}
// 失敗不 flush;僅退出 batch/atomic 層級
batchDepth--;
};
try {
const out = fn();
if (isPromiseLike<T>(out)) {
return Promise.resolve(out).then(
(v) => { exitCommit(); return v; },
(err) => { exitRollback(); throw err; }
);
}
// 同步成功
exitCommit();
return out as T;
} catch (e) {
// 同步失敗 → 回滾
exitRollback();
throw e;
}
}
export function flushSync() {
if (!scheduled && queue.size === 0) return;
flushJobs();
}
function flushJobs() {
scheduled = false;
let guard = 0;
while (queue.size) {
const list = Array.from(queue);
queue.clear();
for (const job of list) job.run();
if (++guard > 10000) throw new Error("Infinite update loop");
}
}
一致性:
computed
都會被標記為 stale
。signal.ts
調整import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";
type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;
export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
const node: Node & InternalNode<T> &{ kind: 'signal'; equals: Comparator<T> } = {
kind: 'signal',
deps: new Set(),
subs: new Set(),
value: initial,
equals,
};
const get = () => {
track(node);
return node.value;
};
const set = (next: T | ((prev: T) => T)) => {
const prev = node.value;
const nxtVal = typeof next === 'function' ? (next as (p: T) => T)(node.value) : next;
if (node.equals(node.value, nxtVal)) return;
// 原子交易掛鉤:只在「確定要寫入」前、且本層第一次觸及時記錄舊值
if (inAtomic()) recordAtomicWrite(node, prev);
// 原本的寫入
node.value = nxtVal;
// 無下游訂閱:可直接結束(避免空跑)
if (node.subs.size === 0) return;
// 有下游訂閱:依原本邏輯同步更新
for (const sub of node.subs) {
if (sub.kind === 'effect') {
Effects.get(sub)?.schedule();
} else if (sub.kind === 'computed') {
markStale(sub);
}
}
};
const subscribe = (observer: Node) => {
if (observer.kind === 'signal') {
throw new Error('A signal cannot subscribe to another node');
}
link(observer, node);
return () => unlink(observer, node);
};
return { get, set, subscribe, peek: () => node.value };
}
const a = signal(0);
const b = signal(0);
await atomic(async () => { // 外層
a.set(1); // OK
try {
await atomic(async () => { // 內層
b.set(1);
throw new Error("boom"); // 內層失敗 → 還原 b=0
});
} catch {}
// 走到這裡:a=1, b=0
}); // 外層成功 → 一次 flush
const a = signal(0);
const b = signal(0);
try {
await atomic(async () => {
a.set(1);
await Promise.resolve();
b.set(2);
throw new Error("oops"); // 整個交易失敗 → a,b 都還原
});
} catch {}
// a=0, b=0;且本次沒有 flush
情境:編輯標題。點「Save」時用 atomic 做樂觀寫入;成功才提交,失敗就回滾。
import { useState, useEffect } from "react";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import {
useSignalValue,
useSignalState,
useComputed,
} from "../hook/react_adapter.js";
// ---- 模擬 API ----
async function postTitle(v: string, shouldFail = false) {
await new Promise((r) => setTimeout(r, 300)); // 模擬延遲
if (shouldFail) throw new Error("server says no");
return true;
}
// ---- 狀態 ----
const titleSig = signal("Hello");
// for unit test
export type EditorTestProps = { __sig?: ReturnType<typeof signal<string>> };
export function Editor({ __sig }: EditorTestProps = {}) {
const sig = __sig ?? titleSig; // ← 預設用 module-scope 的;測試可覆蓋
const committed = useSignalValue(sig); // 讀外部 signal 的快照
const [draft, setDraft] = useSignalState(committed); // 本地草稿
useEffect(() => setDraft(committed), [committed]);
const len = useComputed(() => titleSig.get().length); // ✅ hook 回傳「值」
const [saving, setSaving] = useState(false);
const [error, setError] = useState<string | null>(null);
const [shouldFail, setShouldFail] = useState(false);
const save = async () => {
setSaving(true);
setError(null);
try {
await atomic(async () => {
sig.set(draft); // 樂觀寫入(不立刻 flush)
await postTitle(draft, shouldFail); // 可能 throw → 回滾 + 不 flush
});
// 成功:退出 atomic 才 flush,一次更新 committed/len
} catch (e: any) {
setError(e?.message ?? "save failed");
} finally {
setSaving(false);
}
};
return (
<section>
<input
value={draft}
onChange={(e) => setDraft(e.target.value)}
disabled={saving}
/>
<button onClick={save} disabled={saving}>
{saving ? "Saving..." : "Save"}
</button>
<label style={{ marginLeft: 8 }}>
<input
type="checkbox"
checked={shouldFail}
onChange={(e) => setShouldFail(e.target.checked)}
/>
simulate failure
</label>
<hr />
<p>
Committed title: <b>{committed}</b>
</p>
<p>
Derived length (computed): <b>{len}</b>
</p>
{error && <p style={{ color: "crimson" }}>Error: {error}</p>}
</section>
);
}
committed
與 len
同一輪更新(一次 flush)。computed
被標髒標記,下次讀取時(例如下一次成功或其它更新驅動)會重算到一致的快照。同樣情境:SFC 版本;透過
useSignalRef
/useComputedRef
橋接。
<script setup lang="ts">
import { ref, watch } from "vue";
import { signal } from "../core/signal.js";
import { atomic } from "../core/scheduler.js";
import { useSignalRef, useComputedRef } from "../hook/vue_adapter.js";
// ---- 模擬 API ----
async function postTitle(v: string, shouldFail = false) {
await new Promise((r) => setTimeout(r, 300));
if (shouldFail) throw new Error("server says no");
return true;
}
// ---- 狀態 ----
const titleSig = signal("Hello");
const committed = useSignalRef(titleSig);
const titleLen = useComputedRef(() => titleSig.get().length);
const draft = ref(committed.value);
watch(committed, (v) => (draft.value = v)); // 外部變更時同步草稿
const saving = ref(false);
const error = ref<string | null>(null);
const shouldFail = ref(false);
async function save() {
saving.value = true;
error.value = null;
try {
await atomic(async () => {
// 樂觀寫入,不會馬上 flush
titleSig.set(draft.value);
await postTitle(draft.value, shouldFail.value); // 可能 throw
});
// 成功:退出 atomic 才 flush → 模板一次更新
} catch (e: any) {
// 失敗:回滾、不 flush → 模板維持舊值
error.value = e?.message ?? "save failed";
} finally {
saving.value = false;
}
}
</script>
<template>
<section>
<div>
<label>
Draft:
<input v-model="draft" :disabled="saving" />
</label>
<button @click="save" :disabled="saving">
{{ saving ? "Saving..." : "Save" }}
</button>
<label style="margin-left: 8px">
<input type="checkbox" v-model="shouldFail" :disabled="saving" />
simulate failure
</label>
</div>
<hr />
<p>
Committed title: <b>{{ committed }}</b>
</p>
<p>
Derived length (computed): <b>{{ titleLen }}</b>
</p>
<p v-if="error" style="color: crimson">Error: {{ error }}</p>
</section>
</template>
committed
與 titleLen
同一輪 patch 更新。computed
標髒標記,之後任何一次讀取都會 Lazy 重算到正確狀態。透過 atomic
可以讓我們的狀態更新機制更完整,就算 async
失敗仍能保持之前的狀態,成功還原一開始講的「它是一個把多筆狀態更新包起來的保護殼,保證這段操作要嘛全部成功、要嘛完全不生效。」
下一篇,我們來聊 Scheduler 進階的議題。