From 5d80961930e943dc36dc45ce820761db4cb37456 Mon Sep 17 00:00:00 2001 From: Huarch Date: Thu, 21 May 2026 15:41:46 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BC=9A=E8=AF=9D=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=8A=9F=E8=83=BD=EF=BC=8C=E7=94=B1=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=20opencode=20=E5=8F=91=E6=94=BE=20sessionId=EF=BC=8C=E5=90=8E?= =?UTF-8?q?=E7=AB=AF=E5=81=9A=20scope?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .opencode/tools/dynamic_http_call.ts | 10 +- .opencode/tools/fetch_result_ref.ts | 10 +- .opencode/tools/memory_manager.ts | 4 +- .opencode/tools/session_search.ts | 10 +- .opencode/tools/store_render_ref.ts | 10 +- src/chat/sessionBridge.ts | 318 ++++++++----------------- src/config.ts | 4 +- src/conversations/store.ts | 120 ++++++++++ src/history/store.ts | 69 +++++- src/learning/orchestrator.ts | 15 +- src/routes/chat.ts | 164 +++++++++---- src/routes/chatSession.ts | 48 +++- src/runtime/opencode.ts | 17 -- src/server.ts | 57 +++-- src/session/registry.ts | 80 ------- src/session/toolContextStore.ts | 11 + src/utils/fileStore.ts | 6 + tests/conversations/store.test.ts | 64 +++++ tests/history/store.test.ts | 138 +++++++++++ tests/session/toolContextStore.test.ts | 51 ++++ 20 files changed, 816 insertions(+), 390 deletions(-) create mode 100644 src/conversations/store.ts delete mode 100644 src/session/registry.ts create mode 100644 tests/conversations/store.test.ts create mode 100644 tests/history/store.test.ts create mode 100644 tests/session/toolContextStore.test.ts diff --git a/.opencode/tools/dynamic_http_call.ts b/.opencode/tools/dynamic_http_call.ts index 2570da2..80a5138 100644 --- a/.opencode/tools/dynamic_http_call.ts +++ b/.opencode/tools/dynamic_http_call.ts @@ -1,7 +1,10 @@ import { tool } from "@opencode-ai/plugin"; +import { ToolSessionContextStore } from "../../src/session/toolContextStore.js"; const internalBaseUrl = process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8787"; const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; +const toolContextStore = new ToolSessionContextStore(); +const initializePromise = toolContextStore.initialize(); export default tool({ description: @@ -21,6 +24,11 @@ export default tool({ .describe("Query arguments object."), }, async execute(args, context) { + await initializePromise; + const sessionContext = await toolContextStore.read(context.sessionID); + if (!sessionContext) { + throw new Error(`session context not found for ${context.sessionID}`); + } // 工具本身不直接持有用户 token;通过 sessionID 回调 Agent 服务,由服务侧补齐用户上下文。 const response = await fetch(`${internalBaseUrl}/internal/tools/dynamic-http-call`, { method: "POST", @@ -29,7 +37,7 @@ export default tool({ "x-agent-internal-token": internalToken, }, body: JSON.stringify({ - sessionId: context.sessionID, + sessionScopeKey: sessionContext.sessionScopeKey, reason: args.reason, path: args.path, method: args.method, diff --git a/.opencode/tools/fetch_result_ref.ts b/.opencode/tools/fetch_result_ref.ts index 10ac288..32a45c0 100644 --- a/.opencode/tools/fetch_result_ref.ts +++ b/.opencode/tools/fetch_result_ref.ts @@ -1,7 +1,10 @@ import { tool } from "@opencode-ai/plugin"; +import { ToolSessionContextStore } from "../../src/session/toolContextStore.js"; const internalBaseUrl = process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8787"; const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; +const toolContextStore = new ToolSessionContextStore(); +const initializePromise = toolContextStore.initialize(); export default tool({ description: @@ -19,6 +22,11 @@ export default tool({ .describe("Optional maximum number of top-level items or fields to return."), }, async execute(args, context) { + await initializePromise; + const sessionContext = await toolContextStore.read(context.sessionID); + if (!sessionContext) { + throw new Error(`session context not found for ${context.sessionID}`); + } const response = await fetch(`${internalBaseUrl}/internal/tools/fetch-result-ref`, { method: "POST", headers: { @@ -26,7 +34,7 @@ export default tool({ "x-agent-internal-token": internalToken, }, body: JSON.stringify({ - sessionId: context.sessionID, + sessionScopeKey: sessionContext.sessionScopeKey, result_ref: args.result_ref, max_items: args.max_items, }), diff --git a/.opencode/tools/memory_manager.ts b/.opencode/tools/memory_manager.ts index a1892c3..22f7795 100644 --- a/.opencode/tools/memory_manager.ts +++ b/.opencode/tools/memory_manager.ts @@ -80,7 +80,7 @@ export default tool({ if (args.action === "add") { const result = await memoryStore.upsert(scope, scopeKey, { content: args.content ?? "", - sessionId: context.sessionID, + sessionId: sessionContext.clientSessionId, source: "tool", traceId: sessionContext.traceId, }); @@ -105,7 +105,7 @@ export default tool({ if (args.action === "replace") { const result = await memoryStore.replace(scope, scopeKey, args.target_id ?? "", { content: args.content ?? "", - sessionId: context.sessionID, + sessionId: sessionContext.clientSessionId, source: "tool", traceId: sessionContext.traceId, }); diff --git a/.opencode/tools/session_search.ts b/.opencode/tools/session_search.ts index 152804e..9631b3c 100644 --- a/.opencode/tools/session_search.ts +++ b/.opencode/tools/session_search.ts @@ -1,8 +1,11 @@ import { tool } from "@opencode-ai/plugin"; +import { ToolSessionContextStore } from "../../src/session/toolContextStore.js"; const internalBaseUrl = process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8787"; const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; +const toolContextStore = new ToolSessionContextStore(); +const initializePromise = toolContextStore.initialize(); export default tool({ description: @@ -22,6 +25,11 @@ export default tool({ .describe("Optional maximum number of hits to return."), }, async execute(args, context) { + await initializePromise; + const sessionContext = await toolContextStore.read(context.sessionID); + if (!sessionContext) { + throw new Error(`session context not found for ${context.sessionID}`); + } const response = await fetch(`${internalBaseUrl}/internal/tools/session-search`, { method: "POST", headers: { @@ -31,7 +39,7 @@ export default tool({ body: JSON.stringify({ max_results: args.max_results, query: args.query, - sessionId: context.sessionID, + sessionScopeKey: sessionContext.sessionScopeKey, }), }); const text = await response.text(); diff --git a/.opencode/tools/store_render_ref.ts b/.opencode/tools/store_render_ref.ts index f9966cb..4b5496e 100644 --- a/.opencode/tools/store_render_ref.ts +++ b/.opencode/tools/store_render_ref.ts @@ -1,7 +1,10 @@ import { tool } from "@opencode-ai/plugin"; +import { ToolSessionContextStore } from "../../src/session/toolContextStore.js"; const internalBaseUrl = process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8787"; const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; +const toolContextStore = new ToolSessionContextStore(); +const initializePromise = toolContextStore.initialize(); export default tool({ description: @@ -15,6 +18,11 @@ export default tool({ .describe("Absolute path to a local JSON file containing the render payload or a wrapper object with data."), }, async execute(args, context) { + await initializePromise; + const sessionContext = await toolContextStore.read(context.sessionID); + if (!sessionContext) { + throw new Error(`session context not found for ${context.sessionID}`); + } const response = await fetch(`${internalBaseUrl}/internal/tools/store-render-ref`, { method: "POST", headers: { @@ -22,7 +30,7 @@ export default tool({ "x-agent-internal-token": internalToken, }, body: JSON.stringify({ - sessionId: context.sessionID, + sessionScopeKey: sessionContext.sessionScopeKey, file_path: args.file_path, }), }); diff --git a/src/chat/sessionBridge.ts b/src/chat/sessionBridge.ts index ef76879..b59d32b 100644 --- a/src/chat/sessionBridge.ts +++ b/src/chat/sessionBridge.ts @@ -2,10 +2,25 @@ import { randomUUID } from "node:crypto"; import { logger } from "../logger.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; -import { type SessionBinding, type SessionContext, SessionRegistry } from "../session/registry.js"; -import { ToolSessionContextStore } from "../session/toolContextStore.js"; +import { + buildToolSessionScopeKey, + ToolSessionContextStore, +} from "../session/toolContextStore.js"; import { toActorKey, toProjectKey } from "../utils/fileStore.js"; +export type SessionBinding = { + clientSessionId: string; + sessionId: string; + startedAt: number; +}; + +export type SessionContext = { + clientSessionId: string; + accessToken?: string; + projectId?: string; + userId?: string; +}; + export type ChatRequestContext = SessionContext & { actorKey: string; projectKey: string; @@ -13,15 +28,12 @@ export type ChatRequestContext = SessionContext & { }; export class ChatSessionBridge { - // 这里额外保存 session -> 用户上下文,供工具桥在服务端代发真实后端请求时复用。 - private readonly sessionContexts = new Map(); - private readonly sessionTitles = new Map(); + // runtime session 仅在单次请求生命周期内有效;线程连续性由 clientSessionId 对应的持久状态承担。 + private readonly activeRuntimeSessions = new Map(); + private readonly activeSensitiveContexts = new Map(); private readonly toolContextStore = new ToolSessionContextStore(); - constructor( - private readonly registry: SessionRegistry, - private readonly runtime: OpencodeRuntimeAdapter, - ) {} + constructor(private readonly runtime: OpencodeRuntimeAdapter) {} async resolve(context: { clientSessionId?: string; @@ -34,61 +46,22 @@ export class ChatSessionBridge { requestContext: ChatRequestContext; created: boolean; }> { - const requestContext: ChatRequestContext = { - clientSessionId: - context.clientSessionId?.trim() || `agent-${randomUUID().slice(0, 12)}`, - accessToken: context.accessToken, - actorKey: toActorKey(context.userId), - projectId: context.projectId, - projectKey: toProjectKey(context.projectId), - traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, - userId: context.userId?.trim(), - }; - - this.cleanupExpired(); - - const current = this.registry.get(requestContext); - if (current) { - this.sessionContexts.set(current.sessionId, requestContext); - await this.toolContextStore.write({ - actorKey: requestContext.actorKey, - allowLearningWrite: true, - clientSessionId: requestContext.clientSessionId, - learningMode: "interactive", - projectId: requestContext.projectId, - projectKey: requestContext.projectKey, - sessionId: current.sessionId, - traceId: requestContext.traceId, - }); - try { - // 只有 opencode 侧 session 仍存在时,才复用本地映射。 - await this.runtime.getSession(current.sessionId); - await this.runtime.waitForSessionIdle(current.sessionId).catch((error) => { - logger.warn( - { - clientSessionId: requestContext.clientSessionId, - sessionId: current.sessionId, - err: error, - }, - "failed while waiting for reused opencode session to become idle", - ); - }); - return { binding: current, requestContext, created: false }; - } catch (error) { - logger.warn( - { - clientSessionId: requestContext.clientSessionId, - sessionId: current.sessionId, - err: error, - }, - "existing opencode session lookup failed, creating a new session", - ); - } - } + const requestContext = this.buildRequestContext(context); + await this.abortActiveRuntime(requestContext.clientSessionId); const session = await this.runtime.createSession(requestContext.clientSessionId); - const binding = this.registry.upsert(requestContext, session.id); - this.sessionContexts.set(binding.sessionId, requestContext); + const binding: SessionBinding = { + clientSessionId: requestContext.clientSessionId, + sessionId: session.id, + startedAt: Date.now(), + }; + const sessionScopeKey = buildToolSessionScopeKey( + requestContext.actorKey, + requestContext.projectKey, + requestContext.clientSessionId, + ); + this.activeRuntimeSessions.set(requestContext.clientSessionId, session.id); + this.activeSensitiveContexts.set(sessionScopeKey, requestContext); await this.toolContextStore.write({ actorKey: requestContext.actorKey, allowLearningWrite: true, @@ -96,105 +69,70 @@ export class ChatSessionBridge { learningMode: "interactive", projectId: requestContext.projectId, projectKey: requestContext.projectKey, - sessionId: binding.sessionId, + sessionId: session.id, + sessionScopeKey, traceId: requestContext.traceId, }); + return { binding, requestContext, created: true }; } count(): number { - return this.registry.count(); + return this.activeRuntimeSessions.size; } - getSessionContext(sessionId: string) { - return this.sessionContexts.get(sessionId) ?? null; + createClientSessionId() { + return `agent-${randomUUID().slice(0, 12)}`; } - getSessionTitle(sessionId: string) { - return this.sessionTitles.get(sessionId); - } - - setSessionTitle(sessionId: string, title: string) { - const normalized = title.trim(); - if (!normalized) { - return; - } - this.sessionTitles.set(sessionId, normalized); - } - - cloneSessionTitle(sourceSessionId: string, targetSessionId: string) { - const existingTitle = this.sessionTitles.get(sourceSessionId); - if (!existingTitle) { - return; - } - this.sessionTitles.set(targetSessionId, existingTitle); + getActiveSensitiveContext(sessionScopeKey: string) { + return this.activeSensitiveContexts.get(sessionScopeKey) ?? null; } async abort(context: { clientSessionId?: string; - accessToken?: string; - projectId?: string; - traceId?: string; - userId?: string; }): Promise { const clientSessionId = context.clientSessionId?.trim(); if (!clientSessionId) { return null; } - const requestContext: ChatRequestContext = { - clientSessionId, - accessToken: context.accessToken, - actorKey: toActorKey(context.userId), - projectId: context.projectId, - projectKey: toProjectKey(context.projectId), - traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, - userId: context.userId?.trim(), - }; - - this.cleanupExpired(); - - const binding = this.registry.get(requestContext); - if (!binding) { + const sessionId = this.activeRuntimeSessions.get(clientSessionId); + if (!sessionId) { return null; } - this.sessionContexts.set(binding.sessionId, requestContext); - await this.toolContextStore.write({ - actorKey: requestContext.actorKey, - allowLearningWrite: true, - clientSessionId: requestContext.clientSessionId, - learningMode: "interactive", - projectId: requestContext.projectId, - projectKey: requestContext.projectKey, - sessionId: binding.sessionId, - traceId: requestContext.traceId, - }); - await this.runtime.abortSession(binding.sessionId); - await this.runtime.waitForSessionIdle(binding.sessionId).catch((error) => { - logger.warn( - { clientSessionId, sessionId: binding.sessionId, err: error }, - "failed while waiting for aborted opencode session to become idle", - ); - }); - return binding; + await this.abortActiveRuntime(clientSessionId); + return { + clientSessionId, + sessionId, + startedAt: Date.now(), + }; } - async fork(context: { + async releaseRuntimeSession(clientSessionId: string, sessionId: string) { + const activeSessionId = this.activeRuntimeSessions.get(clientSessionId); + if (activeSessionId === sessionId) { + this.activeRuntimeSessions.delete(clientSessionId); + } + this.activeSensitiveContexts.delete(findScopeKey(this.activeSensitiveContexts, clientSessionId)); + await this.toolContextStore.remove(sessionId).catch((error) => { + logger.debug({ sessionId, err: error }, "failed to cleanup runtime tool context"); + }); + await this.runtime.abortSession(sessionId).catch((error) => { + logger.debug({ sessionId, err: error }, "failed to cleanup runtime session"); + }); + } + + private buildRequestContext(context: { clientSessionId?: string; accessToken?: string; projectId?: string; traceId?: string; - keepMessageCount: number; userId?: string; - }): Promise<{ - binding: SessionBinding; - requestContext: ChatRequestContext; - created: boolean; - }> { - const currentClientSessionId = context.clientSessionId?.trim(); - const nextRequestContext: ChatRequestContext = { - clientSessionId: `agent-${randomUUID().slice(0, 12)}`, + }): ChatRequestContext { + return { + clientSessionId: context.clientSessionId?.trim() || this.createClientSessionId(), accessToken: context.accessToken, actorKey: toActorKey(context.userId), projectId: context.projectId, @@ -202,95 +140,39 @@ export class ChatSessionBridge { traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, userId: context.userId?.trim(), }; - - this.cleanupExpired(); - - if (!currentClientSessionId || context.keepMessageCount <= 0) { - const session = await this.runtime.createSession(nextRequestContext.clientSessionId); - const binding = this.registry.upsert(nextRequestContext, session.id); - this.sessionContexts.set(binding.sessionId, nextRequestContext); - await this.toolContextStore.write({ - actorKey: nextRequestContext.actorKey, - allowLearningWrite: true, - clientSessionId: nextRequestContext.clientSessionId, - learningMode: "interactive", - projectId: nextRequestContext.projectId, - projectKey: nextRequestContext.projectKey, - sessionId: binding.sessionId, - traceId: nextRequestContext.traceId, - }); - return { binding, requestContext: nextRequestContext, created: true }; - } - - const currentContext: ChatRequestContext = { - clientSessionId: currentClientSessionId, - accessToken: context.accessToken, - actorKey: toActorKey(context.userId), - projectId: context.projectId, - projectKey: toProjectKey(context.projectId), - traceId: nextRequestContext.traceId, - userId: context.userId?.trim(), - }; - - const current = this.registry.get(currentContext); - if (!current) { - const session = await this.runtime.createSession(nextRequestContext.clientSessionId); - const binding = this.registry.upsert(nextRequestContext, session.id); - this.sessionContexts.set(binding.sessionId, nextRequestContext); - await this.toolContextStore.write({ - actorKey: nextRequestContext.actorKey, - allowLearningWrite: true, - clientSessionId: nextRequestContext.clientSessionId, - learningMode: "interactive", - projectId: nextRequestContext.projectId, - projectKey: nextRequestContext.projectKey, - sessionId: binding.sessionId, - traceId: nextRequestContext.traceId, - }); - return { binding, requestContext: nextRequestContext, created: true }; - } - - await this.runtime.getSession(current.sessionId); - const messages = await this.runtime.messages( - current.sessionId, - Math.max(100, context.keepMessageCount + 20), - ); - const chatMessages = messages.filter( - (message) => message.info.role === "user" || message.info.role === "assistant", - ); - const keepMessage = chatMessages[context.keepMessageCount - 1]; - - if (!keepMessage) { - throw new Error(`fork keep point not found for message count ${context.keepMessageCount}`); - } - - const session = await this.runtime.forkSession(current.sessionId, keepMessage.info.id); - const binding = this.registry.upsert(nextRequestContext, session.id); - this.sessionContexts.set(binding.sessionId, nextRequestContext); - await this.toolContextStore.write({ - actorKey: nextRequestContext.actorKey, - allowLearningWrite: true, - clientSessionId: nextRequestContext.clientSessionId, - learningMode: "interactive", - projectId: nextRequestContext.projectId, - projectKey: nextRequestContext.projectKey, - sessionId: binding.sessionId, - traceId: nextRequestContext.traceId, - }); - this.cloneSessionTitle(current.sessionId, binding.sessionId); - return { binding, requestContext: nextRequestContext, created: true }; } - cleanupExpired(): void { - const expiredSessionIds = this.registry.evictExpired(); - for (const sessionId of expiredSessionIds) { - this.sessionContexts.delete(sessionId); - this.sessionTitles.delete(sessionId); - void this.toolContextStore.remove(sessionId); - // 这里用 abort 做轻量清理;即使失败,也不阻断本地过期回收。 - void this.runtime.abortSession(sessionId).catch((error) => { - logger.debug({ sessionId, err: error }, "ignoring failed abort for expired session"); - }); + private async abortActiveRuntime(clientSessionId: string) { + const activeSessionId = this.activeRuntimeSessions.get(clientSessionId); + if (!activeSessionId) { + return; } + this.activeRuntimeSessions.delete(clientSessionId); + this.activeSensitiveContexts.delete(findScopeKey(this.activeSensitiveContexts, clientSessionId)); + await this.toolContextStore.remove(activeSessionId).catch(() => undefined); + await this.runtime.abortSession(activeSessionId).catch((error) => { + logger.warn( + { clientSessionId, sessionId: activeSessionId, err: error }, + "failed to abort previous active runtime session", + ); + }); + await this.runtime.waitForSessionIdle(activeSessionId).catch((error) => { + logger.warn( + { clientSessionId, sessionId: activeSessionId, err: error }, + "failed while waiting for previous runtime session to become idle", + ); + }); } } + +const findScopeKey = ( + contexts: Map, + clientSessionId: string, +) => { + for (const [scopeKey, context] of contexts.entries()) { + if (context.clientSessionId === clientSessionId) { + return scopeKey; + } + } + return clientSessionId; +}; diff --git a/src/config.ts b/src/config.ts index c7d59a9..23af3be 100644 --- a/src/config.ts +++ b/src/config.ts @@ -45,8 +45,6 @@ const envSchema = z OPENCODE_MODEL: z.string().default("deepseek/deepseek-v4-pro"), // client 模式下,目标 opencode server 的基础地址。 OPENCODE_CLIENT_BASE_URL: z.string().url().optional(), - // chat session 在本地注册表中的保活时长(秒)。 - SESSION_TTL_SECONDS: z.coerce.number().int().positive().default(1800), // 提供给本地 opencode tools 读取的会话上下文目录。 SESSION_CONTEXT_STORAGE_DIR: z.string().default("./data/session-contexts"), // TJWater 后端 API 的基础地址。 @@ -65,6 +63,8 @@ const envSchema = z MEMORY_MAX_PROMPT_CHARS: z.coerce.number().int().positive().default(1800), // session transcript 持久化目录。 SESSION_HISTORY_STORAGE_DIR: z.string().default("./data/session-history"), + // conversation metadata 持久化目录。 + CONVERSATION_STORAGE_DIR: z.string().default("./data/conversations"), // 每个会话最多保留多少轮 transcript,超过后裁剪旧记录。 SESSION_HISTORY_MAX_TURNS_PER_SESSION: z.coerce .number() diff --git a/src/conversations/store.ts b/src/conversations/store.ts new file mode 100644 index 0000000..f7ec26b --- /dev/null +++ b/src/conversations/store.ts @@ -0,0 +1,120 @@ +import { randomUUID } from "node:crypto"; +import { join } from "node:path"; + +import { config } from "../config.js"; +import { atomicWriteJson, ensureDirectory, readJsonFile } from "../utils/fileStore.js"; +import { toConversationScopeKey } from "../utils/fileStore.js"; + +export type ConversationStatus = "active" | "archived"; + +export type ConversationRecord = { + sessionId: string; + sessionScopeKey: string; + actorKey: string; + ownerUserId?: string; + projectId?: string; + projectKey: string; + parentSessionId?: string; + createdAt: string; + updatedAt: string; + status: ConversationStatus; + title?: string; +}; + +type ConversationContext = { + actorKey: string; + userId?: string; + projectId?: string; + projectKey: string; +}; + +type EnsureConversationInput = ConversationContext & { + sessionId?: string; + parentSessionId?: string; +}; + +export class ConversationStore { + constructor(private readonly baseDir = config.CONVERSATION_STORAGE_DIR) {} + + async initialize() { + await ensureDirectory(this.baseDir); + } + + async ensure(input: EnsureConversationInput) { + const sessionId = normalizeSessionId(input.sessionId) ?? createConversationSessionId(); + const sessionScopeKey = toConversationScopeKey( + input.actorKey, + input.projectKey, + sessionId, + ); + const existing = await readJsonFile(this.filePath(sessionScopeKey)); + if (existing) { + return { created: false, record: existing }; + } + + const now = new Date().toISOString(); + const record: ConversationRecord = { + sessionId, + sessionScopeKey, + actorKey: input.actorKey, + ownerUserId: input.userId?.trim(), + projectId: input.projectId, + projectKey: input.projectKey, + parentSessionId: normalizeSessionId(input.parentSessionId), + createdAt: now, + updatedAt: now, + status: "active", + }; + await atomicWriteJson(this.filePath(sessionScopeKey), record); + return { created: true, record }; + } + + async get(context: ConversationContext, sessionId: string) { + const normalizedSessionId = normalizeSessionId(sessionId); + if (!normalizedSessionId) { + return null; + } + return await readJsonFile( + this.filePath( + toConversationScopeKey(context.actorKey, context.projectKey, normalizedSessionId), + ), + ); + } + + async touch(record: ConversationRecord, updates: Partial> = {}) { + const next: ConversationRecord = { + ...record, + ...normalizeConversationUpdates(updates), + updatedAt: new Date().toISOString(), + }; + await atomicWriteJson(this.filePath(record.sessionScopeKey), next); + return next; + } + + private filePath(sessionScopeKey: string) { + return join(this.baseDir, `${sessionScopeKey}.json`); + } +} + +export const createConversationSessionId = () => `chat-${randomUUID().slice(0, 16)}`; + +const normalizeSessionId = (value?: string) => { + const normalized = value?.trim(); + return normalized ? normalized.slice(0, 128) : undefined; +}; + +const normalizeConversationUpdates = ( + updates: Partial>, +) => { + const normalized: Partial> = {}; + if (updates.status === "active" || updates.status === "archived") { + normalized.status = updates.status; + } + if (typeof updates.title === "string") { + const trimmed = updates.title.trim(); + if (trimmed) { + normalized.title = trimmed; + } + } + return normalized; +}; diff --git a/src/history/store.ts b/src/history/store.ts index cd7525b..11244b0 100644 --- a/src/history/store.ts +++ b/src/history/store.ts @@ -85,6 +85,7 @@ export class SessionHistoryStore { userMessage, }; transcript.clientSessionId = context.clientSessionId ?? transcript.clientSessionId; + transcript.sessionId = context.sessionId; transcript.turns.push(record); if (transcript.turns.length > config.SESSION_HISTORY_MAX_TURNS_PER_SESSION) { transcript.turns = transcript.turns.slice( @@ -108,6 +109,25 @@ export class SessionHistoryStore { return transcript.turns.slice(-Math.max(1, limit)); } + async cloneThread( + sourceContext: SessionHistoryContext, + targetContext: SessionHistoryContext, + keepMessageCount: number, + ) { + const sourceTranscript = await this.readTranscript(sourceContext); + const timestamp = new Date().toISOString(); + const nextTranscript: SessionTranscriptRecord = { + actorKey: targetContext.actorKey, + clientSessionId: targetContext.clientSessionId, + projectKey: targetContext.projectKey, + sessionId: targetContext.sessionId, + turns: projectTurnsForFork(sourceTranscript?.turns ?? [], keepMessageCount), + updatedAt: timestamp, + }; + await atomicWriteJson(this.filePath(targetContext), nextTranscript); + return nextTranscript; + } + async search( context: Pick, query: string, @@ -156,7 +176,38 @@ export class SessionHistoryStore { } private async readTranscript(context: SessionHistoryContext) { - return await readJsonFile(this.filePath(context)); + const direct = await readJsonFile(this.filePath(context)); + if (direct) { + return direct; + } + + const clientSessionId = context.clientSessionId?.trim(); + if (!clientSessionId) { + return null; + } + + const files = await listJsonFiles(this.baseDir); + const matches: SessionTranscriptRecord[] = []; + for (const file of files) { + const transcript = await readJsonFile(file); + if (!transcript) { + continue; + } + if ( + transcript.actorKey !== context.actorKey || + transcript.projectKey !== context.projectKey || + transcript.clientSessionId !== clientSessionId + ) { + continue; + } + matches.push(transcript); + } + + if (matches.length === 0) { + return null; + } + + return matches.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt))[0] ?? null; } private filePath(context: SessionHistoryContext) { @@ -211,3 +262,19 @@ const buildSnippet = (text: string, query: string) => { const suffix = end < compact.length ? "..." : ""; return `${prefix}${snippet}${suffix}`; }; + +const projectTurnsForFork = ( + turns: SessionTurnRecord[], + keepMessageCount: number, +): SessionTurnRecord[] => { + if (keepMessageCount <= 0) { + return []; + } + + const keepTurnCount = Math.floor(keepMessageCount / 2); + if (keepTurnCount <= 0) { + return []; + } + + return turns.slice(0, keepTurnCount); +}; diff --git a/src/learning/orchestrator.ts b/src/learning/orchestrator.ts index f955b4a..7fae655 100644 --- a/src/learning/orchestrator.ts +++ b/src/learning/orchestrator.ts @@ -9,7 +9,10 @@ import { LearningStateStore } from "./stateStore.js"; import { MemoryStore, type MemoryScope } from "../memory/store.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; import { SkillStore } from "../skills/store.js"; -import { ToolSessionContextStore } from "../session/toolContextStore.js"; +import { + buildToolSessionScopeKey, + ToolSessionContextStore, +} from "../session/toolContextStore.js"; import { sanitizePersistentDocument, sanitizePersistentLine, @@ -150,6 +153,11 @@ export class LearningOrchestrator { projectId: input.requestContext.projectId, projectKey: input.requestContext.projectKey, sessionId: gateSession.id, + sessionScopeKey: buildToolSessionScopeKey( + input.requestContext.actorKey, + input.requestContext.projectKey, + input.requestContext.clientSessionId, + ), traceId: input.requestContext.traceId, }); await this.runtime.prompt( @@ -239,6 +247,11 @@ export class LearningOrchestrator { projectId: input.requestContext.projectId, projectKey: input.requestContext.projectKey, sessionId: reviewSession.id, + sessionScopeKey: buildToolSessionScopeKey( + input.requestContext.actorKey, + input.requestContext.projectKey, + input.requestContext.clientSessionId, + ), traceId: input.requestContext.traceId, }); try { diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 070a55f..9f568cf 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -2,17 +2,18 @@ import { Router } from "express"; import { z } from "zod"; import { type LearningOrchestrator } from "../learning/orchestrator.js"; +import { type SessionHistoryStore } from "../history/store.js"; import { logger } from "../logger.js"; import { MemoryStore } from "../memory/store.js"; +import { type ConversationStore } from "../conversations/store.js"; import { type ResultReferenceResolver } from "../results/resolver.js"; import { RESULT_REFERENCE_KIND } from "../results/store.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; import { type ChatSessionBridge } from "../chat/sessionBridge.js"; -import { toActorKey } from "../utils/fileStore.js"; +import { toActorKey, toProjectKey } from "../utils/fileStore.js"; import { buildPromptWithLearningContext, generateSessionTitle, - getConversationTurnStats, } from "./chatSession.js"; import { collectTextContent, @@ -31,6 +32,11 @@ const abortPayloadSchema = z.object({ session_id: z.string().max(128), }); +const createSessionPayloadSchema = z.object({ + session_id: z.string().max(128).optional(), + parent_session_id: z.string().max(128).optional(), +}); + const forkPayloadSchema = z.object({ session_id: z.string().max(128).optional(), keep_message_count: z.coerce.number().int().min(0), @@ -39,12 +45,48 @@ const forkPayloadSchema = z.object({ export const buildChatRouter = ( sessionBridge: ChatSessionBridge, runtime: OpencodeRuntimeAdapter, + conversationStore: ConversationStore, memoryStore: MemoryStore, + sessionHistoryStore: SessionHistoryStore, learningOrchestrator: LearningOrchestrator, resultReferenceResolver: ResultReferenceResolver, ) => { const chatRouter = Router(); + chatRouter.post("/session", async (req, res) => { + const parsed = createSessionPayloadSchema.safeParse(req.body ?? {}); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + const projectId = req.header("x-project-id") ?? undefined; + const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + + const { record, created } = await conversationStore.ensure({ + actorKey, + parentSessionId: parsed.data.parent_session_id, + projectId, + projectKey, + sessionId: parsed.data.session_id, + userId, + }); + + res.status(created ? 201 : 200).json({ + session_id: record.sessionId, + created_at: record.createdAt, + updated_at: record.updatedAt, + status: record.status, + title: record.title, + parent_session_id: record.parentSessionId, + }); + }); + chatRouter.get("/render-ref/:renderRef", async (req, res) => { const renderRef = req.params.renderRef?.trim(); const userId = req.header("x-user-id")?.trim(); @@ -99,20 +141,8 @@ export const buildChatRouter = ( } try { - const authHeader = req.header("authorization"); - const accessToken = authHeader?.startsWith("Bearer ") - ? authHeader.slice("Bearer ".length) - : authHeader; - const projectId = req.header("x-project-id") ?? undefined; - const traceId = req.header("x-trace-id") ?? undefined; - const userId = req.header("x-user-id") ?? undefined; - const binding = await sessionBridge.abort({ clientSessionId: parsed.data.session_id, - accessToken, - projectId, - traceId, - userId, }); if (!binding) { @@ -124,8 +154,6 @@ export const buildChatRouter = ( { clientSessionId: parsed.data.session_id, sessionId: binding.sessionId, - traceId, - projectId, }, "aborted chat session by client request", ); @@ -154,37 +182,69 @@ export const buildChatRouter = ( } try { - const authHeader = req.header("authorization"); - const accessToken = authHeader?.startsWith("Bearer ") - ? authHeader.slice("Bearer ".length) - : authHeader; const projectId = req.header("x-project-id") ?? undefined; const traceId = req.header("x-trace-id") ?? undefined; const userId = req.header("x-user-id") ?? undefined; - const { binding, requestContext } = await sessionBridge.fork({ - clientSessionId: parsed.data.session_id, - accessToken, + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const sourceClientSessionId = parsed.data.session_id?.trim(); + const sourceConversation = sourceClientSessionId + ? await conversationStore.get( + { + actorKey, + projectId, + projectKey, + userId, + }, + sourceClientSessionId, + ) + : null; + const { record: targetConversation } = await conversationStore.ensure({ + actorKey, + parentSessionId: sourceClientSessionId, projectId, - traceId, - keepMessageCount: parsed.data.keep_message_count, + projectKey, userId, }); + const nextClientSessionId = targetConversation.sessionId; + + if (sourceClientSessionId && parsed.data.keep_message_count > 0) { + await sessionHistoryStore.cloneThread( + { + actorKey, + clientSessionId: sourceClientSessionId, + projectKey, + sessionId: sourceClientSessionId, + }, + { + actorKey, + clientSessionId: nextClientSessionId, + projectKey, + sessionId: nextClientSessionId, + }, + parsed.data.keep_message_count, + ); + if (sourceConversation?.title) { + await conversationStore.touch(targetConversation, { + title: sourceConversation.title, + }); + } + } logger.info( { sourceClientSessionId: parsed.data.session_id, - clientSessionId: requestContext.clientSessionId, - sessionId: binding.sessionId, - traceId: requestContext.traceId, - projectId: requestContext.projectId, + clientSessionId: nextClientSessionId, + traceId, + projectId, keepMessageCount: parsed.data.keep_message_count, }, "forked chat session", ); res.status(200).json({ - session_id: requestContext.clientSessionId, + session_id: nextClientSessionId, }); } catch (error) { const detail = error instanceof Error ? error.message : String(error); @@ -214,20 +274,38 @@ export const buildChatRouter = ( const projectId = req.header("x-project-id") ?? undefined; const traceId = req.header("x-trace-id") ?? undefined; const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const { record: conversation, created: conversationCreated } = + await conversationStore.ensure({ + actorKey, + projectId, + projectKey, + sessionId: parsed.data.session_id, + userId, + }); + const activeConversation = await conversationStore.touch(conversation); const { binding, requestContext, created } = await sessionBridge.resolve({ - clientSessionId: parsed.data.session_id, + clientSessionId: activeConversation.sessionId, accessToken, projectId, traceId, userId, }); + const historyContext = { + actorKey: requestContext.actorKey, + clientSessionId: requestContext.clientSessionId, + projectKey: requestContext.projectKey, + sessionId: requestContext.clientSessionId, + }; + const recentTurns = await sessionHistoryStore.getRecentTurns(historyContext, 8); logger.info( { clientSessionId: requestContext.clientSessionId, sessionId: binding.sessionId, - created, + created: created || conversationCreated, model: parsed.data.model, traceId: requestContext.traceId, projectId: requestContext.projectId, @@ -260,6 +338,7 @@ export const buildChatRouter = ( memoryStore, requestContext.actorKey, requestContext.projectKey, + recentTurns, parsed.data.message, ); const streamResult = await streamPromptResponse({ @@ -285,23 +364,21 @@ export const buildChatRouter = ( .reverse() .find((message) => message.info.role === "assistant"); const assistantText = collectTextContent(assistantMessage?.parts ?? []); - const existingSessionTitle = sessionBridge.getSessionTitle(binding.sessionId); + const existingSessionTitle = activeConversation.title; let sessionTitle = existingSessionTitle; - const { userMessageCount, assistantMessageCount } = - await getConversationTurnStats(runtime, binding.sessionId); - const shouldGenerateTitle = - userMessageCount <= 3 && - assistantMessageCount >= 1; + const shouldGenerateTitle = recentTurns.length <= 1; if (shouldGenerateTitle) { sessionTitle = await generateSessionTitle(runtime, { sessionId: binding.sessionId, latestUserMessage: parsed.data.message, fallbackTitle: existingSessionTitle, }); - if (sessionTitle !== existingSessionTitle) { - sessionBridge.setSessionTitle(binding.sessionId, sessionTitle); - } } + const nextConversation = await conversationStore.touch(activeConversation, { + ...(sessionTitle && sessionTitle !== existingSessionTitle + ? { title: sessionTitle } + : {}), + }); if (!streamClosed && !res.writableEnded && !res.destroyed) { if ( shouldGenerateTitle && @@ -321,18 +398,19 @@ export const buildChatRouter = ( assistantMessage: assistantText, model: parsed.data.model, requestContext, - sessionId: binding.sessionId, + sessionId: clientSessionId, toolCallCount: streamResult.toolCallCount, userMessage: parsed.data.message, }).catch((error) => { logger.warn( - { err: error, sessionId: binding.sessionId }, + { err: error, sessionId: clientSessionId }, "post-turn learning failed", ); }); } } } finally { + await sessionBridge.releaseRuntimeSession(clientSessionId, binding.sessionId); streamClosed = true; req.off("close", handleClientClose); res.off("close", handleClientClose); diff --git a/src/routes/chatSession.ts b/src/routes/chatSession.ts index 7cb21dc..9bc3758 100644 --- a/src/routes/chatSession.ts +++ b/src/routes/chatSession.ts @@ -1,4 +1,5 @@ import { logger } from "../logger.js"; +import { type SessionTurnRecord } from "../history/store.js"; import { MemoryStore } from "../memory/store.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; @@ -8,6 +9,9 @@ const TITLE_PROMPT_TIMEOUT_MS = 5000; const TITLE_CONTEXT_MESSAGE_LIMIT = 40; const TITLE_CONTEXT_CHAR_LIMIT = 2400; const TITLE_CONTEXT_MESSAGE_CHAR_LIMIT = 240; +const RESTORE_TURN_LIMIT = 8; +const RESTORE_MESSAGE_CHAR_LIMIT = 480; +const RESTORE_CONTEXT_CHAR_LIMIT = 3200; const buildSessionTitle = (message: string) => { const normalized = message.replace(/\s+/g, " ").trim(); @@ -155,11 +159,51 @@ export const buildPromptWithLearningContext = async ( memoryStore: MemoryStore, actorKey: string, projectKey: string, + recentTurns: SessionTurnRecord[], message: string, ) => { const snapshot = await memoryStore.buildPromptSnapshot({ actorKey, projectKey }); - if (!snapshot) { + const restoredConversation = buildRestoredConversationContext(recentTurns); + if (!snapshot && !restoredConversation) { return message; } - return `${snapshot}\n\n[Current user request]\n${message}`; + return [snapshot, restoredConversation, `[Current user request]\n${message}`] + .filter(Boolean) + .join("\n\n"); +}; + +const buildRestoredConversationContext = (recentTurns: SessionTurnRecord[]) => { + const formattedTurns = recentTurns + .slice(-RESTORE_TURN_LIMIT) + .flatMap((turn) => [ + `用户:${compactMessage(turn.userMessage)}`, + `助手:${compactMessage(turn.assistantMessage)}`, + ]) + .filter((entry) => entry.length > 0); + + if (formattedTurns.length === 0) { + return ""; + } + + const conversation = formattedTurns.join("\n"); + const trimmedConversation = + conversation.length > RESTORE_CONTEXT_CHAR_LIMIT + ? `${conversation.slice(0, RESTORE_CONTEXT_CHAR_LIMIT - 3)}...` + : conversation; + + return [ + "[Previous conversation context]", + "以下为当前前端对话线程中最近的历史对话,请延续其中已确认的目标、约束、结论与引用结果。", + trimmedConversation, + ].join("\n"); +}; + +const compactMessage = (value: string) => { + const normalized = value.replace(/\s+/g, " ").trim(); + if (!normalized) { + return ""; + } + return normalized.length > RESTORE_MESSAGE_CHAR_LIMIT + ? `${normalized.slice(0, RESTORE_MESSAGE_CHAR_LIMIT - 3)}...` + : normalized; }; \ No newline at end of file diff --git a/src/runtime/opencode.ts b/src/runtime/opencode.ts index 0a8c089..9d63df4 100644 --- a/src/runtime/opencode.ts +++ b/src/runtime/opencode.ts @@ -54,14 +54,6 @@ export class OpencodeRuntimeAdapter { return requireData(response.data, "session.create"); } - async getSession(id: string) { - const client = await this.ensureClient(); - const response = await client.session.get({ - sessionID: id, - }); - return requireData(response.data, "session.get"); - } - async sendPrompt(sessionId: string, text: string) { await this.prompt(sessionId, text); // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, @@ -103,15 +95,6 @@ export class OpencodeRuntimeAdapter { return requireData(messages.data, "session.messages"); } - async forkSession(sessionId: string, messageId?: string) { - const client = await this.ensureClient(); - const response = await client.session.fork({ - sessionID: sessionId, - messageID: messageId, - }); - return requireData(response.data, "session.fork"); - } - async abortSession(sessionId: string) { const client = await this.ensureClient(); const response = await client.session.abort({ diff --git a/src/server.ts b/src/server.ts index f7fc967..d56080a 100644 --- a/src/server.ts +++ b/src/server.ts @@ -5,6 +5,7 @@ import express from "express"; import { SessionHistoryStore } from "./history/store.js"; import { ChatSessionBridge } from "./chat/sessionBridge.js"; import { config } from "./config.js"; +import { ConversationStore } from "./conversations/store.js"; import { logger } from "./logger.js"; import { LearningOrchestrator } from "./learning/orchestrator.js"; import { MemoryStore } from "./memory/store.js"; @@ -12,13 +13,12 @@ import { ResultReferenceResolver } from "./results/resolver.js"; import { ResultReferenceStore } from "./results/store.js"; import { buildChatRouter } from "./routes/chat.js"; import { opencodeRuntime } from "./runtime/opencode.js"; -import { SessionRegistry } from "./session/registry.js"; import { ToolSessionContextStore } from "./session/toolContextStore.js"; import { DynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js"; const app = express(); -const registry = new SessionRegistry(config.SESSION_TTL_SECONDS); -const sessionBridge = new ChatSessionBridge(registry, opencodeRuntime); +const sessionBridge = new ChatSessionBridge(opencodeRuntime); +const conversationStore = new ConversationStore(); const memoryStore = new MemoryStore(); const sessionHistoryStore = new SessionHistoryStore(); const toolContextStore = new ToolSessionContextStore(); @@ -63,12 +63,22 @@ app.post("/internal/tools/dynamic-http-call", async (req, res) => { return; } - const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; - const context = sessionBridge.getSessionContext(sessionId); + const sessionScopeKey = + typeof req.body?.sessionScopeKey === "string" ? req.body.sessionScopeKey : ""; + const threadContext = await toolContextStore.read(sessionScopeKey); + const runtimeContext = sessionBridge.getActiveSensitiveContext(sessionScopeKey); + if (!threadContext && !runtimeContext) { + res.status(404).json({ + message: "runtime or session context not found", + detail: sessionScopeKey, + }); + return; + } + const context = runtimeContext ?? threadContext; if (!context) { res.status(404).json({ - message: "session context not found", - detail: sessionId, + message: "runtime or session context not found", + detail: sessionScopeKey, }); return; } @@ -83,12 +93,12 @@ app.post("/internal/tools/dynamic-http-call", async (req, res) => { arguments: req.body?.arguments, }, { - accessToken: context.accessToken, + accessToken: runtimeContext?.accessToken, actorKey: context.actorKey, clientSessionId: context.clientSessionId, projectId: context.projectId, projectKey: context.projectKey, - sessionId, + sessionId: context.clientSessionId, traceId: context.traceId, }, ); @@ -108,13 +118,14 @@ app.post("/internal/tools/fetch-result-ref", async (req, res) => { return; } - const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const sessionScopeKey = + typeof req.body?.sessionScopeKey === "string" ? req.body.sessionScopeKey : ""; const resultRef = typeof req.body?.result_ref === "string" ? req.body.result_ref : ""; - const context = sessionBridge.getSessionContext(sessionId); + const context = await toolContextStore.read(sessionScopeKey); if (!context) { res.status(404).json({ message: "session context not found", - detail: sessionId, + detail: sessionScopeKey, }); return; } @@ -127,6 +138,7 @@ app.post("/internal/tools/fetch-result-ref", async (req, res) => { resultRef, { actorKey: context.actorKey, + clientSessionId: context.clientSessionId, projectId: context.projectId, }, { @@ -149,13 +161,14 @@ app.post("/internal/tools/store-render-ref", async (req, res) => { return; } - const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const sessionScopeKey = + typeof req.body?.sessionScopeKey === "string" ? req.body.sessionScopeKey : ""; const filePath = typeof req.body?.file_path === "string" ? req.body.file_path.trim() : ""; - const context = sessionBridge.getSessionContext(sessionId); + const context = await toolContextStore.read(sessionScopeKey); if (!context) { res.status(404).json({ message: "session context not found", - detail: sessionId, + detail: sessionScopeKey, }); return; } @@ -170,7 +183,7 @@ app.post("/internal/tools/store-render-ref", async (req, res) => { clientSessionId: context.clientSessionId, projectId: context.projectId, projectKey: context.projectKey, - sessionId, + sessionId: context.clientSessionId, source: "migration", traceId: context.traceId, }); @@ -198,13 +211,14 @@ app.post("/internal/tools/session-search", async (req, res) => { return; } - const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const sessionScopeKey = + typeof req.body?.sessionScopeKey === "string" ? req.body.sessionScopeKey : ""; const query = typeof req.body?.query === "string" ? req.body.query : ""; - const context = await toolContextStore.read(sessionId); + const context = await toolContextStore.read(sessionScopeKey); if (!context) { res.status(404).json({ - message: "tool session context not found", - detail: sessionId, + message: "session context not found", + detail: sessionScopeKey, }); return; } @@ -231,7 +245,9 @@ app.use( buildChatRouter( sessionBridge, opencodeRuntime, + conversationStore, memoryStore, + sessionHistoryStore, learningOrchestrator, resultReferenceResolver, ), @@ -239,6 +255,7 @@ app.use( const bootstrap = async () => { await Promise.all([ + conversationStore.initialize(), learningOrchestrator.initialize(), memoryStore.initialize(), resultReferenceStore.initialize(), diff --git a/src/session/registry.ts b/src/session/registry.ts deleted file mode 100644 index 24d9ac8..0000000 --- a/src/session/registry.ts +++ /dev/null @@ -1,80 +0,0 @@ -import crypto from "node:crypto"; - -export type SessionBinding = { - clientSessionId: string; - sessionId: string; - lastUsedAt: number; -}; - -export type SessionContext = { - clientSessionId: string; - accessToken?: string; - projectId?: string; - userId?: string; -}; - -export class SessionRegistry { - private readonly ttlMs: number; - private readonly bindings = new Map(); - - constructor(ttlSeconds: number) { - this.ttlMs = ttlSeconds * 1000; - } - - upsert(context: SessionContext, sessionId: string): SessionBinding { - const binding: SessionBinding = { - clientSessionId: context.clientSessionId, - sessionId, - lastUsedAt: Date.now(), - }; - this.bindings.set(this.makeKey(context), binding); - return binding; - } - - get(context: SessionContext): SessionBinding | null { - const key = this.makeKey(context); - const binding = this.bindings.get(key); - if (!binding) { - return null; - } - if (Date.now() - binding.lastUsedAt > this.ttlMs) { - this.bindings.delete(key); - return null; - } - binding.lastUsedAt = Date.now(); - return binding; - } - - count(): number { - this.evictExpired(); - return this.bindings.size; - } - - evictExpired(): string[] { - const expired: string[] = []; - const now = Date.now(); - for (const [key, binding] of this.bindings.entries()) { - if (now - binding.lastUsedAt > this.ttlMs) { - expired.push(binding.sessionId); - this.bindings.delete(key); - } - } - return expired; - } - - private makeKey(context: SessionContext): string { - // 会话隔离不能只看前端 session_id;同一浏览器会话切换用户或项目时必须映射到不同 opencode session。 - const digest = crypto - .createHash("sha256") - .update( - [ - context.clientSessionId, - context.userId?.trim() ?? "", - context.projectId ?? "", - ].join("|"), - ) - .digest("hex"); - - return digest; - } -} diff --git a/src/session/toolContextStore.ts b/src/session/toolContextStore.ts index b405a04..bc8fa5f 100644 --- a/src/session/toolContextStore.ts +++ b/src/session/toolContextStore.ts @@ -7,6 +7,7 @@ import { readJsonFile, removeFileIfExists, } from "../utils/fileStore.js"; +import { toConversationScopeKey } from "../utils/fileStore.js"; export type ToolSessionContext = { actorKey: string; @@ -16,6 +17,7 @@ export type ToolSessionContext = { projectId?: string; projectKey: string; sessionId: string; + sessionScopeKey: string; traceId: string; }; @@ -28,6 +30,9 @@ export class ToolSessionContextStore { async write(context: ToolSessionContext) { await atomicWriteJson(this.filePath(context.sessionId), context); + if (context.learningMode === "interactive" && context.sessionScopeKey) { + await atomicWriteJson(this.filePath(context.sessionScopeKey), context); + } } async read(sessionId: string) { @@ -42,3 +47,9 @@ export class ToolSessionContextStore { return join(this.baseDir, `${sessionId}.json`); } } + +export const buildToolSessionScopeKey = ( + actorKey: string, + projectKey: string, + clientSessionId: string, +) => toConversationScopeKey(actorKey, projectKey, clientSessionId); diff --git a/src/utils/fileStore.ts b/src/utils/fileStore.ts index 3f04c71..afb4800 100644 --- a/src/utils/fileStore.ts +++ b/src/utils/fileStore.ts @@ -149,6 +149,12 @@ export const toProjectKey = (projectId?: string) => toScopedKey("project", proje export const toStableId = (...parts: string[]) => createHash("sha256").update(parts.join("|")).digest("hex").slice(0, 24); +export const toConversationScopeKey = ( + actorKey: string, + projectKey: string, + sessionId: string, +) => `conversation-${toStableId(actorKey, projectKey, sessionId)}`; + export const slugify = (value: string) => value .toLowerCase() diff --git a/tests/conversations/store.test.ts b/tests/conversations/store.test.ts new file mode 100644 index 0000000..f7dc550 --- /dev/null +++ b/tests/conversations/store.test.ts @@ -0,0 +1,64 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { ConversationStore } from "../../src/conversations/store.js"; + +describe("ConversationStore", () => { + let tempDir: string; + let store: ConversationStore; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), "tjwater-conversation-")); + store = new ConversationStore(tempDir); + await store.initialize(); + }); + + afterEach(async () => { + await rm(tempDir, { force: true, recursive: true }); + }); + + it("issues backend-managed session ids when absent", async () => { + const { record, created } = await store.ensure({ + actorKey: "actor-1", + projectId: "project-1", + projectKey: "project-key-1", + userId: "user-1", + }); + + expect(created).toBe(true); + expect(record.sessionId).toStartWith("chat-"); + expect(record.ownerUserId).toBe("user-1"); + expect(record.status).toBe("active"); + }); + + it("touches metadata and preserves scoped ownership", async () => { + const { record } = await store.ensure({ + actorKey: "actor-2", + projectId: "project-2", + projectKey: "project-key-2", + sessionId: "existing-session", + userId: "user-2", + }); + + const touched = await store.touch(record, { + title: "新标题", + }); + + expect(touched.title).toBe("新标题"); + expect(touched.updatedAt >= record.updatedAt).toBe(true); + + const fetched = await store.get( + { + actorKey: "actor-2", + projectId: "project-2", + projectKey: "project-key-2", + userId: "user-2", + }, + "existing-session", + ); + expect(fetched?.sessionScopeKey).toBe(record.sessionScopeKey); + expect(fetched?.title).toBe("新标题"); + }); +}); diff --git a/tests/history/store.test.ts b/tests/history/store.test.ts new file mode 100644 index 0000000..4667765 --- /dev/null +++ b/tests/history/store.test.ts @@ -0,0 +1,138 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { SessionHistoryStore } from "../../src/history/store.js"; + +describe("SessionHistoryStore", () => { + let tempDir: string; + let store: SessionHistoryStore; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), "tjwater-history-")); + store = new SessionHistoryStore(tempDir); + await store.initialize(); + }); + + afterEach(async () => { + await rm(tempDir, { force: true, recursive: true }); + }); + + it("falls back to legacy runtime-session transcripts by client session id and migrates on append", async () => { + await writeFile( + join(tempDir, "actor-1__project-1__runtime-session-1.json"), + JSON.stringify( + { + actorKey: "actor-1", + clientSessionId: "thread-1", + projectKey: "project-1", + sessionId: "runtime-session-1", + turns: [ + { + id: "turn-1", + assistantMessage: "先检查泵站流量。", + timestamp: "2026-05-21T00:00:00.000Z", + toolCallCount: 1, + userMessage: "帮我看一下当前异常。", + }, + ], + updatedAt: "2026-05-21T00:00:00.000Z", + }, + null, + 2, + ), + "utf8", + ); + + const recentTurns = await store.getRecentTurns( + { + actorKey: "actor-1", + clientSessionId: "thread-1", + projectKey: "project-1", + sessionId: "thread-1", + }, + 5, + ); + + expect(recentTurns).toHaveLength(1); + expect(recentTurns[0]?.userMessage).toBe("帮我看一下当前异常。"); + + const transcript = await store.appendTurn( + { + actorKey: "actor-1", + clientSessionId: "thread-1", + projectKey: "project-1", + sessionId: "thread-1", + }, + { + assistantMessage: "已经定位到 3 条疑似异常支路。", + toolCallCount: 2, + userMessage: "继续分析这些支路。", + }, + ); + + expect(transcript.sessionId).toBe("thread-1"); + expect(transcript.turns).toHaveLength(2); + }); + + it("clones only the kept prefix when forking a thread", async () => { + await store.appendTurn( + { + actorKey: "actor-2", + clientSessionId: "thread-source", + projectKey: "project-2", + sessionId: "thread-source", + }, + { + assistantMessage: "第一轮回复", + toolCallCount: 0, + userMessage: "第一轮提问", + }, + ); + await store.appendTurn( + { + actorKey: "actor-2", + clientSessionId: "thread-source", + projectKey: "project-2", + sessionId: "thread-source", + }, + { + assistantMessage: "第二轮回复", + toolCallCount: 0, + userMessage: "第二轮提问", + }, + ); + + const cloned = await store.cloneThread( + { + actorKey: "actor-2", + clientSessionId: "thread-source", + projectKey: "project-2", + sessionId: "thread-source", + }, + { + actorKey: "actor-2", + clientSessionId: "thread-fork", + projectKey: "project-2", + sessionId: "thread-fork", + }, + 2, + ); + + expect(cloned.turns).toHaveLength(1); + expect(cloned.turns[0]?.userMessage).toBe("第一轮提问"); + + const forkRecentTurns = await store.getRecentTurns( + { + actorKey: "actor-2", + clientSessionId: "thread-fork", + projectKey: "project-2", + sessionId: "thread-fork", + }, + 5, + ); + expect(forkRecentTurns).toHaveLength(1); + expect(forkRecentTurns[0]?.assistantMessage).toBe("第一轮回复"); + }); +}); diff --git a/tests/session/toolContextStore.test.ts b/tests/session/toolContextStore.test.ts new file mode 100644 index 0000000..7ff6b3d --- /dev/null +++ b/tests/session/toolContextStore.test.ts @@ -0,0 +1,51 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { + buildToolSessionScopeKey, + ToolSessionContextStore, +} from "../../src/session/toolContextStore.js"; + +describe("ToolSessionContextStore", () => { + let tempDir: string; + let store: ToolSessionContextStore; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), "tjwater-tool-context-")); + store = new ToolSessionContextStore(tempDir); + await store.initialize(); + }); + + afterEach(async () => { + await rm(tempDir, { force: true, recursive: true }); + }); + + it("writes interactive aliases under scoped session keys", async () => { + const sessionScopeKey = buildToolSessionScopeKey( + "actor-1", + "project-1", + "chat-session-1", + ); + + await store.write({ + actorKey: "actor-1", + allowLearningWrite: true, + clientSessionId: "chat-session-1", + learningMode: "interactive", + projectId: "project-id-1", + projectKey: "project-1", + sessionId: "runtime-session-1", + sessionScopeKey, + traceId: "trace-1", + }); + + const runtimeContext = await store.read("runtime-session-1"); + const scopedContext = await store.read(sessionScopeKey); + + expect(runtimeContext?.clientSessionId).toBe("chat-session-1"); + expect(scopedContext?.sessionScopeKey).toBe(sessionScopeKey); + expect(scopedContext?.sessionId).toBe("runtime-session-1"); + }); +});