From fc0e76439d24cada67a3e6d3804015368e01262c Mon Sep 17 00:00:00 2001 From: Huarch Date: Thu, 4 Jun 2026 18:19:29 +0800 Subject: [PATCH] =?UTF-8?q?fix(chat):=20=E8=A7=A3=E5=86=B3token=E4=BC=A0?= =?UTF-8?q?=E8=BE=93=E3=80=81=E6=9C=AC=E5=9C=B0=E6=96=87=E4=BB=B6=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E9=A1=BA=E5=BA=8F=E3=80=81=E8=AF=BB=E5=8F=96=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/routes/chat.ts | 46 ++++++++++++++++++++++++++--------- src/utils/fileStore.ts | 13 +++++++--- tests/utils/fileStore.test.ts | 33 +++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 16 deletions(-) create mode 100644 tests/utils/fileStore.test.ts diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 74cb94e..a36a1c5 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -823,12 +823,30 @@ export const buildChatRouter = ( }; activeRuns.set(clientSessionId, activeRun); lastRunStatuses.set(clientSessionId, "running"); - await sessionUiStateStore.write(toSessionUiStateContext(activeSessionRecord), { + const sessionUiStateContext = toSessionUiStateContext(activeSessionRecord); + let persistQueue = sessionUiStateStore.write(sessionUiStateContext, { sessionId: activeSessionRecord.sessionId, isTitleManuallyEdited: initialSessionState?.isTitleManuallyEdited ?? false, messages: initialMessages, branchGroups, }); + const queueSessionUiStatePersist = () => { + const snapshot = { + sessionId: activeSessionRecord.sessionId, + isTitleManuallyEdited: initialSessionState?.isTitleManuallyEdited ?? false, + messages: activeRun.messages, + branchGroups, + }; + persistQueue = persistQueue + .catch((error) => { + logger.warn( + { err: error, sessionId: clientSessionId }, + "failed to persist previous chat stream state", + ); + }) + .then(() => sessionUiStateStore.write(sessionUiStateContext, snapshot)); + return persistQueue; + }; const primarySubscriber: StreamSubscriber = { write: (event, data) => { if (!streamClosed && !res.writableEnded && !res.destroyed) { @@ -850,7 +868,7 @@ export const buildChatRouter = ( req.on("close", handleClientClose); res.on("close", handleClientClose); - const publish = async (event: string, data: Record) => { + const publish = (event: string, data: Record) => { if (event === "token") { activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({ ...message, @@ -887,15 +905,12 @@ export const buildChatRouter = ( })); } - await sessionUiStateStore.write(toSessionUiStateContext(activeSessionRecord), { - sessionId: activeSessionRecord.sessionId, - isTitleManuallyEdited: initialSessionState?.isTitleManuallyEdited ?? false, - messages: activeRun.messages, - branchGroups, - }); for (const subscriber of activeRun.subscribers) { subscriber.write(event, data); } + void queueSessionUiStatePersist().catch((error) => { + logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state"); + }); }; try { @@ -920,11 +935,12 @@ export const buildChatRouter = ( projectId: requestContext.projectId, signal: abortController.signal, write: (event, data) => { - void publish(event, data).catch((error) => { - logger.warn({ err: error, sessionId: clientSessionId }, "failed to publish chat stream event"); - }); + publish(event, data); }, }); + await persistQueue.catch((error) => { + logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state"); + }); if (!streamResult.aborted && !streamResult.failed) { const messages = await runtime.messages(binding.sessionId, 60); @@ -965,13 +981,19 @@ export const buildChatRouter = ( sessionTitle && sessionTitle !== existingSessionTitle ) { - await publish("session_title", { + publish("session_title", { session_id: clientSessionId, title: sessionTitle, }); + await persistQueue.catch((error) => { + logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state"); + }); } } } finally { + await persistQueue.catch((error) => { + logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state"); + }); sessionBridge.finalizeRequest(clientSessionId); activeRun.status = abortController.signal.aborted ? activeRun.status === "aborted" diff --git a/src/utils/fileStore.ts b/src/utils/fileStore.ts index 321d1e6..d843113 100644 --- a/src/utils/fileStore.ts +++ b/src/utils/fileStore.ts @@ -1,4 +1,4 @@ -import { createHash } from "node:crypto"; +import { createHash, randomUUID } from "node:crypto"; import { mkdir, readFile, readdir, rename, rm, stat, writeFile } from "node:fs/promises"; import { basename, dirname, join, relative } from "node:path"; @@ -13,9 +13,14 @@ export const ensureDirectory = async (path: string) => { export const atomicWriteFile = async (path: string, content: string) => { await ensureDirectory(dirname(path)); - const tempPath = `${path}.${process.pid}.${Date.now().toString(36)}.tmp`; - await writeFile(tempPath, content, "utf8"); - await rename(tempPath, path); + const tempPath = `${path}.${process.pid}.${Date.now().toString(36)}.${randomUUID()}.tmp`; + try { + await writeFile(tempPath, content, "utf8"); + await rename(tempPath, path); + } catch (error) { + await removeFileIfExists(tempPath); + throw error; + } }; type HistoricalWriteOptions = { diff --git a/tests/utils/fileStore.test.ts b/tests/utils/fileStore.test.ts new file mode 100644 index 0000000..510bea9 --- /dev/null +++ b/tests/utils/fileStore.test.ts @@ -0,0 +1,33 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtemp, readdir, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { atomicWriteFile, readTextFile } from "../../src/utils/fileStore.js"; + +describe("fileStore", () => { + const originalDateNow = Date.now; + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), "tjwater-file-store-")); + }); + + afterEach(async () => { + Date.now = originalDateNow; + await rm(tempDir, { force: true, recursive: true }); + }); + + it("uses unique temp paths for concurrent writes in the same millisecond", async () => { + Date.now = () => 1_801_578_600_000; + const path = join(tempDir, "state.json"); + const values = Array.from({ length: 24 }, (_, index) => `value-${index}`); + + await Promise.all(values.map((value) => atomicWriteFile(path, value))); + + const written = await readTextFile(path); + expect(written).not.toBeNull(); + expect(values).toContain(written as string); + expect((await readdir(tempDir)).filter((name) => name.endsWith(".tmp"))).toEqual([]); + }); +});