
2025 iThome 鐵人賽

DAY 21
Generative AI

練習AI series, part 22

RAG "Incremental Indexing" + Concurrency Control + Deletion Detection

🆕 Code

1. src/utils/hash.js (new)

// src/utils/hash.js
import fs from "fs";
import crypto from "crypto";

export function sha256String(s = "") {
  return crypto.createHash("sha256").update(s, "utf8").digest("hex");
}

export function sha256File(filePath) {
  const buf = fs.readFileSync(filePath);
  return crypto.createHash("sha256").update(buf).digest("hex");
}
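
As a quick illustration of how these two helpers are used later: the indexer hashes whole files to skip unchanged documents, and hashes individual chunks so it re-embeds only what changed. The path and text in this sketch are made-up examples, not files from the project.

// Hypothetical usage of the hash helpers (the path and text are examples only)
import { sha256File, sha256String } from "./src/utils/hash.js";

const fileHash = sha256File("kb/acme/faq/pricing.md");                  // whole-file change detection
const chunkHash = sha256String("Refunds are processed within 7 days."); // per-chunk change detection

// A chunk only needs re-embedding when its stored hash no longer matches the current text
const needsReembedding = (storedHash, text) => storedHash !== sha256String(text);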

2. src/day21_indexer.js (new: the incremental indexing core)

// src/day21_indexer.js
import fs from "fs";
import path from "path";
import { openai } from "./aiClient.js";
import { chunkText, clean } from "./utils/text.js";
import { ensureTenantNS } from "./utils/tenantfs.js";
import { sha256File, sha256String } from "./utils/hash.js";

const EMBED_MODEL = process.env.OPENAI_EMBEDDING_MODEL || "text-embedding-3-small";
const BATCH = 64;

async function embedMany(texts = []) {
  if (!texts.length) return [];
  const res = await openai.embeddings.create({ model: EMBED_MODEL, input: texts });
  return res.data.map(d => d.embedding);
}

function listDocs(dir) {
  return fs.readdirSync(dir).filter(f => /\.md$|\.txt$/i.test(f)).map(f => path.join(dir, f));
}

function loadIndex(idxFile) {
  if (!fs.existsSync(idxFile)) return null;
  try { return JSON.parse(fs.readFileSync(idxFile, "utf-8")); }
  catch { return null; }
}

/**
 * Incremental indexing (file/chunk hash comparison).
 * Index file structure:
 * {
 *   builtAt, model,
 *   index: [{ id, docId, text, vector }],
 *   files: {
 *     "<docId>": {
 *       size, mtimeMs, fileHash,
 *       chunks: [{ chunkId, hash, textLen }]
 *     }
 *   }
 * }
 */
export async function buildIndexIncremental({
  tenant, ns, chunkSize = 800, overlap = 80, concurrency = 4
}) {
  const { kbDir, idxFile } = ensureTenantNS(tenant, ns);
  const files = listDocs(kbDir);
  const prev = loadIndex(idxFile) || { builtAt: 0, model: EMBED_MODEL, index: [], files: {} };

  // Map existing chunks by file name for quick lookup
  const prevByDoc = prev.files || {};
  const prevIndex = prev.index || [];

  // 1) Scan the current files and decide: add / update / keep
  const plan = { toEmbed: [], keep: [], deleted: [] };
  const nowDocs = new Set();

  for (const fp of files) {
    const docId = path.basename(fp);
    nowDocs.add(docId);
    const stat = fs.statSync(fp);
    const fileHash = sha256File(fp);

    const raw = clean(fs.readFileSync(fp, "utf-8"));
    const chunks = chunkText(raw, chunkSize, overlap).map(c => ({
      chunkId: c.id,
      text: c.content,
      hash: sha256String(c.content),
      textLen: c.content.length,
      docId,
    }));

    const prevMeta = prevByDoc[docId];
    // Is the file completely unchanged? (whole-file hash)
    const sameFile = prevMeta && prevMeta.fileHash === fileHash;

    if (sameFile) {
      // Identical file → keep the existing chunks (avoid re-embedding)
      plan.keep.push({ docId, chunksCount: prevMeta.chunks?.length || 0 });
    } else {
      // Compare chunk hashes: only embed the changed pieces
      const prevChunksById = new Map((prevMeta?.chunks || []).map(c => [c.chunkId, c]));
      const toEmbed = [];
      const toKeep = [];

      for (const ch of chunks) {
        const old = prevChunksById.get(ch.chunkId);
        if (!old || old.hash !== ch.hash) toEmbed.push(ch);
        else toKeep.push(ch);
      }

      plan.toEmbed.push({ docId, toEmbed, toKeep, fileHash, size: stat.size, mtimeMs: stat.mtimeMs });
    }
  }

  // 2) Find the files that were deleted
  for (const docId of Object.keys(prevByDoc)) {
    if (!nowDocs.has(docId)) plan.deleted.push(docId);
  }

  // 3) Run embeddings (only for the toEmbed chunks)
  const allToEmbed = plan.toEmbed.flatMap(x => x.toEmbed.map(ch => ({ ...ch })));
  const embeddedVectors = [];

  // Batching: split into groups of BATCH; each group is still a single embeddings request
  // (the endpoint accepts multiple inputs). The batches themselves run sequentially here.
  for (let i = 0; i < allToEmbed.length; i += BATCH) {
    const batch = allToEmbed.slice(i, i + BATCH);
    const vecs = await embedMany(batch.map(b => b.text));
    for (let k = 0; k < batch.length; k++) {
      embeddedVectors.push({ id: `${batch[k].docId}#${batch[k].chunkId}`, vector: vecs[k] });
    }
  }

  const vectorMap = new Map(embeddedVectors.map(v => [v.id, v.vector]));

  // 4) Merge the old and new index entries
  // Build the set of surviving files, used to filter entries from the old index
  const aliveDocs = new Set(files.map(fp => path.basename(fp)));
  const merged = [];

  // 4.1 Keep old index entries whose file still exists and whose chunk is unchanged
  for (const item of prevIndex) {
    const [docId] = item.id.split("#");
    if (!aliveDocs.has(docId)) continue; // file was deleted
    const meta = prevByDoc[docId];
    if (meta) {
      const planDoc = plan.toEmbed.find(p => p.docId === docId);
      if (!planDoc) {
        // Whole file unchanged → keep unconditionally
        merged.push(item);
      } else {
        // File changed → keep only the chunks listed in toKeep
        const keepSet = new Set(planDoc.toKeep.map(c => `${docId}#${c.chunkId}`));
        if (keepSet.has(item.id)) merged.push(item);
      }
    }
  }

  // 4.2 Add the newly embedded chunks
  for (const doc of plan.toEmbed) {
    for (const ch of doc.toEmbed) {
      const id = `${ch.docId}#${ch.chunkId}`;
      merged.push({
        id,
        docId: ch.docId,
        text: ch.text,
        vector: vectorMap.get(id),
      });
    }
  }

  // 5) Rebuild the files metadata
  const filesMeta = {};
  // 5.1 Metadata for surviving files (from plan.keep & plan.toEmbed)
  for (const fp of files) {
    const docId = path.basename(fp);
    // Re-derive this file's chunks by filtering merged
    const chunksOfDoc = merged.filter(x => x.docId === docId).map(x => {
      const chunkId = x.id.split("#")[1];
      return { chunkId, hash: sha256String(x.text), textLen: x.text.length };
    });
    const stat = fs.statSync(fp);
    const fileHash = sha256File(fp);
    filesMeta[docId] = {
      size: stat.size,
      mtimeMs: stat.mtimeMs,
      fileHash,
      chunks: chunksOfDoc
    };
  }

  // 6) Write the index file back
  const out = {
    builtAt: Date.now(),
    model: EMBED_MODEL,
    index: merged,
    files: filesMeta,
  };
  const dir = path.dirname(idxFile);
  if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true });
  fs.writeFileSync(idxFile, JSON.stringify(out, null, 2), "utf-8");

  // 7) Return a summary
  const addedChunks = plan.toEmbed.reduce((s, d) => s + d.toEmbed.length, 0);
  const keptChunks = plan.keep.reduce((s, d) => s + d.chunksCount, 0) +
    plan.toEmbed.reduce((s, d) => s + d.toKeep.length, 0);
  const deletedFiles = plan.deleted.length;
  const totalChunks = out.index.length;

  return {
    idxFile,
    addedChunks,
    keptChunks,
    deletedFiles,
    totalChunks,
    filesScanned: files.length,
    docsChanged: plan.toEmbed.length,
  };
}
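
A note on the `concurrency` parameter: `buildIndexIncremental` accepts it, but the embedding loop in step 3 processes its batches one after another. If you want the parameter to actually take effect, one option is a small promise pool like the sketch below; the `embedBatchesConcurrently` helper is hypothetical and not part of the file above, and it simply reuses `embedMany`.

// Sketch: run embedding batches with a concurrency limit (hypothetical helper).
// Results are written back by batch index, so the output order stays stable.
async function embedBatchesConcurrently(batches, concurrency = 4) {
  const results = new Array(batches.length);
  let next = 0;

  async function worker() {
    while (next < batches.length) {
      const i = next++; // claiming the index is synchronous, so workers never collide
      results[i] = await embedMany(batches[i].map(b => b.text));
    }
  }

  await Promise.all(Array.from({ length: concurrency }, worker));
  return results; // results[i][k] is the vector for batches[i][k]
}

In step 3 you would slice allToEmbed into BATCH-sized groups, pass them to this helper, and then zip the returned vectors back onto the `${docId}#${chunkId}` ids exactly as before.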

3. API: incremental indexing (reindex-incremental)

app/api/kb/[tenant]/[ns]/reindex-incremental/route.js (new)

import { NextResponse } from "next/server";
import { withAuth } from "../../../../../src/api/withAuth.js";
import { buildIndexIncremental } from "../../../../../src/day21_indexer.js";

export const runtime = "nodejs";

export const POST = withAuth(async (req, ctx) => {
  if (ctx.user.role === "viewer") return NextResponse.json({ ok: false, error: "Editor/Admin only" }, { status: 403 });
  const { tenant, ns } = ctx.params;
  const { chunkSize = 800, overlap = 80, concurrency = 4 } = await req.json().catch(() => ({}));

  const out = await buildIndexIncremental({ tenant, ns, chunkSize, overlap, concurrency });
  return NextResponse.json({ ok: true, ...out });
}, ["editor", "admin"]);

4. API: index status (status)

app/api/kb/[tenant]/[ns]/status/route.js (new)

import { NextResponse } from "next/server";
import fs from "fs";
import { withAuth } from "../../../../../src/api/withAuth.js";
import { ensureTenantNS } from "../../../../../src/utils/tenantfs.js";

export const runtime = "nodejs";

export const GET = withAuth(async (_req, ctx) => {
  const { tenant, ns } = ctx.params;
  const { idxFile } = ensureTenantNS(tenant, ns);
  if (!fs.existsSync(idxFile)) return NextResponse.json({ ok: true, exists: false });
  const json = JSON.parse(fs.readFileSync(idxFile, "utf-8"));
  const totalChunks = (json.index || []).length;
  const filesCount = Object.keys(json.files || {}).length;
  return NextResponse.json({
    ok: true,
    exists: true,
    builtAt: json.builtAt || 0,
    model: json.model || "",
    totalChunks,
    filesCount
  });
}, ["viewer", "editor", "admin"]);

5. Frontend /studio (modified: add an "Incremental index" button + status display)

Only the snippets that need to be added or replaced are shown here; if you want to save effort, merge this section directly into the app/studio/page.tsx from Day 18/19.

// 1) State: add the index status
const [idxStatus, setIdxStatus] = useState<{exists?: boolean; builtAt?: number; totalChunks?: number; filesCount?: number} | null>(null);
const [reindexingInc, setReindexingInc] = useState(false);

// 2) Fetch the status
async function fetchStatus() {
  if (!tenant || !ns) return;
  try {
    const r = await fetch(`/api/kb/${tenant}/${ns}/status`, { headers: { ...authHeaders() } });
    const j = await r.json();
    if (j.ok) setIdxStatus(j); else setIdxStatus(null);
  } catch { setIdxStatus(null); }
}

// In useEffect: re-fetch the status whenever tenant/ns/token changes
useEffect(() => { if (tenant && ns && token) fetchStatus(); }, [tenant, ns, token]);

// 3) Trigger incremental indexing
async function reindexIncremental() {
  setReindexingInc(true); setErr("");
  try {
    const r = await fetch(`/api/kb/${tenant}/${ns}/reindex-incremental`, {
      method: "POST",
      headers: { "Content-Type": "application/json", ...authHeaders() },
      body: JSON.stringify({ chunkSize: 800, overlap: 80, concurrency: 4 })
    });
    const j = await r.json();
    if (!j.ok) throw new Error(j.error);
    alert(`Incremental index done: +${j.addedChunks} new chunks, ${j.totalChunks} chunks total`);
    await fetchStatus();
  } catch (e: any) { setErr(e.message); }
  finally { setReindexingInc(false); }
}

Insert the following UI elements on the right side of the title row of the "Knowledge base management" card (on the same row as the "Rebuild index" button):

{/* Index status display */}
{idxStatus?.exists ? (
  <span className="text-xs opacity-70">
    {/* fields come from /status; the exact layout here is a reconstruction, adjust to taste */}
    {idxStatus.totalChunks} chunks · {idxStatus.filesCount} files · built {new Date(idxStatus.builtAt || 0).toLocaleString()}
  </span>
) : null}

{/* Incremental index button (hidden for viewer) */}
{(role !== "viewer") && (
  <button className={`btn btn-outline btn-sm ${reindexingInc ? "btn-disabled" : ""}`} onClick={reindexIncremental}>
    {reindexingInc ? "Incremental indexing..." : "Incremental index"}
  </button>
)}

Reminder: keep the original "Rebuild index" button (full rebuild); incremental indexing is for day-to-day updates.

6. package.json (new scripts)

{
  "scripts": {
    "day21:reindex:inc": "curl -s -X POST http://localhost:3000/api/kb/acme/faq/reindex-incremental -H 'Authorization: Bearer ' -H 'Content-Type: application/json' -d '{\"chunkSize\":800,\"overlap\":80,\"concurrency\":4}'",
    "day21:status": "curl -s http://localhost:3000/api/kb/acme/faq/status -H 'Authorization: Bearer '"
  }
}
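
If the curl quoting gets unwieldy, a small Node script is an alternative; the file name scripts/day21-smoke.mjs and the TOKEN environment variable are assumptions, not part of the project above (Node 18+ is assumed for the built-in fetch).

// scripts/day21-smoke.mjs (hypothetical): call the two Day 21 endpoints with fetch
const BASE = "http://localhost:3000/api/kb/acme/faq";
const headers = { Authorization: `Bearer ${process.env.TOKEN || ""}` };

const reindex = await fetch(`${BASE}/reindex-incremental`, {
  method: "POST",
  headers: { ...headers, "Content-Type": "application/json" },
  body: JSON.stringify({ chunkSize: 800, overlap: 80, concurrency: 4 }),
});
console.log("reindex:", await reindex.json());

const status = await fetch(`${BASE}/status`, { headers });
console.log("status:", await status.json());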
