重构会话管理功能,由后端 opencode 发放 sessionId,后端做 scope

This commit is contained in:
2026-05-21 15:41:46 +08:00
parent 7e63d38cf5
commit 5d80961930
20 changed files with 816 additions and 390 deletions
+100 -218
View File
@@ -2,10 +2,25 @@ import { randomUUID } from "node:crypto";
import { logger } from "../logger.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type SessionBinding, type SessionContext, SessionRegistry } from "../session/registry.js";
import { ToolSessionContextStore } from "../session/toolContextStore.js";
import {
buildToolSessionScopeKey,
ToolSessionContextStore,
} from "../session/toolContextStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
export type SessionBinding = {
clientSessionId: string;
sessionId: string;
startedAt: number;
};
export type SessionContext = {
clientSessionId: string;
accessToken?: string;
projectId?: string;
userId?: string;
};
export type ChatRequestContext = SessionContext & {
actorKey: string;
projectKey: string;
@@ -13,15 +28,12 @@ export type ChatRequestContext = SessionContext & {
};
export class ChatSessionBridge {
// 这里额外保存 session -> 用户上下文,供工具桥在服务端代发真实后端请求时复用
private readonly sessionContexts = new Map<string, ChatRequestContext>();
private readonly sessionTitles = new Map<string, string>();
// runtime session 仅在单次请求生命周期内有效;线程连续性由 clientSessionId 对应的持久状态承担
private readonly activeRuntimeSessions = new Map<string, string>();
private readonly activeSensitiveContexts = new Map<string, ChatRequestContext>();
private readonly toolContextStore = new ToolSessionContextStore();
constructor(
private readonly registry: SessionRegistry,
private readonly runtime: OpencodeRuntimeAdapter,
) {}
constructor(private readonly runtime: OpencodeRuntimeAdapter) {}
async resolve(context: {
clientSessionId?: string;
@@ -34,61 +46,22 @@ export class ChatSessionBridge {
requestContext: ChatRequestContext;
created: boolean;
}> {
const requestContext: ChatRequestContext = {
clientSessionId:
context.clientSessionId?.trim() || `agent-${randomUUID().slice(0, 12)}`,
accessToken: context.accessToken,
actorKey: toActorKey(context.userId),
projectId: context.projectId,
projectKey: toProjectKey(context.projectId),
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
userId: context.userId?.trim(),
};
this.cleanupExpired();
const current = this.registry.get(requestContext);
if (current) {
this.sessionContexts.set(current.sessionId, requestContext);
await this.toolContextStore.write({
actorKey: requestContext.actorKey,
allowLearningWrite: true,
clientSessionId: requestContext.clientSessionId,
learningMode: "interactive",
projectId: requestContext.projectId,
projectKey: requestContext.projectKey,
sessionId: current.sessionId,
traceId: requestContext.traceId,
});
try {
// 只有 opencode 侧 session 仍存在时,才复用本地映射。
await this.runtime.getSession(current.sessionId);
await this.runtime.waitForSessionIdle(current.sessionId).catch((error) => {
logger.warn(
{
clientSessionId: requestContext.clientSessionId,
sessionId: current.sessionId,
err: error,
},
"failed while waiting for reused opencode session to become idle",
);
});
return { binding: current, requestContext, created: false };
} catch (error) {
logger.warn(
{
clientSessionId: requestContext.clientSessionId,
sessionId: current.sessionId,
err: error,
},
"existing opencode session lookup failed, creating a new session",
);
}
}
const requestContext = this.buildRequestContext(context);
await this.abortActiveRuntime(requestContext.clientSessionId);
const session = await this.runtime.createSession(requestContext.clientSessionId);
const binding = this.registry.upsert(requestContext, session.id);
this.sessionContexts.set(binding.sessionId, requestContext);
const binding: SessionBinding = {
clientSessionId: requestContext.clientSessionId,
sessionId: session.id,
startedAt: Date.now(),
};
const sessionScopeKey = buildToolSessionScopeKey(
requestContext.actorKey,
requestContext.projectKey,
requestContext.clientSessionId,
);
this.activeRuntimeSessions.set(requestContext.clientSessionId, session.id);
this.activeSensitiveContexts.set(sessionScopeKey, requestContext);
await this.toolContextStore.write({
actorKey: requestContext.actorKey,
allowLearningWrite: true,
@@ -96,105 +69,70 @@ export class ChatSessionBridge {
learningMode: "interactive",
projectId: requestContext.projectId,
projectKey: requestContext.projectKey,
sessionId: binding.sessionId,
sessionId: session.id,
sessionScopeKey,
traceId: requestContext.traceId,
});
return { binding, requestContext, created: true };
}
count(): number {
return this.registry.count();
return this.activeRuntimeSessions.size;
}
getSessionContext(sessionId: string) {
return this.sessionContexts.get(sessionId) ?? null;
createClientSessionId() {
return `agent-${randomUUID().slice(0, 12)}`;
}
getSessionTitle(sessionId: string) {
return this.sessionTitles.get(sessionId);
}
setSessionTitle(sessionId: string, title: string) {
const normalized = title.trim();
if (!normalized) {
return;
}
this.sessionTitles.set(sessionId, normalized);
}
cloneSessionTitle(sourceSessionId: string, targetSessionId: string) {
const existingTitle = this.sessionTitles.get(sourceSessionId);
if (!existingTitle) {
return;
}
this.sessionTitles.set(targetSessionId, existingTitle);
getActiveSensitiveContext(sessionScopeKey: string) {
return this.activeSensitiveContexts.get(sessionScopeKey) ?? null;
}
async abort(context: {
clientSessionId?: string;
accessToken?: string;
projectId?: string;
traceId?: string;
userId?: string;
}): Promise<SessionBinding | null> {
const clientSessionId = context.clientSessionId?.trim();
if (!clientSessionId) {
return null;
}
const requestContext: ChatRequestContext = {
clientSessionId,
accessToken: context.accessToken,
actorKey: toActorKey(context.userId),
projectId: context.projectId,
projectKey: toProjectKey(context.projectId),
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
userId: context.userId?.trim(),
};
this.cleanupExpired();
const binding = this.registry.get(requestContext);
if (!binding) {
const sessionId = this.activeRuntimeSessions.get(clientSessionId);
if (!sessionId) {
return null;
}
this.sessionContexts.set(binding.sessionId, requestContext);
await this.toolContextStore.write({
actorKey: requestContext.actorKey,
allowLearningWrite: true,
clientSessionId: requestContext.clientSessionId,
learningMode: "interactive",
projectId: requestContext.projectId,
projectKey: requestContext.projectKey,
sessionId: binding.sessionId,
traceId: requestContext.traceId,
});
await this.runtime.abortSession(binding.sessionId);
await this.runtime.waitForSessionIdle(binding.sessionId).catch((error) => {
logger.warn(
{ clientSessionId, sessionId: binding.sessionId, err: error },
"failed while waiting for aborted opencode session to become idle",
);
});
return binding;
await this.abortActiveRuntime(clientSessionId);
return {
clientSessionId,
sessionId,
startedAt: Date.now(),
};
}
async fork(context: {
async releaseRuntimeSession(clientSessionId: string, sessionId: string) {
const activeSessionId = this.activeRuntimeSessions.get(clientSessionId);
if (activeSessionId === sessionId) {
this.activeRuntimeSessions.delete(clientSessionId);
}
this.activeSensitiveContexts.delete(findScopeKey(this.activeSensitiveContexts, clientSessionId));
await this.toolContextStore.remove(sessionId).catch((error) => {
logger.debug({ sessionId, err: error }, "failed to cleanup runtime tool context");
});
await this.runtime.abortSession(sessionId).catch((error) => {
logger.debug({ sessionId, err: error }, "failed to cleanup runtime session");
});
}
private buildRequestContext(context: {
clientSessionId?: string;
accessToken?: string;
projectId?: string;
traceId?: string;
keepMessageCount: number;
userId?: string;
}): Promise<{
binding: SessionBinding;
requestContext: ChatRequestContext;
created: boolean;
}> {
const currentClientSessionId = context.clientSessionId?.trim();
const nextRequestContext: ChatRequestContext = {
clientSessionId: `agent-${randomUUID().slice(0, 12)}`,
}): ChatRequestContext {
return {
clientSessionId: context.clientSessionId?.trim() || this.createClientSessionId(),
accessToken: context.accessToken,
actorKey: toActorKey(context.userId),
projectId: context.projectId,
@@ -202,95 +140,39 @@ export class ChatSessionBridge {
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
userId: context.userId?.trim(),
};
this.cleanupExpired();
if (!currentClientSessionId || context.keepMessageCount <= 0) {
const session = await this.runtime.createSession(nextRequestContext.clientSessionId);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
await this.toolContextStore.write({
actorKey: nextRequestContext.actorKey,
allowLearningWrite: true,
clientSessionId: nextRequestContext.clientSessionId,
learningMode: "interactive",
projectId: nextRequestContext.projectId,
projectKey: nextRequestContext.projectKey,
sessionId: binding.sessionId,
traceId: nextRequestContext.traceId,
});
return { binding, requestContext: nextRequestContext, created: true };
}
const currentContext: ChatRequestContext = {
clientSessionId: currentClientSessionId,
accessToken: context.accessToken,
actorKey: toActorKey(context.userId),
projectId: context.projectId,
projectKey: toProjectKey(context.projectId),
traceId: nextRequestContext.traceId,
userId: context.userId?.trim(),
};
const current = this.registry.get(currentContext);
if (!current) {
const session = await this.runtime.createSession(nextRequestContext.clientSessionId);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
await this.toolContextStore.write({
actorKey: nextRequestContext.actorKey,
allowLearningWrite: true,
clientSessionId: nextRequestContext.clientSessionId,
learningMode: "interactive",
projectId: nextRequestContext.projectId,
projectKey: nextRequestContext.projectKey,
sessionId: binding.sessionId,
traceId: nextRequestContext.traceId,
});
return { binding, requestContext: nextRequestContext, created: true };
}
await this.runtime.getSession(current.sessionId);
const messages = await this.runtime.messages(
current.sessionId,
Math.max(100, context.keepMessageCount + 20),
);
const chatMessages = messages.filter(
(message) => message.info.role === "user" || message.info.role === "assistant",
);
const keepMessage = chatMessages[context.keepMessageCount - 1];
if (!keepMessage) {
throw new Error(`fork keep point not found for message count ${context.keepMessageCount}`);
}
const session = await this.runtime.forkSession(current.sessionId, keepMessage.info.id);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
await this.toolContextStore.write({
actorKey: nextRequestContext.actorKey,
allowLearningWrite: true,
clientSessionId: nextRequestContext.clientSessionId,
learningMode: "interactive",
projectId: nextRequestContext.projectId,
projectKey: nextRequestContext.projectKey,
sessionId: binding.sessionId,
traceId: nextRequestContext.traceId,
});
this.cloneSessionTitle(current.sessionId, binding.sessionId);
return { binding, requestContext: nextRequestContext, created: true };
}
cleanupExpired(): void {
const expiredSessionIds = this.registry.evictExpired();
for (const sessionId of expiredSessionIds) {
this.sessionContexts.delete(sessionId);
this.sessionTitles.delete(sessionId);
void this.toolContextStore.remove(sessionId);
// 这里用 abort 做轻量清理;即使失败,也不阻断本地过期回收。
void this.runtime.abortSession(sessionId).catch((error) => {
logger.debug({ sessionId, err: error }, "ignoring failed abort for expired session");
});
private async abortActiveRuntime(clientSessionId: string) {
const activeSessionId = this.activeRuntimeSessions.get(clientSessionId);
if (!activeSessionId) {
return;
}
this.activeRuntimeSessions.delete(clientSessionId);
this.activeSensitiveContexts.delete(findScopeKey(this.activeSensitiveContexts, clientSessionId));
await this.toolContextStore.remove(activeSessionId).catch(() => undefined);
await this.runtime.abortSession(activeSessionId).catch((error) => {
logger.warn(
{ clientSessionId, sessionId: activeSessionId, err: error },
"failed to abort previous active runtime session",
);
});
await this.runtime.waitForSessionIdle(activeSessionId).catch((error) => {
logger.warn(
{ clientSessionId, sessionId: activeSessionId, err: error },
"failed while waiting for previous runtime session to become idle",
);
});
}
}
const findScopeKey = (
contexts: Map<string, ChatRequestContext>,
clientSessionId: string,
) => {
for (const [scopeKey, context] of contexts.entries()) {
if (context.clientSessionId === clientSessionId) {
return scopeKey;
}
}
return clientSessionId;
};