refactor: unify agent session persistence

This commit is contained in:
2026-06-04 15:02:27 +08:00
parent 04ded0ceb0
commit 0ecb2babf3
22 changed files with 542 additions and 497 deletions
+16 -12
View File
@@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
import { logger } from "../logger.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { ToolSessionContextStore } from "../session/toolContextStore.js";
import { SessionRuntimeContextStore } from "../sessions/runtimeContextStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
export type SessionBinding = {
@@ -26,12 +26,11 @@ export type ChatRequestContext = SessionContext & {
export class ChatSessionBridge {
private readonly abortControllers = new Map<string, AbortController>();
private readonly toolContextStore = new ToolSessionContextStore();
private readonly sessionRuntimeContextStore = new SessionRuntimeContextStore();
constructor(private readonly runtime: OpencodeRuntimeAdapter) {}
async resolve(context: {
clientSessionId?: string;
sessionId?: string;
accessToken?: string;
projectId?: string;
@@ -42,15 +41,19 @@ export class ChatSessionBridge {
requestContext: ChatRequestContext;
created: boolean;
}> {
const requestContext = this.buildRequestContext(context);
let requestContext = this.buildRequestContext(context);
const existingSessionId = context.sessionId?.trim();
await this.abortActiveRuntime(requestContext.clientSessionId, existingSessionId);
let sessionId = existingSessionId;
let created = false;
if (!sessionId) {
const session = await this.runtime.createSession(requestContext.clientSessionId);
const session = await this.runtime.createSession();
sessionId = session.id;
requestContext = {
...requestContext,
clientSessionId: sessionId,
};
created = true;
}
const binding: SessionBinding = {
@@ -58,7 +61,7 @@ export class ChatSessionBridge {
sessionId,
startedAt: Date.now(),
};
await this.toolContextStore.write({
await this.sessionRuntimeContextStore.write({
accessToken: requestContext.accessToken,
actorKey: requestContext.actorKey,
allowLearningWrite: true,
@@ -107,7 +110,7 @@ export class ChatSessionBridge {
};
}
async deleteConversationSession(context: {
async deleteSession(context: {
clientSessionId: string;
sessionId: string;
}) {
@@ -121,29 +124,30 @@ export class ChatSessionBridge {
await this.runtime.abortSession(sessionId).catch((error) => {
logger.warn(
{ clientSessionId, sessionId, err: error },
"failed to abort conversation runtime session",
"failed to abort runtime session",
);
});
await this.runtime.waitForSessionIdle(sessionId).catch((error) => {
logger.warn(
{ clientSessionId, sessionId, err: error },
"failed while waiting for conversation runtime session to become idle",
"failed while waiting for runtime session to become idle",
);
});
await this.toolContextStore.remove(sessionId).catch((error) => {
await this.sessionRuntimeContextStore.remove(sessionId).catch((error) => {
logger.debug({ sessionId, err: error }, "failed to cleanup runtime tool context");
});
}
private buildRequestContext(context: {
clientSessionId?: string;
sessionId?: string;
accessToken?: string;
projectId?: string;
traceId?: string;
userId?: string;
}): ChatRequestContext {
const sessionId = context.sessionId?.trim();
return {
clientSessionId: context.clientSessionId?.trim() || this.createClientSessionId(),
clientSessionId: sessionId || this.createClientSessionId(),
accessToken: context.accessToken,
actorKey: toActorKey(context.userId),
projectId: context.projectId,
+13 -11
View File
@@ -43,10 +43,12 @@ const envSchema = z
OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000),
// 默认使用的 opencode 模型标识。
OPENCODE_MODEL: z.string().default("deepseek/deepseek-v4-pro"),
// opencode skills 树目录;会在运行时解析为绝对路径,避免工具 cwd 偏移。
OPENCODE_SKILLS_ROOT_DIR: z.string().default("./.opencode/skills"),
// client 模式下,目标 opencode server 的基础地址。
OPENCODE_CLIENT_BASE_URL: z.string().url().optional(),
// 提供给本地 opencode tools 读取的会话上下文目录。
SESSION_CONTEXT_STORAGE_DIR: z.string().default("./data/session-contexts"),
SESSION_RUNTIME_CONTEXT_STORAGE_DIR: z.string().default("./data/session-runtime-contexts"),
// tjwater-cli 可执行文件路径。
TJWATER_CLI_PATH: z.string().default("./cli/tjwater-cli"),
// TJWater 后端 API 的基础地址。
@@ -59,18 +61,18 @@ const envSchema = z
MAX_PREVIEW_SAMPLE_ITEMS: z.coerce.number().int().positive().default(3),
// memory 持久化存储目录。
MEMORY_STORAGE_DIR: z.string().default("./data/memory"),
// 持久化文件写入前保留历史版本的目录。
PERSISTENCE_HISTORY_DIR: z.string().default("./data/history"),
// 持久化文件写入前保留备份版本的目录。
PERSISTENCE_BACKUP_DIR: z.string().default("./data/backup"),
// 注入到 prompt 的 memory 快照最大字符数,避免上下文过大。
MEMORY_MAX_PROMPT_CHARS: z.coerce.number().int().positive().default(1800),
// session transcript 持久化目录。
SESSION_HISTORY_STORAGE_DIR: z.string().default("./data/session-history"),
// conversation metadata 持久化目录。
CONVERSATION_STORAGE_DIR: z.string().default("./data/conversations"),
// conversation UI state 持久化目录。
CONVERSATION_STATE_STORAGE_DIR: z.string().default("./data/conversation-states"),
SESSION_TRANSCRIPT_STORAGE_DIR: z.string().default("./data/session-transcripts"),
// session metadata 持久化目录。
SESSION_METADATA_STORAGE_DIR: z.string().default("./data/session-metadata"),
// session UI state 持久化目录。
SESSION_UI_STATE_STORAGE_DIR: z.string().default("./data/session-ui-states"),
// 每个会话最多保留多少轮 transcript,超过后裁剪旧记录。
SESSION_HISTORY_MAX_TURNS_PER_SESSION: z.coerce
SESSION_TRANSCRIPT_MAX_TURNS_PER_SESSION: z.coerce
.number()
.int()
.positive()
@@ -79,8 +81,8 @@ const envSchema = z
SESSION_SEARCH_MAX_RESULTS: z.coerce.number().int().positive().default(8),
// session_search 查询文本最大长度。
SESSION_SEARCH_MAX_QUERY_CHARS: z.coerce.number().int().positive().default(240),
// learning review 会话状态目录。
LEARNING_STATE_STORAGE_DIR: z.string().default("./data/learning-state"),
// 当前 session 的 learning 进度状态目录。
SESSION_LEARNING_STATE_STORAGE_DIR: z.string().default("./data/session-learning-state"),
// learning audit 日志路径。
LEARNING_AUDIT_LOG_PATH: z
.string()
-55
View File
@@ -1,55 +0,0 @@
import { join } from "node:path";
import { config } from "../config.js";
import {
atomicWriteJson,
ensureDirectory,
readJsonFile,
removeFileIfExists,
toConversationScopeKey,
} from "../utils/fileStore.js";
export type ConversationStateRecord = {
sessionId: string;
isTitleManuallyEdited?: boolean;
messages: unknown[];
branchGroups: unknown[];
};
type ConversationStateContext = {
actorKey: string;
projectKey: string;
sessionId: string;
};
export class ConversationStateStore {
constructor(private readonly baseDir = config.CONVERSATION_STATE_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async read(context: ConversationStateContext) {
return await readJsonFile<ConversationStateRecord>(this.filePath(context));
}
async write(context: ConversationStateContext, state: ConversationStateRecord) {
await atomicWriteJson(this.filePath(context), state);
return state;
}
async remove(context: ConversationStateContext) {
await removeFileIfExists(this.filePath(context));
}
private filePath(context: ConversationStateContext) {
return join(
this.baseDir,
`${toConversationScopeKey(
context.actorKey,
context.projectKey,
context.sessionId,
)}.json`,
);
}
}
-161
View File
@@ -1,161 +0,0 @@
import { randomUUID } from "node:crypto";
import { join } from "node:path";
import { config } from "../config.js";
import {
atomicWriteJson,
ensureDirectory,
listJsonFiles,
readJsonFile,
removeFileIfExists,
} from "../utils/fileStore.js";
import { toConversationScopeKey } from "../utils/fileStore.js";
export type ConversationStatus = "active" | "archived";
export type ConversationRecord = {
sessionId: string;
actorKey: string;
ownerUserId?: string;
projectId?: string;
projectKey: string;
opencodeSessionId?: string;
parentSessionId?: string;
createdAt: string;
updatedAt: string;
status: ConversationStatus;
title?: string;
};
type ConversationContext = {
actorKey: string;
userId?: string;
projectId?: string;
projectKey: string;
};
type EnsureConversationInput = ConversationContext & {
sessionId?: string;
parentSessionId?: string;
};
export class ConversationStore {
constructor(private readonly baseDir = config.CONVERSATION_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async ensure(input: EnsureConversationInput) {
const sessionId = normalizeSessionId(input.sessionId) ?? createConversationSessionId();
const existing = await readJsonFile<ConversationRecord>(
this.filePath(input.actorKey, input.projectKey, sessionId),
);
if (existing) {
return { created: false, record: existing };
}
const now = new Date().toISOString();
const record: ConversationRecord = {
sessionId,
actorKey: input.actorKey,
ownerUserId: input.userId?.trim(),
projectId: input.projectId,
projectKey: input.projectKey,
parentSessionId: normalizeSessionId(input.parentSessionId),
createdAt: now,
updatedAt: now,
status: "active",
};
await atomicWriteJson(
this.filePath(record.actorKey, record.projectKey, record.sessionId),
record,
);
return { created: true, record };
}
async get(context: ConversationContext, sessionId: string) {
const normalizedSessionId = normalizeSessionId(sessionId);
if (!normalizedSessionId) {
return null;
}
return await readJsonFile<ConversationRecord>(
this.filePath(context.actorKey, context.projectKey, normalizedSessionId),
);
}
async touch(
record: ConversationRecord,
updates: Partial<Pick<ConversationRecord, "title" | "status" | "opencodeSessionId">> = {},
) {
const next: ConversationRecord = {
...record,
...normalizeConversationUpdates(updates),
updatedAt: new Date().toISOString(),
};
await atomicWriteJson(
this.filePath(record.actorKey, record.projectKey, record.sessionId),
next,
);
return next;
}
async list(context: ConversationContext) {
const files = await listJsonFiles(this.baseDir);
const records = await Promise.all(
files.map((file) => readJsonFile<ConversationRecord>(file)),
);
return records
.filter((record): record is ConversationRecord => Boolean(record))
.filter(
(record) =>
record.actorKey === context.actorKey &&
record.projectKey === context.projectKey,
)
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
}
async remove(record: ConversationRecord) {
await removeFileIfExists(
this.filePath(record.actorKey, record.projectKey, record.sessionId),
);
}
private filePath(actorKey: string, projectKey: string, sessionId: string) {
return join(
this.baseDir,
`${toConversationScopeKey(actorKey, projectKey, sessionId)}.json`,
);
}
}
export const createConversationSessionId = () => `chat-${randomUUID().slice(0, 16)}`;
const normalizeSessionId = (value?: string) => {
const normalized = value?.trim();
return normalized ? normalized.slice(0, 128) : undefined;
};
const normalizeConversationUpdates = (
updates: Partial<Pick<ConversationRecord, "title" | "status" | "opencodeSessionId">>,
) => {
const normalized: Partial<
Pick<ConversationRecord, "title" | "status" | "opencodeSessionId">
> = {};
if (updates.status === "active" || updates.status === "archived") {
normalized.status = updates.status;
}
if (typeof updates.title === "string") {
const trimmed = updates.title.trim();
if (trimmed) {
normalized.title = trimmed.slice(0, 120);
}
}
if (typeof updates.opencodeSessionId === "string") {
const trimmed = updates.opencodeSessionId.trim();
if (trimmed) {
normalized.opencodeSessionId = trimmed.slice(0, 256);
}
}
return normalized;
};
+19 -22
View File
@@ -3,13 +3,13 @@ import { z } from "zod";
import { writeLearningAuditLog } from "../audit/learningAudit.js";
import { type ChatRequestContext } from "../chat/sessionBridge.js";
import { config } from "../config.js";
import { type SessionTurnRecord, SessionHistoryStore } from "../history/store.js";
import { type SessionTurnRecord, SessionTranscriptStore } from "../sessions/transcriptStore.js";
import { logger } from "../logger.js";
import { LearningStateStore } from "./stateStore.js";
import { SessionLearningStateStore } from "./sessionStateStore.js";
import { MemoryStore, type MemoryScope } from "../memory/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { SkillStore } from "../skills/store.js";
import { ToolSessionContextStore } from "../session/toolContextStore.js";
import { SessionRuntimeContextStore } from "../sessions/runtimeContextStore.js";
import {
sanitizePersistentDocument,
sanitizePersistentLine,
@@ -68,25 +68,25 @@ type TurnReviewInput = {
export class LearningOrchestrator {
private readonly activeReviews = new Set<string>();
private readonly learningStateStore = new LearningStateStore();
private readonly sessionLearningStateStore = new SessionLearningStateStore();
private readonly skillStore = new SkillStore();
private readonly toolContextStore = new ToolSessionContextStore();
private readonly sessionRuntimeContextStore = new SessionRuntimeContextStore();
constructor(
private readonly runtime: OpencodeRuntimeAdapter,
private readonly memoryStore: MemoryStore,
private readonly historyStore: SessionHistoryStore,
private readonly transcriptStore: SessionTranscriptStore,
) {}
async initialize() {
await Promise.all([
this.learningStateStore.initialize(),
this.toolContextStore.initialize(),
this.sessionLearningStateStore.initialize(),
this.sessionRuntimeContextStore.initialize(),
]);
}
async onTurnCompleted(input: TurnReviewInput) {
const transcript = await this.historyStore.appendTurn(
const transcript = await this.transcriptStore.appendTurn(
{
actorKey: input.requestContext.actorKey,
clientSessionId: input.requestContext.clientSessionId,
@@ -105,13 +105,12 @@ export class LearningOrchestrator {
}
this.activeReviews.add(input.sessionId);
try {
const state = await this.learningStateStore.read(input.sessionId);
const state = await this.sessionLearningStateStore.read(input.sessionId);
const turnsSinceGate = Math.max(0, turnCount - state.lastGatedTurn);
if (turnsSinceGate < config.LEARNING_GATE_TURN_COOLDOWN || state.pendingReview) {
if (turnsSinceGate < config.LEARNING_GATE_TURN_COOLDOWN) {
this.activeReviews.delete(input.sessionId);
return;
}
await this.learningStateStore.markPending(input.sessionId, true);
} catch (error) {
this.activeReviews.delete(input.sessionId);
throw error;
@@ -142,7 +141,7 @@ export class LearningOrchestrator {
`learning-gate-${input.requestContext.clientSessionId}`,
);
gateSessionId = gateSession.id;
await this.toolContextStore.write({
await this.sessionRuntimeContextStore.write({
actorKey: input.requestContext.actorKey,
allowLearningWrite: false,
clientSessionId: `gate-${input.requestContext.clientSessionId}`,
@@ -164,7 +163,7 @@ export class LearningOrchestrator {
const gateText = collectTextContent(assistantMessage?.parts ?? []);
const gate = parseGateResult(gateText);
if (!gate) {
await this.learningStateStore.completeGate(input.sessionId, turnCount);
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
await writeLearningAuditLog({
action: "review-gate",
detail: "gate result was not valid JSON",
@@ -189,7 +188,7 @@ export class LearningOrchestrator {
traceId: input.requestContext.traceId,
});
if (!shouldPromote) {
await this.learningStateStore.completeGate(input.sessionId, turnCount);
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
return;
}
await this.runReview({
@@ -199,7 +198,6 @@ export class LearningOrchestrator {
turnCount,
});
} catch (error) {
await this.learningStateStore.markPending(input.sessionId, false);
logger.warn({ err: error, sessionId: input.sessionId }, "learning gate failed");
await writeLearningAuditLog({
action: "review-gate",
@@ -211,7 +209,7 @@ export class LearningOrchestrator {
});
} finally {
if (gateSessionId) {
await this.toolContextStore.remove(gateSessionId).catch(() => undefined);
await this.sessionRuntimeContextStore.remove(gateSessionId).catch(() => undefined);
await this.runtime.abortSession(gateSessionId).catch(() => undefined);
}
}
@@ -231,7 +229,7 @@ export class LearningOrchestrator {
const reviewSession = await this.runtime.createSession(
`learning-review-${input.requestContext.clientSessionId}`,
);
await this.toolContextStore.write({
await this.sessionRuntimeContextStore.write({
actorKey: input.requestContext.actorKey,
allowLearningWrite: false,
clientSessionId: `review-${input.requestContext.clientSessionId}`,
@@ -254,7 +252,7 @@ export class LearningOrchestrator {
const reviewText = collectTextContent(assistantMessage?.parts ?? []);
const parsed = parseReviewResult(reviewText);
if (!parsed) {
await this.learningStateStore.completeGate(input.sessionId, turnCount);
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
await writeLearningAuditLog({
action: "review-parse",
detail: "review result was not valid JSON",
@@ -266,9 +264,8 @@ export class LearningOrchestrator {
return;
}
await this.applyReviewResult(input, parsed, turnCount);
await this.learningStateStore.completeReview(input.sessionId, turnCount);
await this.sessionLearningStateStore.completeReview(input.sessionId, turnCount);
} catch (error) {
await this.learningStateStore.markPending(input.sessionId, false);
logger.warn({ err: error, sessionId: input.sessionId }, "learning review failed");
await writeLearningAuditLog({
action: "review-run",
@@ -279,7 +276,7 @@ export class LearningOrchestrator {
traceId: input.requestContext.traceId,
});
} finally {
await this.toolContextStore.remove(reviewSession.id).catch(() => undefined);
await this.sessionRuntimeContextStore.remove(reviewSession.id).catch(() => undefined);
await this.runtime.abortSession(reviewSession.id).catch(() => undefined);
}
}
@@ -7,57 +7,51 @@ import {
readJsonFile,
} from "../utils/fileStore.js";
export type LearningSessionState = {
export type SessionLearningState = {
lastGatedTurn: number;
lastReviewedTurn: number;
pendingReview: boolean;
sessionId: string;
updatedAt: string;
};
export class LearningStateStore {
constructor(private readonly baseDir = config.LEARNING_STATE_STORAGE_DIR) {}
export class SessionLearningStateStore {
constructor(private readonly baseDir = config.SESSION_LEARNING_STATE_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async read(sessionId: string): Promise<LearningSessionState> {
const existing = await readJsonFile<LearningSessionState>(this.filePath(sessionId));
async read(sessionId: string): Promise<SessionLearningState> {
const existing = await readJsonFile<SessionLearningState>(this.filePath(sessionId));
if (existing) {
return existing;
return {
lastGatedTurn: existing.lastGatedTurn,
lastReviewedTurn: existing.lastReviewedTurn,
sessionId: existing.sessionId,
updatedAt: existing.updatedAt,
};
}
return {
lastGatedTurn: 0,
lastReviewedTurn: 0,
pendingReview: false,
sessionId,
updatedAt: new Date(0).toISOString(),
};
}
async write(state: LearningSessionState) {
async write(state: SessionLearningState) {
await atomicWriteJson(this.filePath(state.sessionId), {
...state,
updatedAt: new Date().toISOString(),
});
}
async markPending(sessionId: string, pendingReview: boolean) {
const current = await this.read(sessionId);
await this.write({
...current,
pendingReview,
});
}
async completeReview(sessionId: string, reviewedTurnCount: number) {
const current = await this.read(sessionId);
await this.write({
...current,
lastGatedTurn: Math.max(current.lastGatedTurn, reviewedTurnCount),
lastReviewedTurn: reviewedTurnCount,
pendingReview: false,
});
}
@@ -66,7 +60,6 @@ export class LearningStateStore {
await this.write({
...current,
lastGatedTurn: gatedTurnCount,
pendingReview: false,
});
}
+6 -6
View File
@@ -42,15 +42,15 @@ export class MemoryStore {
constructor(
private readonly baseDir = config.MEMORY_STORAGE_DIR,
private readonly historyDir = join(config.PERSISTENCE_HISTORY_DIR, "memory"),
private readonly backupDir = join(config.PERSISTENCE_BACKUP_DIR, "memory"),
) {}
async initialize() {
await ensureDirectory(this.baseDir);
await ensureDirectory(join(this.baseDir, "users"));
await ensureDirectory(join(this.baseDir, "workspaces"));
// 历史备份与正式数据分目录存放,便于排查和手工恢复。
await ensureDirectory(this.historyDir);
// 备份与正式数据分目录存放,便于排查和手工恢复。
await ensureDirectory(this.backupDir);
}
async upsert(scope: MemoryScope, key: string, draft: MemoryDraft) {
@@ -76,7 +76,7 @@ export class MemoryStore {
this.filePath(scope, key),
renderMemoryMarkdown(scope, entries),
{
historyDir: this.historyDir,
backupDir: this.backupDir,
rootDir: this.baseDir,
},
);
@@ -113,7 +113,7 @@ export class MemoryStore {
this.filePath(scope, key),
renderMemoryMarkdown(scope, entries),
{
historyDir: this.historyDir,
backupDir: this.backupDir,
rootDir: this.baseDir,
},
);
@@ -132,7 +132,7 @@ export class MemoryStore {
this.filePath(scope, key),
renderMemoryMarkdown(scope, next),
{
historyDir: this.historyDir,
backupDir: this.backupDir,
rootDir: this.baseDir,
},
);
+106 -106
View File
@@ -2,16 +2,16 @@ import { Router } from "express";
import { z } from "zod";
import { type LearningOrchestrator } from "../learning/orchestrator.js";
import { type SessionHistoryStore } from "../history/store.js";
import { type SessionTranscriptStore } from "../sessions/transcriptStore.js";
import { logger } from "../logger.js";
import { MemoryStore } from "../memory/store.js";
import { type ConversationStateStore } from "../conversations/stateStore.js";
import { type ConversationStore } from "../conversations/store.js";
import { type SessionUiStateStore } from "../sessions/uiStateStore.js";
import { type SessionMetadataStore } from "../sessions/metadataStore.js";
import { type ResultReferenceResolver } from "../results/resolver.js";
import { RESULT_REFERENCE_KIND } from "../results/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { type ConversationRecord } from "../conversations/store.js";
import { type SessionRecord } from "../sessions/metadataStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
import {
buildPromptWithLearningContext,
@@ -45,26 +45,24 @@ const forkPayloadSchema = z.object({
keep_message_count: z.coerce.number().int().min(0),
});
const conversationStateSchema = z.object({
const sessionStateSchema = z.object({
title: z.string().max(120).optional(),
is_title_manually_edited: z.boolean().optional(),
messages: z.array(z.unknown()).default([]),
branch_groups: z.array(z.unknown()).default([]),
});
const toConversationStateContext = (conversation: ConversationRecord) => ({
actorKey: conversation.actorKey,
projectKey: conversation.projectKey,
sessionId: conversation.sessionId,
const toSessionUiStateContext = (sessionRecord: SessionRecord) => ({
sessionId: sessionRecord.sessionId,
});
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
conversationStore: ConversationStore,
conversationStateStore: ConversationStateStore,
sessionMetadataStore: SessionMetadataStore,
sessionUiStateStore: SessionUiStateStore,
memoryStore: MemoryStore,
sessionHistoryStore: SessionHistoryStore,
sessionTranscriptStore: SessionTranscriptStore,
learningOrchestrator: LearningOrchestrator,
resultReferenceResolver: ResultReferenceResolver,
) => {
@@ -84,13 +82,15 @@ export const buildChatRouter = (
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const requestedSessionId = parsed.data.session_id?.trim();
const sessionId = requestedSessionId || (await runtime.createSession()).id;
const { record, created } = await conversationStore.ensure({
const { record, created } = await sessionMetadataStore.ensure({
actorKey,
parentSessionId: parsed.data.parent_session_id,
projectId,
projectKey,
sessionId: parsed.data.session_id,
sessionId,
userId,
});
@@ -109,7 +109,7 @@ export const buildChatRouter = (
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const records = await conversationStore.list({
const records = await sessionMetadataStore.list({
actorKey,
projectId,
projectKey,
@@ -138,7 +138,7 @@ export const buildChatRouter = (
return;
}
const conversation = await conversationStore.get(
const sessionRecord = await sessionMetadataStore.get(
{
actorKey,
projectId,
@@ -147,31 +147,31 @@ export const buildChatRouter = (
},
sessionId,
);
if (!conversation) {
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const state = await conversationStateStore.read(
toConversationStateContext(conversation),
const state = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord),
);
res.json({
id: conversation.sessionId,
title: conversation.title ?? "新对话",
id: sessionRecord.sessionId,
title: sessionRecord.title ?? "新对话",
is_title_manually_edited: state?.isTitleManuallyEdited ?? false,
created_at: conversation.createdAt,
updated_at: conversation.updatedAt,
status: conversation.status,
session_id: conversation.sessionId,
created_at: sessionRecord.createdAt,
updated_at: sessionRecord.updatedAt,
status: sessionRecord.status,
session_id: sessionRecord.sessionId,
messages: state?.messages ?? [],
branch_groups: state?.branchGroups ?? [],
parent_session_id: conversation.parentSessionId,
parent_session_id: sessionRecord.parentSessionId,
});
});
chatRouter.put("/session/:sessionId", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const parsed = conversationStateSchema.safeParse(req.body ?? {});
const parsed = sessionStateSchema.safeParse(req.body ?? {});
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
@@ -189,17 +189,17 @@ export const buildChatRouter = (
return;
}
const { record } = await conversationStore.ensure({
const { record } = await sessionMetadataStore.ensure({
actorKey,
projectId,
projectKey,
sessionId,
userId,
});
const nextRecord = await conversationStore.touch(record, {
const nextRecord = await sessionMetadataStore.touch(record, {
...(parsed.data.title ? { title: parsed.data.title } : {}),
});
await conversationStateStore.write(toConversationStateContext(nextRecord), {
await sessionUiStateStore.write(toSessionUiStateContext(nextRecord), {
sessionId: nextRecord.sessionId,
isTitleManuallyEdited: parsed.data.is_title_manually_edited,
messages: parsed.data.messages,
@@ -231,21 +231,21 @@ export const buildChatRouter = (
res.status(400).json({ message: "session_id and title are required" });
return;
}
const conversation = await conversationStore.get(
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
sessionId,
);
if (!conversation) {
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const nextConversation = await conversationStore.touch(conversation, { title });
const state = await conversationStateStore.read(
toConversationStateContext(nextConversation),
const nextSessionRecord = await sessionMetadataStore.touch(sessionRecord, { title });
const state = await sessionUiStateStore.read(
toSessionUiStateContext(nextSessionRecord),
);
if (state) {
await conversationStateStore.write(
toConversationStateContext(nextConversation),
await sessionUiStateStore.write(
toSessionUiStateContext(nextSessionRecord),
{
...state,
isTitleManuallyEdited:
@@ -254,9 +254,9 @@ export const buildChatRouter = (
);
}
res.json({
id: nextConversation.sessionId,
title: nextConversation.title,
updated_at: nextConversation.updatedAt,
id: nextSessionRecord.sessionId,
title: nextSessionRecord.title,
updated_at: nextSessionRecord.updatedAt,
});
});
@@ -270,22 +270,20 @@ export const buildChatRouter = (
res.status(400).json({ message: "session_id is required" });
return;
}
const conversation = await conversationStore.get(
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
sessionId,
);
if (!conversation) {
if (!sessionRecord) {
res.status(204).end();
return;
}
await conversationStateStore.remove(toConversationStateContext(conversation));
if (conversation.opencodeSessionId) {
await sessionBridge.deleteConversationSession({
clientSessionId: conversation.sessionId,
sessionId: conversation.opencodeSessionId,
});
}
await conversationStore.remove(conversation);
await sessionUiStateStore.remove(toSessionUiStateContext(sessionRecord));
await sessionBridge.deleteSession({
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
});
await sessionMetadataStore.remove(sessionRecord);
res.status(204).end();
});
@@ -347,14 +345,14 @@ export const buildChatRouter = (
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const conversation = await conversationStore.get(
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
const binding = conversation?.opencodeSessionId
const binding = sessionRecord
? await sessionBridge.abort({
clientSessionId: conversation.sessionId,
sessionId: conversation.opencodeSessionId,
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
})
: null;
@@ -401,54 +399,56 @@ export const buildChatRouter = (
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sourceClientSessionId = parsed.data.session_id?.trim();
const sourceConversation = sourceClientSessionId
? await conversationStore.get(
const sourceSessionId = parsed.data.session_id?.trim();
const sourceSessionRecord = sourceSessionId
? await sessionMetadataStore.get(
{
actorKey,
projectId,
projectKey,
userId,
},
sourceClientSessionId,
sourceSessionId,
)
: null;
const { record: targetConversation } = await conversationStore.ensure({
const forkSession = await runtime.createSession();
const { record: targetSessionRecord } = await sessionMetadataStore.ensure({
actorKey,
parentSessionId: sourceClientSessionId,
parentSessionId: sourceSessionId,
projectId,
projectKey,
sessionId: forkSession.id,
userId,
});
const nextClientSessionId = targetConversation.sessionId;
const nextSessionId = targetSessionRecord.sessionId;
if (sourceClientSessionId && parsed.data.keep_message_count > 0) {
await sessionHistoryStore.cloneThread(
if (sourceSessionId && parsed.data.keep_message_count > 0) {
await sessionTranscriptStore.cloneThread(
{
actorKey,
clientSessionId: sourceClientSessionId,
clientSessionId: sourceSessionId,
projectKey,
sessionId: sourceClientSessionId,
sessionId: sourceSessionId,
},
{
actorKey,
clientSessionId: nextClientSessionId,
clientSessionId: nextSessionId,
projectKey,
sessionId: nextClientSessionId,
sessionId: nextSessionId,
},
parsed.data.keep_message_count,
);
if (sourceConversation?.title) {
await conversationStore.touch(targetConversation, {
title: sourceConversation.title,
if (sourceSessionRecord?.title) {
await sessionMetadataStore.touch(targetSessionRecord, {
title: sourceSessionRecord.title,
});
}
}
logger.info(
{
sourceClientSessionId: parsed.data.session_id,
clientSessionId: nextClientSessionId,
sourceSessionId: parsed.data.session_id,
sessionId: nextSessionId,
traceId,
projectId,
keepMessageCount: parsed.data.keep_message_count,
@@ -457,7 +457,7 @@ export const buildChatRouter = (
);
res.status(200).json({
session_id: nextClientSessionId,
session_id: nextSessionId,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
@@ -489,47 +489,47 @@ export const buildChatRouter = (
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const { record: conversation, created: conversationCreated } =
await conversationStore.ensure({
actorKey,
projectId,
projectKey,
sessionId: parsed.data.session_id,
userId,
});
const activeConversation = await conversationStore.touch(conversation);
const hadExistingRuntimeSession = Boolean(activeConversation.opencodeSessionId);
const requestedSessionId = parsed.data.session_id?.trim();
const existingSessionRecord = requestedSessionId
? await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
requestedSessionId,
)
: null;
const hadExistingRuntimeSession = Boolean(existingSessionRecord);
const { binding, requestContext, created } = await sessionBridge.resolve({
clientSessionId: activeConversation.sessionId,
sessionId: activeConversation.opencodeSessionId,
sessionId: requestedSessionId,
accessToken,
projectId,
traceId,
userId,
});
const conversationWithRuntime =
created && binding.sessionId !== activeConversation.opencodeSessionId
? await conversationStore.touch(activeConversation, {
opencodeSessionId: binding.sessionId,
})
: activeConversation;
const { record: ensuredSessionRecord, created: sessionCreated } =
await sessionMetadataStore.ensure({
actorKey,
projectId,
projectKey,
sessionId: binding.sessionId,
userId,
});
const activeSessionRecord = await sessionMetadataStore.touch(ensuredSessionRecord);
const historyContext = {
actorKey: requestContext.actorKey,
clientSessionId: requestContext.clientSessionId,
projectKey: requestContext.projectKey,
sessionId: requestContext.clientSessionId,
};
const recentTurns = await sessionHistoryStore.getRecentTurns(historyContext, 8);
const initialConversationState = await conversationStateStore.read(
toConversationStateContext(conversationWithRuntime),
const recentTurns = await sessionTranscriptStore.getRecentTurns(historyContext, 8);
const initialSessionState = await sessionUiStateStore.read(
toSessionUiStateContext(activeSessionRecord),
);
logger.info(
{
clientSessionId: requestContext.clientSessionId,
sessionId: binding.sessionId,
created: created || conversationCreated,
created: created || sessionCreated,
model: parsed.data.model,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
@@ -565,14 +565,14 @@ export const buildChatRouter = (
requestContext.projectKey,
{
recentTurns,
persistedMessages: initialConversationState?.messages,
persistedMessages: initialSessionState?.messages,
message: parsed.data.message,
restoreConversation: !hadExistingRuntimeSession,
},
);
const streamResult = await streamPromptResponse({
runtime,
opencodeSessionId: binding.sessionId,
sessionId: binding.sessionId,
clientSessionId,
message: preparedMessage,
model: parsed.data.model,
@@ -593,20 +593,20 @@ export const buildChatRouter = (
.reverse()
.find((message) => message.info.role === "assistant");
const assistantText = collectTextContent(assistantMessage?.parts ?? []);
const latestConversation =
(await conversationStore.get(
const latestSessionRecord =
(await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
conversationWithRuntime.sessionId,
)) ?? conversationWithRuntime;
const latestConversationState = await conversationStateStore.read(
toConversationStateContext(latestConversation),
activeSessionRecord.sessionId,
)) ?? activeSessionRecord;
const latestSessionState = await sessionUiStateStore.read(
toSessionUiStateContext(latestSessionRecord),
);
const existingSessionTitle = latestConversation.title;
const existingSessionTitle = latestSessionRecord.title;
let sessionTitle = existingSessionTitle;
const shouldGenerateTitle = shouldGenerateSessionTitle({
recentTurnCount: recentTurns.length,
isTitleManuallyEdited:
latestConversationState?.isTitleManuallyEdited ?? false,
latestSessionState?.isTitleManuallyEdited ?? false,
});
if (shouldGenerateTitle) {
sessionTitle = await generateSessionTitle(runtime, {
@@ -616,7 +616,7 @@ export const buildChatRouter = (
fallbackTitle: existingSessionTitle,
});
}
const nextConversation = await conversationStore.touch(latestConversation, {
const nextSessionRecord = await sessionMetadataStore.touch(latestSessionRecord, {
...(sessionTitle && sessionTitle !== existingSessionTitle
? { title: sessionTitle }
: {}),
+1 -1
View File
@@ -1,5 +1,5 @@
import { logger } from "../logger.js";
import { type SessionTurnRecord } from "../history/store.js";
import { type SessionTurnRecord } from "../sessions/transcriptStore.js";
import { MemoryStore } from "../memory/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
+15 -15
View File
@@ -13,7 +13,7 @@ export type SupportedModel = (typeof supportedModels)[number];
type StreamPromptOptions = {
runtime: OpencodeRuntimeAdapter;
opencodeSessionId: string;
sessionId: string;
clientSessionId: string;
message: string;
model?: SupportedModel;
@@ -168,11 +168,11 @@ export const collectTextContent = (parts: Part[]) =>
const emitFallbackMessage = async (
runtime: OpencodeRuntimeAdapter,
opencodeSessionId: string,
sessionId: string,
clientSessionId: string,
write: (event: string, data: Record<string, unknown>) => void,
) => {
const messages = await runtime.messages(opencodeSessionId);
const messages = await runtime.messages(sessionId);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
@@ -293,7 +293,7 @@ const getToolProgressTitle = (tool: string, status: string) => {
export const streamPromptResponse = async ({
runtime,
opencodeSessionId,
sessionId,
clientSessionId,
message,
model,
@@ -332,7 +332,7 @@ export const streamPromptResponse = async ({
let aborted = signal?.aborted ?? false;
let failed = false;
const debugContext = {
opencodeSessionId,
sessionId,
clientSessionId,
traceId,
projectId,
@@ -406,7 +406,7 @@ export const streamPromptResponse = async ({
});
const promptPromise = runtime
.prompt(opencodeSessionId, message, toRuntimeModel(model))
.prompt(sessionId, message, toRuntimeModel(model))
.then(() => {
promptSettled = true;
logDevelopmentDebug("runtime.prompt resolved", {
@@ -471,7 +471,7 @@ export const streamPromptResponse = async ({
}
const event = next.result.value as OpencodeEvent;
if (!isSessionEvent(event, opencodeSessionId)) {
if (!isSessionEvent(event, sessionId)) {
continue;
}
@@ -541,7 +541,7 @@ export const streamPromptResponse = async ({
});
void writeLlmRequestAuditLog({
kind: "skill",
sessionId: opencodeSessionId,
sessionId: sessionId,
clientSessionId,
traceId,
projectId,
@@ -691,7 +691,7 @@ export const streamPromptResponse = async ({
logger.warn(
{
tool: part.tool,
sessionId: opencodeSessionId,
sessionId: sessionId,
clientSessionId,
},
"llm tool request missing reason",
@@ -699,7 +699,7 @@ export const streamPromptResponse = async ({
}
void writeLlmRequestAuditLog({
kind: "tool",
sessionId: opencodeSessionId,
sessionId: sessionId,
clientSessionId,
traceId,
projectId,
@@ -781,12 +781,12 @@ export const streamPromptResponse = async ({
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
await runtime.abortSession(opencodeSessionId).catch((error) => {
logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session");
await runtime.abortSession(sessionId).catch((error) => {
logger.warn({ sessionId: sessionId, err: error }, "failed to abort opencode session");
});
await runtime.waitForSessionIdle(opencodeSessionId).catch((error) => {
await runtime.waitForSessionIdle(sessionId).catch((error) => {
logger.warn(
{ sessionId: opencodeSessionId, err: error },
{ sessionId: sessionId, err: error },
"failed while waiting for aborted opencode session to become idle",
);
});
@@ -803,7 +803,7 @@ export const streamPromptResponse = async ({
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write);
await emitFallbackMessage(runtime, sessionId, clientSessionId, write);
}
emitProgress({
id: "request-received",
+22 -22
View File
@@ -3,11 +3,11 @@ import { spawn } from "node:child_process";
import cors from "cors";
import express from "express";
import { SessionHistoryStore } from "./history/store.js";
import { SessionTranscriptStore } from "./sessions/transcriptStore.js";
import { ChatSessionBridge } from "./chat/sessionBridge.js";
import { config } from "./config.js";
import { ConversationStateStore } from "./conversations/stateStore.js";
import { ConversationStore } from "./conversations/store.js";
import { SessionUiStateStore } from "./sessions/uiStateStore.js";
import { SessionMetadataStore } from "./sessions/metadataStore.js";
import { logger } from "./logger.js";
import { LearningOrchestrator } from "./learning/orchestrator.js";
import { MemoryStore } from "./memory/store.js";
@@ -15,20 +15,20 @@ import { ResultReferenceResolver } from "./results/resolver.js";
import { ResultReferenceStore } from "./results/store.js";
import { buildChatRouter } from "./routes/chat.js";
import { opencodeRuntime } from "./runtime/opencode.js";
import { ToolSessionContextStore } from "./session/toolContextStore.js";
import { SessionRuntimeContextStore } from "./sessions/runtimeContextStore.js";
import { DynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js";
const app = express();
const sessionBridge = new ChatSessionBridge(opencodeRuntime);
const conversationStore = new ConversationStore();
const conversationStateStore = new ConversationStateStore();
const sessionMetadataStore = new SessionMetadataStore();
const sessionUiStateStore = new SessionUiStateStore();
const memoryStore = new MemoryStore();
const sessionHistoryStore = new SessionHistoryStore();
const toolContextStore = new ToolSessionContextStore();
const sessionTranscriptStore = new SessionTranscriptStore();
const sessionRuntimeContextStore = new SessionRuntimeContextStore();
const learningOrchestrator = new LearningOrchestrator(
opencodeRuntime,
memoryStore,
sessionHistoryStore,
sessionTranscriptStore,
);
const resultReferenceStore = new ResultReferenceStore();
const resultReferenceResolver = new ResultReferenceResolver(resultReferenceStore);
@@ -68,7 +68,7 @@ app.post("/internal/tools/dynamic-http-call", async (req, res) => {
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const context = sessionId ? await toolContextStore.read(sessionId) : null;
const context = sessionId ? await sessionRuntimeContextStore.read(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
@@ -114,7 +114,7 @@ app.post("/internal/tools/tjwater-cli-call", async (req, res) => {
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const context = sessionId ? await toolContextStore.read(sessionId) : null;
const context = sessionId ? await sessionRuntimeContextStore.read(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
@@ -218,7 +218,7 @@ app.post("/internal/tools/fetch-result-ref", async (req, res) => {
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const resultRef = typeof req.body?.result_ref === "string" ? req.body.result_ref : "";
const context = sessionId ? await toolContextStore.read(sessionId) : null;
const context = sessionId ? await sessionRuntimeContextStore.read(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
@@ -261,7 +261,7 @@ app.post("/internal/tools/store-render-ref", async (req, res) => {
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const filePath = typeof req.body?.file_path === "string" ? req.body.file_path.trim() : "";
const context = sessionId ? await toolContextStore.read(sessionId) : null;
const context = sessionId ? await sessionRuntimeContextStore.read(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
@@ -311,7 +311,7 @@ app.post("/internal/tools/session-search", async (req, res) => {
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const query = typeof req.body?.query === "string" ? req.body.query : "";
const context = sessionId ? await toolContextStore.read(sessionId) : null;
const context = sessionId ? await sessionRuntimeContextStore.read(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
@@ -323,7 +323,7 @@ app.post("/internal/tools/session-search", async (req, res) => {
res.status(400).json({ message: "query is required" });
return;
}
const hits = await sessionHistoryStore.search(
const hits = await sessionTranscriptStore.search(
{
actorKey: context.actorKey,
projectKey: context.projectKey,
@@ -342,10 +342,10 @@ app.use(
buildChatRouter(
sessionBridge,
opencodeRuntime,
conversationStore,
conversationStateStore,
sessionMetadataStore,
sessionUiStateStore,
memoryStore,
sessionHistoryStore,
sessionTranscriptStore,
learningOrchestrator,
resultReferenceResolver,
),
@@ -353,13 +353,13 @@ app.use(
const bootstrap = async () => {
await Promise.all([
conversationStore.initialize(),
conversationStateStore.initialize(),
sessionMetadataStore.initialize(),
sessionUiStateStore.initialize(),
learningOrchestrator.initialize(),
memoryStore.initialize(),
resultReferenceStore.initialize(),
sessionHistoryStore.initialize(),
toolContextStore.initialize(),
sessionTranscriptStore.initialize(),
sessionRuntimeContextStore.initialize(),
]);
resultReferenceStore.startCleanupLoop();
};
+149
View File
@@ -0,0 +1,149 @@
import { join } from "node:path";
import { config } from "../config.js";
import {
atomicWriteJson,
ensureDirectory,
listJsonFiles,
readJsonFile,
removeFileIfExists,
slugify,
} from "../utils/fileStore.js";
export type SessionStatus = "active" | "archived";
export type SessionRecord = {
sessionId: string;
actorKey: string;
ownerUserId?: string;
projectId?: string;
projectKey: string;
parentSessionId?: string;
createdAt: string;
updatedAt: string;
status: SessionStatus;
title?: string;
};
type SessionMetadataContext = {
actorKey: string;
userId?: string;
projectId?: string;
projectKey: string;
};
type EnsureSessionMetadataInput = SessionMetadataContext & {
sessionId: string;
parentSessionId?: string;
};
export class SessionMetadataStore {
constructor(private readonly baseDir = config.SESSION_METADATA_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async ensure(input: EnsureSessionMetadataInput) {
const sessionId = normalizeSessionId(input.sessionId);
if (!sessionId) {
throw new Error("sessionId is required");
}
const existing = await readJsonFile<SessionRecord>(
this.filePath(sessionId),
);
if (existing) {
return { created: false, record: existing };
}
const now = new Date().toISOString();
const record: SessionRecord = {
sessionId,
actorKey: input.actorKey,
ownerUserId: input.userId?.trim(),
projectId: input.projectId,
projectKey: input.projectKey,
parentSessionId: normalizeSessionId(input.parentSessionId),
createdAt: now,
updatedAt: now,
status: "active",
};
await atomicWriteJson(
this.filePath(record.sessionId),
record,
);
return { created: true, record };
}
async get(context: SessionMetadataContext, sessionId: string) {
const normalizedSessionId = normalizeSessionId(sessionId);
if (!normalizedSessionId) {
return null;
}
return await readJsonFile<SessionRecord>(
this.filePath(normalizedSessionId),
);
}
async touch(
record: SessionRecord,
updates: Partial<Pick<SessionRecord, "title" | "status">> = {},
) {
const next: SessionRecord = {
...record,
...normalizeSessionUpdates(updates),
updatedAt: new Date().toISOString(),
};
await atomicWriteJson(
this.filePath(record.sessionId),
next,
);
return next;
}
async list(context: SessionMetadataContext) {
const files = await listJsonFiles(this.baseDir);
const records = await Promise.all(
files.map((file) => readJsonFile<SessionRecord>(file)),
);
return records
.filter((record): record is SessionRecord => Boolean(record))
.filter(
(record) =>
record.actorKey === context.actorKey &&
record.projectKey === context.projectKey,
)
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
}
async remove(record: SessionRecord) {
await removeFileIfExists(
this.filePath(record.sessionId),
);
}
private filePath(sessionId: string) {
return join(this.baseDir, `${slugify(sessionId)}.json`);
}
}
const normalizeSessionId = (value?: string) => {
const normalized = value?.trim();
return normalized ? normalized.slice(0, 128) : undefined;
};
const normalizeSessionUpdates = (
updates: Partial<Pick<SessionRecord, "title" | "status">>,
) => {
const normalized: Partial<Pick<SessionRecord, "title" | "status">> = {};
if (updates.status === "active" || updates.status === "archived") {
normalized.status = updates.status;
}
if (typeof updates.title === "string") {
const trimmed = updates.title.trim();
if (trimmed) {
normalized.title = trimmed.slice(0, 120);
}
}
return normalized;
};
@@ -8,7 +8,7 @@ import {
removeFileIfExists,
} from "../utils/fileStore.js";
export type ToolSessionContext = {
export type SessionRuntimeContext = {
accessToken?: string;
actorKey: string;
allowLearningWrite?: boolean;
@@ -20,19 +20,19 @@ export type ToolSessionContext = {
traceId: string;
};
export class ToolSessionContextStore {
constructor(private readonly baseDir = config.SESSION_CONTEXT_STORAGE_DIR) {}
export class SessionRuntimeContextStore {
constructor(private readonly baseDir = config.SESSION_RUNTIME_CONTEXT_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async write(context: ToolSessionContext) {
async write(context: SessionRuntimeContext) {
await atomicWriteJson(this.filePath(context.sessionId), context);
}
async read(sessionId: string) {
return await readJsonFile<ToolSessionContext>(this.filePath(sessionId));
return await readJsonFile<SessionRuntimeContext>(this.filePath(sessionId));
}
async remove(sessionId: string) {
@@ -36,24 +36,24 @@ export type SessionSearchHit = {
turnId: string;
};
type SessionHistoryContext = {
type SessionTranscriptContext = {
actorKey: string;
clientSessionId?: string;
projectKey: string;
sessionId: string;
};
export class SessionHistoryStore {
export class SessionTranscriptStore {
private readonly writeQueues = new Map<string, Promise<void>>();
constructor(private readonly baseDir = config.SESSION_HISTORY_STORAGE_DIR) {}
constructor(private readonly baseDir = config.SESSION_TRANSCRIPT_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async appendTurn(
context: SessionHistoryContext,
context: SessionTranscriptContext,
turn: {
assistantMessage: string;
toolCallCount: number;
@@ -87,9 +87,9 @@ export class SessionHistoryStore {
transcript.clientSessionId = context.clientSessionId ?? transcript.clientSessionId;
transcript.sessionId = context.sessionId;
transcript.turns.push(record);
if (transcript.turns.length > config.SESSION_HISTORY_MAX_TURNS_PER_SESSION) {
if (transcript.turns.length > config.SESSION_TRANSCRIPT_MAX_TURNS_PER_SESSION) {
transcript.turns = transcript.turns.slice(
transcript.turns.length - config.SESSION_HISTORY_MAX_TURNS_PER_SESSION,
transcript.turns.length - config.SESSION_TRANSCRIPT_MAX_TURNS_PER_SESSION,
);
}
transcript.updatedAt = timestamp;
@@ -99,7 +99,7 @@ export class SessionHistoryStore {
}
async getRecentTurns(
context: SessionHistoryContext,
context: SessionTranscriptContext,
limit: number,
): Promise<SessionTurnRecord[]> {
const transcript = await this.readTranscript(context);
@@ -110,8 +110,8 @@ export class SessionHistoryStore {
}
async cloneThread(
sourceContext: SessionHistoryContext,
targetContext: SessionHistoryContext,
sourceContext: SessionTranscriptContext,
targetContext: SessionTranscriptContext,
keepMessageCount: number,
) {
const sourceTranscript = await this.readTranscript(sourceContext);
@@ -129,7 +129,7 @@ export class SessionHistoryStore {
}
async search(
context: Pick<SessionHistoryContext, "actorKey" | "projectKey">,
context: Pick<SessionTranscriptContext, "actorKey" | "projectKey">,
query: string,
maxResults = config.SESSION_SEARCH_MAX_RESULTS,
): Promise<SessionSearchHit[]> {
@@ -175,7 +175,7 @@ export class SessionHistoryStore {
return hits.sort((a, b) => b.score - a.score).slice(0, Math.max(1, maxResults));
}
private async readTranscript(context: SessionHistoryContext) {
private async readTranscript(context: SessionTranscriptContext) {
const direct = await readJsonFile<SessionTranscriptRecord>(this.filePath(context));
if (direct) {
return direct;
@@ -210,7 +210,7 @@ export class SessionHistoryStore {
return matches.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt))[0] ?? null;
}
private filePath(context: SessionHistoryContext) {
private filePath(context: SessionTranscriptContext) {
return join(
this.baseDir,
`${context.actorKey}__${context.projectKey}__${context.sessionId}.json`,
+46
View File
@@ -0,0 +1,46 @@
import { join } from "node:path";
import { config } from "../config.js";
import {
atomicWriteJson,
ensureDirectory,
readJsonFile,
removeFileIfExists,
slugify,
} from "../utils/fileStore.js";
export type SessionUiStateRecord = {
sessionId: string;
isTitleManuallyEdited?: boolean;
messages: unknown[];
branchGroups: unknown[];
};
type SessionUiStateContext = {
sessionId: string;
};
export class SessionUiStateStore {
constructor(private readonly baseDir = config.SESSION_UI_STATE_STORAGE_DIR) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
async read(context: SessionUiStateContext) {
return await readJsonFile<SessionUiStateRecord>(this.filePath(context));
}
async write(context: SessionUiStateContext, state: SessionUiStateRecord) {
await atomicWriteJson(this.filePath(context), state);
return state;
}
async remove(context: SessionUiStateContext) {
await removeFileIfExists(this.filePath(context));
}
private filePath(context: SessionUiStateContext) {
return join(this.baseDir, `${slugify(context.sessionId)}.json`);
}
}
+30 -19
View File
@@ -1,4 +1,5 @@
import { dirname, join, posix } from "node:path";
import { dirname, isAbsolute, join, posix, resolve } from "node:path";
import { fileURLToPath } from "node:url";
import { config } from "../config.js";
import {
@@ -17,8 +18,13 @@ import {
} from "../utils/persistencePolicy.js";
const LEARNED_PATTERNS_MARKER = "## Learned Patterns";
const SKILLS_ROOT_DIR = ".opencode/skills";
const SKILLS_HISTORY_DIR = join(config.PERSISTENCE_HISTORY_DIR, "skills");
const PROJECT_ROOT_DIR = resolve(dirname(fileURLToPath(import.meta.url)), "../..");
const resolveProjectPath = (path: string) =>
isAbsolute(path) ? path : resolve(PROJECT_ROOT_DIR, path);
const DEFAULT_SKILLS_ROOT_DIR = resolveProjectPath(config.OPENCODE_SKILLS_ROOT_DIR);
const DEFAULT_SKILLS_BACKUP_DIR = resolveProjectPath(
join(config.PERSISTENCE_BACKUP_DIR, "skills"),
);
export type SkillPatternRecord = {
id: string;
@@ -28,6 +34,11 @@ export type SkillPatternRecord = {
export class SkillStore {
private writeQueue: Promise<void> = Promise.resolve();
constructor(
private readonly rootDir = DEFAULT_SKILLS_ROOT_DIR,
private readonly backupDir = DEFAULT_SKILLS_BACKUP_DIR,
) {}
async list(skillPath: string) {
const normalizedSkillPath = normalizeSkillPath(skillPath);
if (!normalizedSkillPath) {
@@ -70,10 +81,10 @@ export class SkillStore {
`${LEARNED_PATTERNS_MARKER}\n- [${record.id}] ${record.content}`,
)
: `${current.trimEnd()}\n\n${LEARNED_PATTERNS_MARKER}\n- [${record.id}] ${record.content}\n`;
await ensureDirectory(join(SKILLS_ROOT_DIR, normalizedSkillPath));
await ensureDirectory(join(this.rootDir, normalizedSkillPath));
await atomicWriteFileWithHistory(target, next, {
historyDir: SKILLS_HISTORY_DIR,
rootDir: SKILLS_ROOT_DIR,
backupDir: this.backupDir,
rootDir: this.rootDir,
});
return { changed: true, detail: "skill file updated", target };
});
@@ -97,8 +108,8 @@ export class SkillStore {
}
const next = rewriteLearnedPatterns(current, remaining);
await atomicWriteFileWithHistory(target, next, {
historyDir: SKILLS_HISTORY_DIR,
rootDir: SKILLS_ROOT_DIR,
backupDir: this.backupDir,
rootDir: this.rootDir,
});
return { changed: true, detail: "pattern removed", target };
});
@@ -118,11 +129,11 @@ export class SkillStore {
return { changed: false, detail: "reference content rejected by persistence policy", target: "" };
}
return this.serializeWrite(async () => {
const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedReferencePath);
const target = join(this.rootDir, normalizedSkillPath, normalizedReferencePath);
await ensureDirectory(dirname(target));
await atomicWriteFileWithHistory(target, `${sanitizedContent}\n`, {
historyDir: SKILLS_HISTORY_DIR,
rootDir: SKILLS_ROOT_DIR,
backupDir: this.backupDir,
rootDir: this.rootDir,
});
return { changed: true, detail: "reference written", target };
});
@@ -138,7 +149,7 @@ export class SkillStore {
return { changed: false, detail: "invalid reference file_path", target: "" };
}
return this.serializeWrite(async () => {
const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedReferencePath);
const target = join(this.rootDir, normalizedSkillPath, normalizedReferencePath);
const previous = await readTextFile(target);
if (previous === null) {
return { changed: false, detail: "reference not found", target };
@@ -162,11 +173,11 @@ export class SkillStore {
return { changed: false, detail: "script content rejected by persistence policy", target: "" };
}
return this.serializeWrite(async () => {
const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedScriptPath);
const target = join(this.rootDir, normalizedSkillPath, normalizedScriptPath);
await ensureDirectory(dirname(target));
await atomicWriteFileWithHistory(target, sanitizedContent, {
historyDir: SKILLS_HISTORY_DIR,
rootDir: SKILLS_ROOT_DIR,
backupDir: this.backupDir,
rootDir: this.rootDir,
});
return { changed: true, detail: "script written", target };
});
@@ -182,7 +193,7 @@ export class SkillStore {
return { changed: false, detail: "invalid script file_path", target: "" };
}
return this.serializeWrite(async () => {
const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedScriptPath);
const target = join(this.rootDir, normalizedSkillPath, normalizedScriptPath);
const previous = await readTextFile(target);
if (previous === null) {
return { changed: false, detail: "script not found", target };
@@ -193,19 +204,19 @@ export class SkillStore {
}
private async listReferenceFiles(skillPath: string) {
const referenceDir = join(SKILLS_ROOT_DIR, skillPath, "references");
const referenceDir = join(this.rootDir, skillPath, "references");
const files = await listFiles(referenceDir);
return files.map((file) => file.slice(referenceDir.length + 1));
}
private async listScriptFiles(skillPath: string) {
const scriptDir = join(SKILLS_ROOT_DIR, skillPath, "scripts");
const scriptDir = join(this.rootDir, skillPath, "scripts");
const files = await listFiles(scriptDir);
return files.map((file) => file.slice(scriptDir.length + 1));
}
private skillFilePath(skillPath: string) {
return join(SKILLS_ROOT_DIR, skillPath, "SKILL.md");
return join(this.rootDir, skillPath, "SKILL.md");
}
private async serializeWrite<T>(task: () => Promise<T>) {
+5 -11
View File
@@ -20,7 +20,7 @@ export const atomicWriteFile = async (path: string, content: string) => {
type HistoricalWriteOptions = {
afterWrite?: () => Promise<void> | void;
historyDir: string;
backupDir: string;
rootDir: string;
};
@@ -36,8 +36,8 @@ export const atomicWriteFileWithHistory = async (
let backupPath: string | null = null;
if (previous !== null) {
// 仅在覆盖已有文件时保留历史版本,避免为首次创建产生空备份。
backupPath = buildHistoryBackupPath(path, options);
// 仅在覆盖已有文件时保留备份版本,避免为首次创建产生空备份。
backupPath = buildBackupPath(path, options);
await atomicWriteFile(backupPath, previous);
}
@@ -149,12 +149,6 @@ export const toProjectKey = (projectId?: string) => toScopedKey("project", proje
export const toStableId = (...parts: string[]) =>
createHash("sha256").update(parts.join("|")).digest("hex").slice(0, 24);
export const toConversationScopeKey = (
actorKey: string,
projectKey: string,
sessionId: string,
) => `conversation-${toStableId(actorKey, projectKey, sessionId)}`;
export const slugify = (value: string) =>
value
.toLowerCase()
@@ -162,11 +156,11 @@ export const slugify = (value: string) =>
.replace(/^-+|-+$/g, "")
.slice(0, 64) || "entry";
const buildHistoryBackupPath = (path: string, options: HistoricalWriteOptions) => {
const buildBackupPath = (path: string, options: HistoricalWriteOptions) => {
const relativePath = relative(options.rootDir, path);
const scopedPath =
relativePath && !relativePath.startsWith("..") ? relativePath : basename(path);
// 备份目录尽量复用原始相对路径,便于按业务目录回看历史。
const backupName = `${basename(path)}.${Date.now().toString(36)}.bak`;
return join(options.historyDir, dirname(scopedPath), backupName);
return join(options.backupDir, dirname(scopedPath), backupName);
};
+1 -1
View File
@@ -5,7 +5,7 @@ import {
generateSessionTitle,
shouldGenerateSessionTitle,
} from "../../src/routes/chatSession.js";
import { type SessionTurnRecord } from "../../src/history/store.js";
import { type SessionTurnRecord } from "../../src/sessions/transcriptStore.js";
import { type MemoryStore } from "../../src/memory/store.js";
import { type OpencodeRuntimeAdapter } from "../../src/runtime/opencode.js";
@@ -3,15 +3,15 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { ConversationStore } from "../../src/conversations/store.js";
import { SessionMetadataStore } from "../../src/sessions/metadataStore.js";
describe("ConversationStore", () => {
describe("SessionMetadataStore", () => {
let tempDir: string;
let store: ConversationStore;
let store: SessionMetadataStore;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), "tjwater-conversation-"));
store = new ConversationStore(tempDir);
tempDir = await mkdtemp(join(tmpdir(), "tjwater-session-"));
store = new SessionMetadataStore(tempDir);
await store.initialize();
});
@@ -19,16 +19,17 @@ describe("ConversationStore", () => {
await rm(tempDir, { force: true, recursive: true });
});
it("issues backend-managed session ids when absent", async () => {
it("persists the provided opencode session id", async () => {
const { record, created } = await store.ensure({
actorKey: "actor-1",
projectId: "project-1",
projectKey: "project-key-1",
sessionId: "opencode-session-1",
userId: "user-1",
});
expect(created).toBe(true);
expect(record.sessionId).toStartWith("chat-");
expect(record.sessionId).toBe("opencode-session-1");
expect(record.ownerUserId).toBe("user-1");
expect(record.status).toBe("active");
});
@@ -44,11 +45,9 @@ describe("ConversationStore", () => {
const touched = await store.touch(record, {
title: "新标题",
opencodeSessionId: "opencode-session-1",
});
expect(touched.title).toBe("新标题");
expect(touched.opencodeSessionId).toBe("opencode-session-1");
expect(touched.updatedAt >= record.updatedAt).toBe(true);
const fetched = await store.get(
@@ -61,6 +60,5 @@ describe("ConversationStore", () => {
"existing-session",
);
expect(fetched?.title).toBe("新标题");
expect(fetched?.opencodeSessionId).toBe("opencode-session-1");
});
});
@@ -3,15 +3,15 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { ToolSessionContextStore } from "../../src/session/toolContextStore.js";
import { SessionRuntimeContextStore } from "../../src/sessions/runtimeContextStore.js";
describe("ToolSessionContextStore", () => {
describe("SessionRuntimeContextStore", () => {
let tempDir: string;
let store: ToolSessionContextStore;
let store: SessionRuntimeContextStore;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), "tjwater-tool-context-"));
store = new ToolSessionContextStore(tempDir);
store = new SessionRuntimeContextStore(tempDir);
await store.initialize();
});
@@ -3,15 +3,15 @@ import { mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { SessionHistoryStore } from "../../src/history/store.js";
import { SessionTranscriptStore } from "../../src/sessions/transcriptStore.js";
describe("SessionHistoryStore", () => {
describe("SessionTranscriptStore", () => {
let tempDir: string;
let store: SessionHistoryStore;
let store: SessionTranscriptStore;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), "tjwater-history-"));
store = new SessionHistoryStore(tempDir);
tempDir = await mkdtemp(join(tmpdir(), "tjwater-transcript-"));
store = new SessionTranscriptStore(tempDir);
await store.initialize();
});
+67
View File
@@ -0,0 +1,67 @@
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { SkillStore } from "../../src/skills/store.js";
describe("SkillStore", () => {
let originalCwd: string;
let tempDir: string;
let alternateCwd: string;
let skillsRoot: string;
let backupRoot: string;
let store: SkillStore;
beforeEach(async () => {
originalCwd = process.cwd();
tempDir = await mkdtemp(join(tmpdir(), "tjwater-skills-"));
alternateCwd = join(tempDir, "runtime-cwd");
skillsRoot = join(tempDir, "project", ".opencode", "skills");
backupRoot = join(tempDir, "backup", "skills");
store = new SkillStore(skillsRoot, backupRoot);
});
afterEach(async () => {
process.chdir(originalCwd);
await rm(tempDir, { force: true, recursive: true });
});
it("writes scripts under the configured skills root regardless of process cwd", async () => {
await mkdir(alternateCwd, { recursive: true });
process.chdir(alternateCwd);
const result = await store.writeScript(
"workflow/hydraulic-bottleneck-analysis",
"scripts/analyze.py",
"print('ok')\n",
);
expect(result).toEqual({
changed: true,
detail: "script written",
target: join(
skillsRoot,
"workflow",
"hydraulic-bottleneck-analysis",
"scripts",
"analyze.py",
),
});
await expect(readFile(result.target, "utf8")).resolves.toBe("print('ok')\n");
});
it("rejects script paths outside scripts/*.py", async () => {
const result = await store.writeScript(
"workflow/hydraulic-bottleneck-analysis",
"analyze.ts",
"console.log('ok')\n",
);
expect(result).toEqual({
changed: false,
detail: "invalid script file_path",
target: "",
});
});
});