From 4ec6cbed1615e0bcada837b32419c0d662764660 Mon Sep 17 00:00:00 2001 From: Huarch Date: Fri, 15 May 2026 11:50:20 +0800 Subject: [PATCH] =?UTF-8?q?LLM-driven=20=E8=AE=BE=E8=AE=A1=EF=BC=8C?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AD=A6=E4=B9=A0=E5=AE=A1=E8=AE=A1=E5=92=8C?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E5=8E=86=E5=8F=B2=E5=AD=98=E5=82=A8=E8=87=B3?= =?UTF-8?q?=E7=9B=AE=E5=BD=95=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .opencode/tools/memory_manager.ts | 71 +++- .opencode/tools/session_search.ts | 43 +++ .opencode/tools/skill_manager.ts | 175 +++------ src/audit/learningAudit.ts | 34 ++ src/chat/sessionBridge.ts | 12 + src/config.ts | 26 ++ src/history/store.ts | 213 +++++++++++ src/learning/orchestrator.ts | 581 ++++++++++++++++++++++++++++++ src/learning/stateStore.ts | 76 ++++ src/memory/store.ts | 81 ++++- src/routes/chat.ts | 37 +- src/server.ts | 54 ++- src/session/toolContextStore.ts | 2 + src/skills/store.ts | 279 ++++++++++++++ src/utils/persistencePolicy.ts | 6 + 15 files changed, 1557 insertions(+), 133 deletions(-) create mode 100644 .opencode/tools/session_search.ts create mode 100644 src/audit/learningAudit.ts create mode 100644 src/history/store.ts create mode 100644 src/learning/orchestrator.ts create mode 100644 src/learning/stateStore.ts create mode 100644 src/skills/store.ts diff --git a/.opencode/tools/memory_manager.ts b/.opencode/tools/memory_manager.ts index 213f051..a1892c3 100644 --- a/.opencode/tools/memory_manager.ts +++ b/.opencode/tools/memory_manager.ts @@ -11,8 +11,11 @@ const initializePromise = Promise.all([ export default tool({ description: - "将长期有效的用户偏好或项目事实写入持久 memory。禁止写入 token、password、secret、system prompt 或一次性上下文。scope 仅允许 'user' 或 'workspace'。", + "管理长期有效的用户偏好或项目事实。支持 add/list/replace/remove。禁止写入 token、password、secret、system prompt 或一次性上下文。scope 仅允许 'user' 或 'workspace'。", args: { + action: tool.schema + .enum(["add", "list", "replace", "remove"]) + .describe("Memory operation to perform."), reason: tool.schema .string() .describe("Why this memory should be persisted for future requests."), @@ -23,9 +26,14 @@ export default tool({ ), content: tool.schema .string() + .optional() .describe( "The durable fact or preference to remember, written as one concise sentence.", ), + target_id: tool.schema + .string() + .optional() + .describe("Stable memory entry id used by replace/remove."), }, async execute(args, context) { await initializePromise; @@ -47,30 +55,75 @@ export default tool({ detail: `unsupported scope: ${args.scope}; use exact keyword 'user' or 'workspace'`, }); } + if (sessionContext.allowLearningWrite === false && args.action !== "list") { + return JSON.stringify({ + ok: true, + kind: "memory", + decision: "rejected", + detail: "memory writes are disabled for this session", + }); + } const scopeKey = scope === "user" ? sessionContext.actorKey : sessionContext.projectKey; - const result = await memoryStore.upsert(scope, scopeKey, { - content: args.content, + if (args.action === "list") { + return JSON.stringify({ + ok: true, + kind: "memory", + decision: "accepted", + detail: "memory listed", + items: await memoryStore.list(scope, scopeKey), + target: scope, + }); + } + + if (args.action === "add") { + const result = await memoryStore.upsert(scope, scopeKey, { + content: args.content ?? "", sessionId: context.sessionID, source: "tool", traceId: sessionContext.traceId, - }); - - if (!result.entry) { + }); + if (!result.entry) { return JSON.stringify({ ok: true, kind: "memory", decision: "rejected", detail: "content rejected by persistence policy", }); - } - - return JSON.stringify({ + } + return JSON.stringify({ ok: true, kind: "memory", decision: result.changed ? "accepted" : "deduped", detail: result.changed ? "memory stored" : "memory already existed", + entry: result.entry, + target: scope, + }); + } + + if (args.action === "replace") { + const result = await memoryStore.replace(scope, scopeKey, args.target_id ?? "", { + content: args.content ?? "", + sessionId: context.sessionID, + source: "tool", + traceId: sessionContext.traceId, + }); + return JSON.stringify({ + ok: true, + kind: "memory", + decision: result.changed ? "accepted" : "rejected", + detail: result.detail, + target: scope, + }); + } + + const result = await memoryStore.remove(scope, scopeKey, args.target_id ?? ""); + return JSON.stringify({ + ok: true, + kind: "memory", + decision: result.changed ? "accepted" : "rejected", + detail: result.detail, target: scope, }); }, diff --git a/.opencode/tools/session_search.ts b/.opencode/tools/session_search.ts new file mode 100644 index 0000000..152804e --- /dev/null +++ b/.opencode/tools/session_search.ts @@ -0,0 +1,43 @@ +import { tool } from "@opencode-ai/plugin"; + +const internalBaseUrl = + process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8787"; +const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; + +export default tool({ + description: + "搜索当前用户和项目范围内的历史会话 transcript。适合回忆过去讨论过的案例、约束和结论,避免把一次性案例写入 memory。", + args: { + reason: tool.schema + .string() + .describe("Why prior session history is needed for the current request."), + query: tool.schema + .string() + .describe("What to search for in prior session history."), + max_results: tool.schema + .number() + .int() + .positive() + .optional() + .describe("Optional maximum number of hits to return."), + }, + async execute(args, context) { + const response = await fetch(`${internalBaseUrl}/internal/tools/session-search`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-agent-internal-token": internalToken, + }, + body: JSON.stringify({ + max_results: args.max_results, + query: args.query, + sessionId: context.sessionID, + }), + }); + const text = await response.text(); + if (!response.ok) { + throw new Error(text); + } + return text; + }, +}); diff --git a/.opencode/tools/skill_manager.ts b/.opencode/tools/skill_manager.ts index e81554a..a2d45d0 100644 --- a/.opencode/tools/skill_manager.ts +++ b/.opencode/tools/skill_manager.ts @@ -1,37 +1,51 @@ import { tool } from "@opencode-ai/plugin"; -import { join, posix } from "node:path"; -import { config } from "../../src/config.js"; +import { SkillStore } from "../../src/skills/store.js"; import { ToolSessionContextStore } from "../../src/session/toolContextStore.js"; -import { - atomicWriteFileWithHistory, - ensureDirectory, - readTextFile, -} from "../../src/utils/fileStore.js"; -import { sanitizePersistentLine } from "../../src/utils/persistencePolicy.js"; const toolContextStore = new ToolSessionContextStore(); const initializePromise = toolContextStore.initialize(); -const SKILLS_ROOT_DIR = ".opencode/skills"; -// learned skill 与正式技能树同路径组织,但历史版本单独落到 data/history/skills 下。 -const SKILLS_HISTORY_DIR = join(config.PERSISTENCE_HISTORY_DIR, "skills"); -const LEARNED_PATTERNS_MARKER = "## Learned Patterns"; -let writeQueue: Promise = Promise.resolve(); +const skillStore = new SkillStore(); export default tool({ description: - "将已验证、可复用、非敏感的 workflow 或方法模式写入指定的 .opencode/skills 目录,由 opencode 自动识别和加载。", + "维护已验证、可复用、非敏感的 workflow 或方法模式。支持 list、append_pattern、remove_pattern、write_reference、remove_reference。", args: { + action: tool.schema + .enum([ + "list", + "append_pattern", + "remove_pattern", + "write_reference", + "remove_reference", + ]) + .describe("Skill maintenance operation."), reason: tool.schema .string() .describe( - "The reusable workflow or method pattern to persist for future reuse, written as one concise sentence.", + "Why this skill maintenance action is justified for future reuse.", ), skill_path: tool.schema .string() .describe( "Target skill directory path relative to .opencode/skills, for example analytics/simulation-analysis/leakage or platform/governance-observability/meta.", ), + pattern: tool.schema + .string() + .optional() + .describe("Pattern text used by append_pattern."), + target_id: tool.schema + .string() + .optional() + .describe("Stable learned pattern id used by remove_pattern."), + file_path: tool.schema + .string() + .optional() + .describe("Reference file path under references/, such as references/bottleneck-notes.md."), + content: tool.schema + .string() + .optional() + .describe("Reference markdown body used by write_reference."), }, async execute(args, context) { await initializePromise; @@ -39,122 +53,53 @@ export default tool({ if (!sessionContext) { throw new Error(`session context not found for ${context.sessionID}`); } - const skillPath = normalizeSkillPath(args.skill_path); - if (!skillPath) { + if (sessionContext.allowLearningWrite === false && args.action !== "list") { return JSON.stringify({ ok: true, kind: "skill", decision: "rejected", - detail: - "invalid skill_path; expected a relative path under .opencode/skills", + detail: "skill writes are disabled for this session", }); } - const pattern = sanitizePersistentLine(args.reason, 320); - if (!pattern) { + if (args.action === "list") { + const result = await skillStore.list(args.skill_path); + if (!result) { + return JSON.stringify({ + ok: true, + kind: "skill", + decision: "rejected", + detail: + "invalid skill_path; expected a relative path under .opencode/skills", + }); + } return JSON.stringify({ ok: true, kind: "skill", - decision: "rejected", - detail: "reason rejected by persistence policy", + decision: "accepted", + detail: "skill listed", + ...result, }); } - const result = await appendLearnedSkillPattern(skillPath, pattern); + const result = + args.action === "append_pattern" + ? await skillStore.appendPattern(args.skill_path, args.pattern ?? "") + : args.action === "remove_pattern" + ? await skillStore.removePattern(args.skill_path, args.target_id ?? "") + : args.action === "write_reference" + ? await skillStore.writeReference( + args.skill_path, + args.file_path ?? "", + args.content ?? "", + ) + : await skillStore.removeReference(args.skill_path, args.file_path ?? ""); + return JSON.stringify({ ok: true, kind: "skill", - decision: result.changed ? "accepted" : "deduped", - detail: result.changed ? "skill file updated" : "pattern already existed", + decision: result.changed ? "accepted" : "rejected", + detail: result.detail, target: result.target, }); }, }); - -const appendLearnedSkillPattern = async ( - skillPath: string, - pattern: string, -) => { - return serializeWrite(async () => { - const target = join(SKILLS_ROOT_DIR, skillPath, "SKILL.md"); - const current = - (await readTextFile(target)) ?? defaultLearnedSkill(skillPath); - const existingPatterns = extractLearnedPatterns(current); - if (existingPatterns.includes(pattern)) { - return { changed: false, target }; - } - - const next = current.includes(LEARNED_PATTERNS_MARKER) - ? current.replace( - LEARNED_PATTERNS_MARKER, - `${LEARNED_PATTERNS_MARKER}\n- ${pattern}`, - ) - : `${current.trimEnd()}\n\n${LEARNED_PATTERNS_MARKER}\n- ${pattern}\n`; - - await ensureDirectory(join(SKILLS_ROOT_DIR, skillPath)); - // 追加 learned pattern 前先备份旧版 SKILL.md,避免共享技能被异常写坏。 - await atomicWriteFileWithHistory(target, next, { - historyDir: SKILLS_HISTORY_DIR, - rootDir: SKILLS_ROOT_DIR, - }); - return { changed: true, target }; - }); -}; - -const serializeWrite = async (task: () => Promise) => { - const run = writeQueue.catch(() => undefined).then(task); - writeQueue = run.then( - () => undefined, - () => undefined, - ); - return run; -}; - -const defaultLearnedSkill = (skillPath: string) => `--- -name: tjwater-action-${toSkillName(skillPath)} -description: 由 skill_manager 在线追加的高置信度可复用 workflow。 -version: 1.0.0 ---- - -# learned skill - -## 简介 - -记录由 \`skill_manager\` 在线追加的高置信度 workflow 模式。 - -## Learned Patterns -`; - -const normalizeSkillPath = (rawSkillPath: string) => { - const normalized = posix.normalize( - rawSkillPath.trim().replace(/^\/+|\/+$/g, ""), - ); - if (!normalized || normalized === "." || normalized.startsWith("..")) { - return null; - } - if (normalized === "SKILL.md" || normalized.endsWith("/SKILL.md")) { - return null; - } - if (!/^[a-z0-9._/-]+$/i.test(normalized)) { - return null; - } - return normalized; -}; - -const toSkillName = (skillPath: string) => - skillPath - .split("/") - .filter(Boolean) - .join("-") - .replace(/[^a-z0-9._-]+/gi, "-") - .replace(/^-+|-+$/g, "") - .slice(0, 120) || "generated-skill"; - -const extractLearnedPatterns = (content: string) => { - if (!content.includes(LEARNED_PATTERNS_MARKER)) { - return []; - } - return (content.split(LEARNED_PATTERNS_MARKER)[1] ?? "") - .split("\n") - .filter((line) => line.trim().startsWith("- ")) - .map((line) => line.trim().slice(2)); -}; diff --git a/src/audit/learningAudit.ts b/src/audit/learningAudit.ts new file mode 100644 index 0000000..eb9464f --- /dev/null +++ b/src/audit/learningAudit.ts @@ -0,0 +1,34 @@ +import { appendFile, mkdir } from "node:fs/promises"; +import { dirname } from "node:path"; + +import { config } from "../config.js"; + +export type LearningAuditEntry = { + action: string; + detail?: string; + outcome: "accepted" | "error" | "rejected" | "skipped"; + projectId?: string; + proposal?: Record; + sessionId: string; + traceId?: string; +}; + +let logDirectoryReadyPromise: Promise | null = null; + +const ensureLogDirectory = async () => { + if (!logDirectoryReadyPromise) { + logDirectoryReadyPromise = mkdir(dirname(config.LEARNING_AUDIT_LOG_PATH), { + recursive: true, + }).then(() => undefined); + } + await logDirectoryReadyPromise; +}; + +export const writeLearningAuditLog = async (entry: LearningAuditEntry) => { + await ensureLogDirectory(); + const line = JSON.stringify({ + timestamp: new Date().toISOString(), + ...entry, + }); + await appendFile(config.LEARNING_AUDIT_LOG_PATH, `${line}\n`, "utf8"); +}; diff --git a/src/chat/sessionBridge.ts b/src/chat/sessionBridge.ts index 6d14380..3527764 100644 --- a/src/chat/sessionBridge.ts +++ b/src/chat/sessionBridge.ts @@ -52,7 +52,9 @@ export class ChatSessionBridge { this.sessionContexts.set(current.sessionId, requestContext); await this.toolContextStore.write({ actorKey: requestContext.actorKey, + allowLearningWrite: true, clientSessionId: requestContext.clientSessionId, + learningMode: "interactive", projectId: requestContext.projectId, projectKey: requestContext.projectKey, sessionId: current.sessionId, @@ -79,7 +81,9 @@ export class ChatSessionBridge { this.sessionContexts.set(binding.sessionId, requestContext); await this.toolContextStore.write({ actorKey: requestContext.actorKey, + allowLearningWrite: true, clientSessionId: requestContext.clientSessionId, + learningMode: "interactive", projectId: requestContext.projectId, projectKey: requestContext.projectKey, sessionId: binding.sessionId, @@ -148,7 +152,9 @@ export class ChatSessionBridge { this.sessionContexts.set(binding.sessionId, requestContext); await this.toolContextStore.write({ actorKey: requestContext.actorKey, + allowLearningWrite: true, clientSessionId: requestContext.clientSessionId, + learningMode: "interactive", projectId: requestContext.projectId, projectKey: requestContext.projectKey, sessionId: binding.sessionId, @@ -189,7 +195,9 @@ export class ChatSessionBridge { this.sessionContexts.set(binding.sessionId, nextRequestContext); await this.toolContextStore.write({ actorKey: nextRequestContext.actorKey, + allowLearningWrite: true, clientSessionId: nextRequestContext.clientSessionId, + learningMode: "interactive", projectId: nextRequestContext.projectId, projectKey: nextRequestContext.projectKey, sessionId: binding.sessionId, @@ -215,7 +223,9 @@ export class ChatSessionBridge { this.sessionContexts.set(binding.sessionId, nextRequestContext); await this.toolContextStore.write({ actorKey: nextRequestContext.actorKey, + allowLearningWrite: true, clientSessionId: nextRequestContext.clientSessionId, + learningMode: "interactive", projectId: nextRequestContext.projectId, projectKey: nextRequestContext.projectKey, sessionId: binding.sessionId, @@ -243,7 +253,9 @@ export class ChatSessionBridge { this.sessionContexts.set(binding.sessionId, nextRequestContext); await this.toolContextStore.write({ actorKey: nextRequestContext.actorKey, + allowLearningWrite: true, clientSessionId: nextRequestContext.clientSessionId, + learningMode: "interactive", projectId: nextRequestContext.projectId, projectKey: nextRequestContext.projectKey, sessionId: binding.sessionId, diff --git a/src/config.ts b/src/config.ts index 397ebe5..5621bb6 100644 --- a/src/config.ts +++ b/src/config.ts @@ -52,6 +52,32 @@ const envSchema = z.object({ PERSISTENCE_HISTORY_DIR: z.string().default("./data/history"), // 注入到 prompt 的 memory 快照最大字符数,避免上下文过大。 MEMORY_MAX_PROMPT_CHARS: z.coerce.number().int().positive().default(1800), + // session transcript 持久化目录。 + SESSION_HISTORY_STORAGE_DIR: z.string().default("./data/session-history"), + // 每个会话最多保留多少轮 transcript,超过后裁剪旧记录。 + SESSION_HISTORY_MAX_TURNS_PER_SESSION: z.coerce + .number() + .int() + .positive() + .default(120), + // session_search 工具默认返回的最大命中数。 + SESSION_SEARCH_MAX_RESULTS: z.coerce.number().int().positive().default(8), + // session_search 查询文本最大长度。 + SESSION_SEARCH_MAX_QUERY_CHARS: z.coerce.number().int().positive().default(240), + // learning review 会话状态目录。 + LEARNING_STATE_STORAGE_DIR: z.string().default("./data/learning-state"), + // learning audit 日志路径。 + LEARNING_AUDIT_LOG_PATH: z + .string() + .default("./logs/learning-audit.log"), + // learning gate 的最小 turn 冷却间隔;这是运行时节流,不参与内容判断。 + LEARNING_GATE_TURN_COOLDOWN: z.coerce.number().int().positive().default(2), + // gate 结果被提升为 review 前的最低置信度。 + LEARNING_GATE_MIN_CONFIDENCE: z.coerce.number().min(0).max(1).default(0.65), + // review prompt 最多携带多少轮最近 transcript。 + LEARNING_REVIEW_MAX_RECENT_TURNS: z.coerce.number().int().positive().default(8), + // review proposal 的最低置信度阈值。 + LEARNING_MIN_PROPOSAL_CONFIDENCE: z.coerce.number().min(0).max(1).default(0.8), // result_ref 持久化存储目录。 RESULT_REF_STORAGE_DIR: z.string().default("./data/result-refs"), // result_ref 保留时长(小时)。 diff --git a/src/history/store.ts b/src/history/store.ts new file mode 100644 index 0000000..cd7525b --- /dev/null +++ b/src/history/store.ts @@ -0,0 +1,213 @@ +import { join } from "node:path"; + +import { config } from "../config.js"; +import { + atomicWriteJson, + ensureDirectory, + listJsonFiles, + readJsonFile, + toStableId, +} from "../utils/fileStore.js"; +import { sanitizePersistentDocument } from "../utils/persistencePolicy.js"; + +export type SessionTurnRecord = { + id: string; + assistantMessage: string; + timestamp: string; + toolCallCount: number; + userMessage: string; +}; + +type SessionTranscriptRecord = { + actorKey: string; + clientSessionId?: string; + projectKey: string; + sessionId: string; + turns: SessionTurnRecord[]; + updatedAt: string; +}; + +export type SessionSearchHit = { + matchedField: "assistant" | "user"; + score: number; + sessionId: string; + snippet: string; + timestamp: string; + turnId: string; +}; + +type SessionHistoryContext = { + actorKey: string; + clientSessionId?: string; + projectKey: string; + sessionId: string; +}; + +export class SessionHistoryStore { + private readonly writeQueues = new Map>(); + + constructor(private readonly baseDir = config.SESSION_HISTORY_STORAGE_DIR) {} + + async initialize() { + await ensureDirectory(this.baseDir); + } + + async appendTurn( + context: SessionHistoryContext, + turn: { + assistantMessage: string; + toolCallCount: number; + userMessage: string; + }, + ) { + const key = this.filePath(context); + return this.serializeWrite(key, async () => { + const transcript = (await this.readTranscript(context)) ?? { + actorKey: context.actorKey, + clientSessionId: context.clientSessionId, + projectKey: context.projectKey, + sessionId: context.sessionId, + turns: [], + updatedAt: new Date().toISOString(), + }; + const userMessage = sanitizePersistentDocument(turn.userMessage, 4000); + const assistantMessage = sanitizePersistentDocument(turn.assistantMessage, 4000); + if (!userMessage || !assistantMessage) { + return transcript; + } + + const timestamp = new Date().toISOString(); + const record: SessionTurnRecord = { + id: toStableId(context.sessionId, timestamp, userMessage, assistantMessage), + assistantMessage, + timestamp, + toolCallCount: Math.max(0, turn.toolCallCount), + userMessage, + }; + transcript.clientSessionId = context.clientSessionId ?? transcript.clientSessionId; + transcript.turns.push(record); + if (transcript.turns.length > config.SESSION_HISTORY_MAX_TURNS_PER_SESSION) { + transcript.turns = transcript.turns.slice( + transcript.turns.length - config.SESSION_HISTORY_MAX_TURNS_PER_SESSION, + ); + } + transcript.updatedAt = timestamp; + await atomicWriteJson(key, transcript); + return transcript; + }); + } + + async getRecentTurns( + context: SessionHistoryContext, + limit: number, + ): Promise { + const transcript = await this.readTranscript(context); + if (!transcript) { + return []; + } + return transcript.turns.slice(-Math.max(1, limit)); + } + + async search( + context: Pick, + query: string, + maxResults = config.SESSION_SEARCH_MAX_RESULTS, + ): Promise { + const normalizedQuery = query.trim().toLowerCase().slice(0, config.SESSION_SEARCH_MAX_QUERY_CHARS); + if (!normalizedQuery) { + return []; + } + const queryTokens = normalizedQuery.split(/\s+/).filter(Boolean); + const hits: SessionSearchHit[] = []; + const files = await listJsonFiles(this.baseDir); + for (const file of files) { + const transcript = await readJsonFile(file); + if (!transcript) { + continue; + } + if ( + transcript.actorKey !== context.actorKey || + transcript.projectKey !== context.projectKey + ) { + continue; + } + for (const turn of transcript.turns) { + const candidates: Array<["user" | "assistant", string]> = [ + ["user", turn.userMessage], + ["assistant", turn.assistantMessage], + ]; + for (const [matchedField, text] of candidates) { + const score = scoreText(text, normalizedQuery, queryTokens); + if (score <= 0) { + continue; + } + hits.push({ + matchedField, + score, + sessionId: transcript.sessionId, + snippet: buildSnippet(text, normalizedQuery), + timestamp: turn.timestamp, + turnId: turn.id, + }); + } + } + } + return hits.sort((a, b) => b.score - a.score).slice(0, Math.max(1, maxResults)); + } + + private async readTranscript(context: SessionHistoryContext) { + return await readJsonFile(this.filePath(context)); + } + + private filePath(context: SessionHistoryContext) { + return join( + this.baseDir, + `${context.actorKey}__${context.projectKey}__${context.sessionId}.json`, + ); + } + + private async serializeWrite(key: string, task: () => Promise) { + const previous = this.writeQueues.get(key) ?? Promise.resolve(); + const run = previous.catch(() => undefined).then(task); + const next = run.then( + () => undefined, + () => undefined, + ); + this.writeQueues.set(key, next); + try { + return await run; + } finally { + if (this.writeQueues.get(key) === next) { + this.writeQueues.delete(key); + } + } + } +} + +const scoreText = (text: string, query: string, queryTokens: string[]) => { + const normalized = text.toLowerCase(); + let score = 0; + if (normalized.includes(query)) { + score += Math.max(10, query.length); + } + for (const token of queryTokens) { + if (token.length >= 2 && normalized.includes(token)) { + score += 1; + } + } + return score; +}; + +const buildSnippet = (text: string, query: string) => { + const compact = text.replace(/\s+/g, " ").trim(); + const idx = compact.toLowerCase().indexOf(query); + if (idx === -1) { + return compact.length > 180 ? `${compact.slice(0, 177)}...` : compact; + } + const start = Math.max(0, idx - 60); + const end = Math.min(compact.length, idx + query.length + 100); + const snippet = compact.slice(start, end).trim(); + const prefix = start > 0 ? "..." : ""; + const suffix = end < compact.length ? "..." : ""; + return `${prefix}${snippet}${suffix}`; +}; diff --git a/src/learning/orchestrator.ts b/src/learning/orchestrator.ts new file mode 100644 index 0000000..f955b4a --- /dev/null +++ b/src/learning/orchestrator.ts @@ -0,0 +1,581 @@ +import { z } from "zod"; + +import { writeLearningAuditLog } from "../audit/learningAudit.js"; +import { type ChatRequestContext } from "../chat/sessionBridge.js"; +import { config } from "../config.js"; +import { type SessionTurnRecord, SessionHistoryStore } from "../history/store.js"; +import { logger } from "../logger.js"; +import { LearningStateStore } from "./stateStore.js"; +import { MemoryStore, type MemoryScope } from "../memory/store.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; +import { SkillStore } from "../skills/store.js"; +import { ToolSessionContextStore } from "../session/toolContextStore.js"; +import { + sanitizePersistentDocument, + sanitizePersistentLine, +} from "../utils/persistencePolicy.js"; + +const gateResultSchema = z.object({ + confidence: z.number().min(0).max(1).default(0), + focus: z.enum(["memory", "skill", "both", "none"]).default("none"), + reason: z.string().default(""), + should_review: z.boolean().default(false), +}); + +const reviewResultSchema = z.object({ + memories: z + .array( + z.object({ + action: z.enum(["add", "replace", "remove"]), + confidence: z.number().min(0).max(1), + content: z.string().optional(), + evidence: z.string().default(""), + scope: z.enum(["user", "workspace"]), + target_id: z.string().optional(), + }), + ) + .default([]), + skills: z + .array( + z.object({ + action: z.enum(["append_pattern", "remove_pattern", "write_reference"]), + confidence: z.number().min(0).max(1), + content: z.string().optional(), + evidence: z.string().default(""), + file_path: z.string().optional(), + pattern: z.string().optional(), + skill_path: z.string(), + target_id: z.string().optional(), + }), + ) + .default([]), + summary: z.string().default(""), +}); + +type GateResult = z.infer; +type ReviewResult = z.infer; + +type SupportedModel = "deepseek/deepseek-v4-flash" | "deepseek/deepseek-v4-pro"; + +type TurnReviewInput = { + assistantMessage: string; + model?: SupportedModel; + requestContext: ChatRequestContext; + sessionId: string; + toolCallCount: number; + userMessage: string; +}; + +export class LearningOrchestrator { + private readonly activeReviews = new Set(); + private readonly learningStateStore = new LearningStateStore(); + private readonly skillStore = new SkillStore(); + private readonly toolContextStore = new ToolSessionContextStore(); + + constructor( + private readonly runtime: OpencodeRuntimeAdapter, + private readonly memoryStore: MemoryStore, + private readonly historyStore: SessionHistoryStore, + ) {} + + async initialize() { + await Promise.all([ + this.learningStateStore.initialize(), + this.toolContextStore.initialize(), + ]); + } + + async onTurnCompleted(input: TurnReviewInput) { + const transcript = await this.historyStore.appendTurn( + { + actorKey: input.requestContext.actorKey, + clientSessionId: input.requestContext.clientSessionId, + projectKey: input.requestContext.projectKey, + sessionId: input.sessionId, + }, + { + assistantMessage: input.assistantMessage, + toolCallCount: input.toolCallCount, + userMessage: input.userMessage, + }, + ); + const turnCount = transcript.turns.length; + if (this.activeReviews.has(input.sessionId)) { + return; + } + this.activeReviews.add(input.sessionId); + try { + const state = await this.learningStateStore.read(input.sessionId); + const turnsSinceGate = Math.max(0, turnCount - state.lastGatedTurn); + if (turnsSinceGate < config.LEARNING_GATE_TURN_COOLDOWN || state.pendingReview) { + this.activeReviews.delete(input.sessionId); + return; + } + await this.learningStateStore.markPending(input.sessionId, true); + } catch (error) { + this.activeReviews.delete(input.sessionId); + throw error; + } + queueMicrotask(() => { + void this.runGate({ + input, + recentTurns: transcript.turns.slice(-config.LEARNING_REVIEW_MAX_RECENT_TURNS), + turnCount, + }).finally(() => { + this.activeReviews.delete(input.sessionId); + }); + }); + } + + private async runGate({ + input, + recentTurns, + turnCount, + }: { + input: TurnReviewInput; + recentTurns: SessionTurnRecord[]; + turnCount: number; + }) { + let gateSessionId: string | null = null; + try { + const gateSession = await this.runtime.createSession( + `learning-gate-${input.requestContext.clientSessionId}`, + ); + gateSessionId = gateSession.id; + await this.toolContextStore.write({ + actorKey: input.requestContext.actorKey, + allowLearningWrite: false, + clientSessionId: `gate-${input.requestContext.clientSessionId}`, + learningMode: "review", + projectId: input.requestContext.projectId, + projectKey: input.requestContext.projectKey, + sessionId: gateSession.id, + traceId: input.requestContext.traceId, + }); + await this.runtime.prompt( + gateSession.id, + buildGatePrompt({ recentTurns }), + GATE_MODEL, + ); + const messages = await this.runtime.messages(gateSession.id, 20); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const gateText = collectTextContent(assistantMessage?.parts ?? []); + const gate = parseGateResult(gateText); + if (!gate) { + await this.learningStateStore.completeGate(input.sessionId, turnCount); + await writeLearningAuditLog({ + action: "review-gate", + detail: "gate result was not valid JSON", + outcome: "error", + projectId: input.requestContext.projectId, + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return; + } + const shouldPromote = + gate.should_review && + gate.confidence >= config.LEARNING_GATE_MIN_CONFIDENCE && + gate.focus !== "none"; + await writeLearningAuditLog({ + action: "review-gate", + detail: sanitizeAuditDetail(gate.reason), + outcome: shouldPromote ? "accepted" : "skipped", + projectId: input.requestContext.projectId, + proposal: sanitizeGateForAudit(gate), + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + if (!shouldPromote) { + await this.learningStateStore.completeGate(input.sessionId, turnCount); + return; + } + await this.runReview({ + focus: gate.focus, + input, + recentTurns, + turnCount, + }); + } catch (error) { + await this.learningStateStore.markPending(input.sessionId, false); + logger.warn({ err: error, sessionId: input.sessionId }, "learning gate failed"); + await writeLearningAuditLog({ + action: "review-gate", + detail: sanitizeAuditDetail(error instanceof Error ? error.message : String(error)), + outcome: "error", + projectId: input.requestContext.projectId, + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + } finally { + if (gateSessionId) { + await this.toolContextStore.remove(gateSessionId).catch(() => undefined); + await this.runtime.abortSession(gateSessionId).catch(() => undefined); + } + } + } + + private async runReview({ + focus, + input, + recentTurns, + turnCount, + }: { + focus: GateResult["focus"]; + input: TurnReviewInput; + recentTurns: SessionTurnRecord[]; + turnCount: number; + }) { + const reviewSession = await this.runtime.createSession( + `learning-review-${input.requestContext.clientSessionId}`, + ); + await this.toolContextStore.write({ + actorKey: input.requestContext.actorKey, + allowLearningWrite: false, + clientSessionId: `review-${input.requestContext.clientSessionId}`, + learningMode: "review", + projectId: input.requestContext.projectId, + projectKey: input.requestContext.projectKey, + sessionId: reviewSession.id, + traceId: input.requestContext.traceId, + }); + try { + await this.runtime.prompt( + reviewSession.id, + buildReviewPrompt({ focus, recentTurns }), + toRuntimeModel(input.model), + ); + const messages = await this.runtime.messages(reviewSession.id, 20); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const reviewText = collectTextContent(assistantMessage?.parts ?? []); + const parsed = parseReviewResult(reviewText); + if (!parsed) { + await this.learningStateStore.completeGate(input.sessionId, turnCount); + await writeLearningAuditLog({ + action: "review-parse", + detail: "review result was not valid JSON", + outcome: "error", + projectId: input.requestContext.projectId, + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return; + } + await this.applyReviewResult(input, parsed, turnCount); + await this.learningStateStore.completeReview(input.sessionId, turnCount); + } catch (error) { + await this.learningStateStore.markPending(input.sessionId, false); + logger.warn({ err: error, sessionId: input.sessionId }, "learning review failed"); + await writeLearningAuditLog({ + action: "review-run", + detail: sanitizeAuditDetail(error instanceof Error ? error.message : String(error)), + outcome: "error", + projectId: input.requestContext.projectId, + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + } finally { + await this.toolContextStore.remove(reviewSession.id).catch(() => undefined); + await this.runtime.abortSession(reviewSession.id).catch(() => undefined); + } + } + + private async applyReviewResult( + input: TurnReviewInput, + result: ReviewResult, + turnCount: number, + ) { + const threshold = config.LEARNING_MIN_PROPOSAL_CONFIDENCE; + let accepted = 0; + for (const proposal of result.memories) { + const outcome = await this.applyMemoryProposal(input, proposal, threshold); + accepted += outcome ? 1 : 0; + } + for (const proposal of result.skills) { + const outcome = await this.applySkillProposal(input, proposal, threshold); + accepted += outcome ? 1 : 0; + } + await writeLearningAuditLog({ + action: "review-summary", + detail: sanitizeAuditDetail(result.summary), + outcome: accepted > 0 ? "accepted" : "skipped", + projectId: input.requestContext.projectId, + proposal: { + accepted, + memories: result.memories.length, + skills: result.skills.length, + turnCount, + }, + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + } + + private async applyMemoryProposal( + input: TurnReviewInput, + proposal: ReviewResult["memories"][number], + threshold: number, + ) { + if (proposal.confidence < threshold) { + await writeLearningAuditLog({ + action: `memory-${proposal.action}`, + detail: "proposal below confidence threshold", + outcome: "skipped", + projectId: input.requestContext.projectId, + proposal: sanitizeMemoryProposalForAudit(proposal), + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return false; + } + const scopeKey = + proposal.scope === "user" + ? input.requestContext.actorKey + : input.requestContext.projectKey; + const draft = { + content: proposal.content ?? "", + sessionId: input.sessionId, + source: "review" as const, + traceId: input.requestContext.traceId, + }; + const result = + proposal.action === "add" + ? await this.memoryStore.upsert(proposal.scope as MemoryScope, scopeKey, draft) + : proposal.action === "replace" + ? await this.memoryStore.replace( + proposal.scope as MemoryScope, + scopeKey, + proposal.target_id ?? "", + draft, + ) + : await this.memoryStore.remove( + proposal.scope as MemoryScope, + scopeKey, + proposal.target_id ?? "", + ); + const accepted = + "entry" in result ? Boolean(result.entry) : Boolean(result.changed); + await writeLearningAuditLog({ + action: `memory-${proposal.action}`, + detail: sanitizeAuditDetail( + "detail" in result ? result.detail : result.changed ? "memory stored" : "memory deduped", + ), + outcome: accepted ? "accepted" : "rejected", + projectId: input.requestContext.projectId, + proposal: sanitizeMemoryProposalForAudit(proposal), + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return accepted; + } + + private async applySkillProposal( + input: TurnReviewInput, + proposal: ReviewResult["skills"][number], + threshold: number, + ) { + if (proposal.confidence < threshold) { + await writeLearningAuditLog({ + action: `skill-${proposal.action}`, + detail: "proposal below confidence threshold", + outcome: "skipped", + projectId: input.requestContext.projectId, + proposal: sanitizeSkillProposalForAudit(proposal), + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return false; + } + const result = + proposal.action === "append_pattern" + ? await this.skillStore.appendPattern(proposal.skill_path, proposal.pattern ?? "") + : proposal.action === "remove_pattern" + ? await this.skillStore.removePattern( + proposal.skill_path, + proposal.target_id ?? "", + ) + : await this.skillStore.writeReference( + proposal.skill_path, + proposal.file_path ?? "", + proposal.content ?? "", + ); + await writeLearningAuditLog({ + action: `skill-${proposal.action}`, + detail: sanitizeAuditDetail(result.detail), + outcome: result.changed ? "accepted" : "rejected", + projectId: input.requestContext.projectId, + proposal: sanitizeSkillProposalForAudit(proposal), + sessionId: input.sessionId, + traceId: input.requestContext.traceId, + }); + return result.changed; + } +} + +const buildGatePrompt = ({ recentTurns }: { recentTurns: SessionTurnRecord[] }) => { + const transcript = recentTurns + .map( + (turn, index) => + `Turn ${index + 1}\nUser: ${turn.userMessage}\nAssistant: ${turn.assistantMessage}\nTool calls: ${turn.toolCallCount}`, + ) + .join("\n\n"); + return [ + "You are the learning gate for TJWaterAgent.", + "Do NOT call any tools. Return JSON only. Do NOT wrap in markdown fences.", + "Decide whether this recent conversation is worth a deeper learning review.", + "A review is warranted only when there is likely durable memory or reusable skill signal.", + "Ignore one-off cases, temporary outcomes, and task-local noise.", + "", + 'Return JSON schema: {"should_review":true|false,"reason":"string","confidence":0.0,"focus":"memory|skill|both|none"}', + "", + "Conversation transcript:", + transcript || "(empty)", + ].join("\n"); +}; + +const buildReviewPrompt = ({ + focus, + recentTurns, +}: { + focus: GateResult["focus"]; + recentTurns: SessionTurnRecord[]; +}) => { + const transcript = recentTurns + .map( + (turn, index) => + `Turn ${index + 1}\nUser: ${turn.userMessage}\nAssistant: ${turn.assistantMessage}\nTool calls: ${turn.toolCallCount}`, + ) + .join("\n\n"); + return [ + "You are doing an internal self-improvement review for TJWaterAgent.", + "Do NOT call any tools. Return JSON only. Do NOT wrap in markdown fences.", + `Focus: ${focus}`, + "Decide what durable lessons to keep from the conversation below.", + "", + "Memory rules:", + "- Keep only stable user preferences, durable constraints, or stable workspace facts.", + "- Use scope='user' for user preferences and constraints.", + "- Use scope='workspace' for project or environment facts.", + "- Do not store one-off task outcomes, temporary facts, or speculative conclusions.", + "", + "Skill rules:", + "- Save only reusable workflows, methods, or pitfalls that will help in future similar tasks.", + "- Prefer append_pattern for concise reusable lessons.", + "- Use write_reference only for compact durable supporting notes under references/*.md.", + "- Do not edit frontmatter or arbitrary sections.", + "", + "Output JSON schema:", + `{"summary":"string","memories":[{"action":"add|replace|remove","scope":"user|workspace","content":"string?","target_id":"string?","confidence":0.0,"evidence":"string"}],"skills":[{"action":"append_pattern|remove_pattern|write_reference","skill_path":"string","pattern":"string?","target_id":"string?","file_path":"references/example.md?","content":"string?","confidence":0.0,"evidence":"string"}]}`, + "", + "If nothing should be saved, return empty arrays.", + "", + "Conversation transcript:", + transcript || "(empty)", + ].join("\n"); +}; + +const parseGateResult = (text: string): GateResult | null => { + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i); + const candidate = fenced?.[1]?.trim() ?? trimmed; + try { + return gateResultSchema.parse(JSON.parse(candidate)); + } catch { + return null; + } +}; + +const parseReviewResult = (text: string): ReviewResult | null => { + const trimmed = text.trim(); + if (!trimmed) { + return null; + } + const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i); + const candidate = fenced?.[1]?.trim() ?? trimmed; + try { + return reviewResultSchema.parse(JSON.parse(candidate)); + } catch { + return null; + } +}; + +const collectTextContent = ( + parts: Array<{ type: string; text?: string }>, +) => + parts + .filter((part): part is { type: "text"; text: string } => part.type === "text") + .map((part) => part.text) + .join(""); + +const toRuntimeModel = (model?: SupportedModel) => { + if (!model) { + return undefined; + } + const [providerID, modelID] = model.split("/"); + if (!providerID || !modelID) { + return undefined; + } + return { + modelID, + providerID, + }; +}; + +const GATE_MODEL = { + modelID: "deepseek-v4-flash", + providerID: "deepseek", +} as const; + +const REDACTED_AUDIT_FIELD = "[redacted by persistence policy]"; + +const sanitizeAuditDetail = (detail?: string) => { + if (!detail) { + return undefined; + } + return sanitizePersistentDocument(detail, 1000) || REDACTED_AUDIT_FIELD; +}; + +const sanitizeAuditLine = (value?: string, maxLength = 320) => { + if (value === undefined) { + return undefined; + } + return sanitizePersistentLine(value, maxLength) || REDACTED_AUDIT_FIELD; +}; + +const sanitizeGateForAudit = (gate: GateResult): Record => ({ + confidence: gate.confidence, + focus: gate.focus, + reason: sanitizeAuditLine(gate.reason), + should_review: gate.should_review, +}); + +const sanitizeMemoryProposalForAudit = ( + proposal: ReviewResult["memories"][number], +): Record => ({ + action: proposal.action, + confidence: proposal.confidence, + content: sanitizeAuditLine(proposal.content), + evidence: sanitizeAuditLine(proposal.evidence), + scope: proposal.scope, + target_id: sanitizeAuditLine(proposal.target_id, 120), +}); + +const sanitizeSkillProposalForAudit = ( + proposal: ReviewResult["skills"][number], +): Record => ({ + action: proposal.action, + confidence: proposal.confidence, + content: sanitizeAuditDetail(proposal.content), + evidence: sanitizeAuditLine(proposal.evidence), + file_path: sanitizeAuditLine(proposal.file_path, 200), + pattern: sanitizeAuditLine(proposal.pattern), + skill_path: sanitizeAuditLine(proposal.skill_path, 200), + target_id: sanitizeAuditLine(proposal.target_id, 120), +}); diff --git a/src/learning/stateStore.ts b/src/learning/stateStore.ts new file mode 100644 index 0000000..33e032b --- /dev/null +++ b/src/learning/stateStore.ts @@ -0,0 +1,76 @@ +import { join } from "node:path"; + +import { config } from "../config.js"; +import { + atomicWriteJson, + ensureDirectory, + readJsonFile, +} from "../utils/fileStore.js"; + +export type LearningSessionState = { + lastGatedTurn: number; + lastReviewedTurn: number; + pendingReview: boolean; + sessionId: string; + updatedAt: string; +}; + +export class LearningStateStore { + constructor(private readonly baseDir = config.LEARNING_STATE_STORAGE_DIR) {} + + async initialize() { + await ensureDirectory(this.baseDir); + } + + async read(sessionId: string): Promise { + const existing = await readJsonFile(this.filePath(sessionId)); + if (existing) { + return existing; + } + return { + lastGatedTurn: 0, + lastReviewedTurn: 0, + pendingReview: false, + sessionId, + updatedAt: new Date(0).toISOString(), + }; + } + + async write(state: LearningSessionState) { + await atomicWriteJson(this.filePath(state.sessionId), { + ...state, + updatedAt: new Date().toISOString(), + }); + } + + async markPending(sessionId: string, pendingReview: boolean) { + const current = await this.read(sessionId); + await this.write({ + ...current, + pendingReview, + }); + } + + async completeReview(sessionId: string, reviewedTurnCount: number) { + const current = await this.read(sessionId); + await this.write({ + ...current, + lastGatedTurn: Math.max(current.lastGatedTurn, reviewedTurnCount), + lastReviewedTurn: reviewedTurnCount, + pendingReview: false, + }); + } + + async completeGate(sessionId: string, gatedTurnCount: number) { + const current = await this.read(sessionId); + await this.write({ + ...current, + lastGatedTurn: gatedTurnCount, + pendingReview: false, + }); + } + + private filePath(sessionId: string) { + return join(this.baseDir, `${sessionId}.json`); + } +} diff --git a/src/memory/store.ts b/src/memory/store.ts index 0b0f115..c8771fe 100644 --- a/src/memory/store.ts +++ b/src/memory/store.ts @@ -6,6 +6,7 @@ import { atomicWriteFileWithHistory, ensureDirectory, readTextFile, + toStableId, } from "../utils/fileStore.js"; export type MemoryScope = "user" | "workspace"; @@ -13,6 +14,7 @@ export type MemoryEntrySource = "review" | "tool"; export type MemoryEntry = { content: string; + id: string; }; export type MemoryDraft = { @@ -64,7 +66,10 @@ export class MemoryStore { return { changed: false, entry: existing }; } - const entry: MemoryEntry = { content }; + const entry: MemoryEntry = { + content, + id: toStableId(scope, key, content.toLowerCase()), + }; entries.unshift(entry); // 每次覆盖 memory 文件前先保留上一版,写入失败时由底层工具恢复。 await atomicWriteFileWithHistory( @@ -79,6 +84,62 @@ export class MemoryStore { }); } + async list(scope: MemoryScope, key: string) { + return await this.readEntries(scope, key); + } + + async replace(scope: MemoryScope, key: string, targetId: string, draft: MemoryDraft) { + return this.serializeWrite(async () => { + const content = normalizeMemoryContent(draft.content); + if (!content) { + return { changed: false, detail: "content rejected by persistence policy" }; + } + const entries = await this.readEntries(scope, key); + const index = entries.findIndex((entry) => entry.id === targetId.trim()); + if (index === -1) { + return { changed: false, detail: "memory entry not found" }; + } + const duplicate = entries.find( + (entry, currentIndex) => currentIndex !== index && entry.content === content, + ); + if (duplicate) { + return { changed: false, detail: "replacement would duplicate an existing memory" }; + } + entries[index] = { + content, + id: entries[index]?.id ?? toStableId(scope, key, content.toLowerCase()), + }; + await atomicWriteFileWithHistory( + this.filePath(scope, key), + renderMemoryMarkdown(scope, entries), + { + historyDir: this.historyDir, + rootDir: this.baseDir, + }, + ); + return { changed: true, detail: "memory replaced" }; + }); + } + + async remove(scope: MemoryScope, key: string, targetId: string) { + return this.serializeWrite(async () => { + const entries = await this.readEntries(scope, key); + const next = entries.filter((entry) => entry.id !== targetId.trim()); + if (next.length === entries.length) { + return { changed: false, detail: "memory entry not found" }; + } + await atomicWriteFileWithHistory( + this.filePath(scope, key), + renderMemoryMarkdown(scope, next), + { + historyDir: this.historyDir, + rootDir: this.baseDir, + }, + ); + return { changed: true, detail: "memory removed" }; + }); + } + async buildPromptSnapshot(context: MemoryContext) { const [userMemory, workspaceMemory] = await Promise.all([ this.readEntries("user", context.actorKey), @@ -158,11 +219,25 @@ const parseMemoryMarkdown = (content: string): MemoryEntry[] => .split("\n") .map((line) => line.trim()) .filter((line) => line.startsWith("- ")) - .map((line) => ({ content: normalizeMemoryContent(line.slice(2)) })) + .map((line) => line.slice(2).trim()) + .map((line) => { + const match = line.match(/^\[([a-z0-9]{8,})\]\s+(.*)$/i); + if (match) { + return { + content: normalizeMemoryContent(match[2]), + id: match[1], + }; + } + const normalized = normalizeMemoryContent(line); + return { + content: normalized, + id: normalized ? toStableId("memory-entry", normalized.toLowerCase()) : "", + }; + }) .filter((entry) => entry.content); const renderMemoryMarkdown = (scope: MemoryScope, entries: MemoryEntry[]) => { const title = scope === "user" ? "# User Memory" : "# Workspace Memory"; - const bullets = entries.map((entry) => `- ${entry.content}`); + const bullets = entries.map((entry) => `- [${entry.id}] ${entry.content}`); return [title, "", ...bullets, ""].join("\n"); }; diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 60b1931..13999e9 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -2,6 +2,7 @@ import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; import { Router } from "express"; import { z } from "zod"; +import { type LearningOrchestrator } from "../learning/orchestrator.js"; import { logger } from "../logger.js"; import { MemoryStore } from "../memory/store.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; @@ -34,6 +35,7 @@ export const buildChatRouter = ( sessionBridge: ChatSessionBridge, runtime: OpencodeRuntimeAdapter, memoryStore: MemoryStore, + learningOrchestrator: LearningOrchestrator, ) => { const chatRouter = Router(); @@ -229,6 +231,11 @@ export const buildChatRouter = ( }); if (!streamResult.aborted && !streamResult.failed) { + const messages = await runtime.messages(binding.sessionId, 60); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const assistantText = collectTextContent(assistantMessage?.parts ?? []); const existingSessionTitle = sessionBridge.getSessionTitle(binding.sessionId); let sessionTitle = existingSessionTitle; const shouldGenerateTitle = @@ -251,6 +258,21 @@ export const buildChatRouter = ( ); } } + if (assistantText) { + void learningOrchestrator.onTurnCompleted({ + assistantMessage: assistantText, + model: parsed.data.model, + requestContext, + sessionId: binding.sessionId, + toolCallCount: streamResult.toolCallCount, + userMessage: parsed.data.message, + }).catch((error) => { + logger.warn( + { err: error, sessionId: binding.sessionId }, + "post-turn learning failed", + ); + }); + } } } finally { streamClosed = true; @@ -385,7 +407,11 @@ const streamPromptResponse = async ({ projectId, signal, write, -}: StreamPromptOptions): Promise<{ aborted: boolean; failed: boolean }> => { +}: StreamPromptOptions): Promise<{ + aborted: boolean; + failed: boolean; + toolCallCount: number; +}> => { const eventStream = await runtime.subscribeEvents(); const iterator = eventStream[Symbol.asyncIterator](); const requestStartedAt = Date.now(); @@ -396,6 +422,7 @@ const streamPromptResponse = async ({ const pendingPartTextDeltas = new Map(); const reasoningDeltas = new Map(); let emittedText = false; + let toolCallCount = 0; let done = false; let promptSettled = false; let aborted = signal?.aborted ?? false; @@ -624,6 +651,7 @@ const streamPromptResponse = async ({ (hasToolParams(toolParams) || isToolFinalState) ) { emittedToolParts.add(part.id); + toolCallCount += 1; if (!reason) { logger.warn( { @@ -704,11 +732,11 @@ const streamPromptResponse = async ({ await runtime.abortSession(opencodeSessionId).catch((error) => { logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session"); }); - return { aborted: true, failed: false }; + return { aborted: true, failed: false, toolCallCount }; } if (failed) { - return { aborted: false, failed: true }; + return { aborted: false, failed: true, toolCallCount }; } await promptPromise; @@ -735,7 +763,7 @@ const streamPromptResponse = async ({ session_id: clientSessionId, total_duration_ms: Math.max(0, Date.now() - requestStartedAt), }); - return { aborted: false, failed: false }; + return { aborted: false, failed: false, toolCallCount }; } finally { await iterator.return?.(undefined); if (!promptSettled) { @@ -1003,6 +1031,7 @@ const toolLabels: Record = { dynamic_http_call: "后端数据查询", fetch_result_ref: "结果引用回读", memory_manager: "记忆写入", + session_search: "历史会话检索", skill_manager: "流程沉淀", locate_features: "地图定位", view_history: "历史数据面板", diff --git a/src/server.ts b/src/server.ts index 6ee9ebe..1cbe423 100644 --- a/src/server.ts +++ b/src/server.ts @@ -2,20 +2,30 @@ import { randomUUID } from "node:crypto"; import cors from "cors"; import express from "express"; +import { SessionHistoryStore } from "./history/store.js"; import { ChatSessionBridge } from "./chat/sessionBridge.js"; import { config } from "./config.js"; import { logger } from "./logger.js"; +import { LearningOrchestrator } from "./learning/orchestrator.js"; import { MemoryStore } from "./memory/store.js"; import { ResultReferenceStore } from "./results/store.js"; import { buildChatRouter } from "./routes/chat.js"; import { opencodeRuntime } from "./runtime/opencode.js"; import { SessionRegistry } from "./session/registry.js"; +import { ToolSessionContextStore } from "./session/toolContextStore.js"; import { DynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js"; const app = express(); const registry = new SessionRegistry(config.SESSION_TTL_SECONDS); const sessionBridge = new ChatSessionBridge(registry, opencodeRuntime); const memoryStore = new MemoryStore(); +const sessionHistoryStore = new SessionHistoryStore(); +const toolContextStore = new ToolSessionContextStore(); +const learningOrchestrator = new LearningOrchestrator( + opencodeRuntime, + memoryStore, + sessionHistoryStore, +); const resultReferenceStore = new ResultReferenceStore(); const dynamicHttpExecutor = new DynamicHttpExecutor(resultReferenceStore); const internalToken = config.AGENT_INTERNAL_TOKEN ?? randomUUID(); @@ -126,13 +136,53 @@ app.post("/internal/tools/fetch-result-ref", async (req, res) => { res.json(result); }); +app.post("/internal/tools/session-search", async (req, res) => { + if (req.header("x-agent-internal-token") !== internalToken) { + res.status(403).json({ message: "forbidden" }); + return; + } + + const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const query = typeof req.body?.query === "string" ? req.body.query : ""; + const context = await toolContextStore.read(sessionId); + if (!context) { + res.status(404).json({ + message: "tool session context not found", + detail: sessionId, + }); + return; + } + if (!query.trim()) { + res.status(400).json({ message: "query is required" }); + return; + } + const hits = await sessionHistoryStore.search( + { + actorKey: context.actorKey, + projectKey: context.projectKey, + }, + query, + typeof req.body?.max_results === "number" ? req.body.max_results : undefined, + ); + res.json({ + hits, + query, + }); +}); + app.use( "/api/v1/agent/chat", - buildChatRouter(sessionBridge, opencodeRuntime, memoryStore), + buildChatRouter(sessionBridge, opencodeRuntime, memoryStore, learningOrchestrator), ); const bootstrap = async () => { - await Promise.all([memoryStore.initialize(), resultReferenceStore.initialize()]); + await Promise.all([ + learningOrchestrator.initialize(), + memoryStore.initialize(), + resultReferenceStore.initialize(), + sessionHistoryStore.initialize(), + toolContextStore.initialize(), + ]); resultReferenceStore.startCleanupLoop(); }; diff --git a/src/session/toolContextStore.ts b/src/session/toolContextStore.ts index 0dfa93e..b405a04 100644 --- a/src/session/toolContextStore.ts +++ b/src/session/toolContextStore.ts @@ -10,7 +10,9 @@ import { export type ToolSessionContext = { actorKey: string; + allowLearningWrite?: boolean; clientSessionId: string; + learningMode?: "interactive" | "review"; projectId?: string; projectKey: string; sessionId: string; diff --git a/src/skills/store.ts b/src/skills/store.ts new file mode 100644 index 0000000..b3d9870 --- /dev/null +++ b/src/skills/store.ts @@ -0,0 +1,279 @@ +import { dirname, join, posix } from "node:path"; + +import { config } from "../config.js"; +import { + atomicWriteFileWithHistory, + ensureDirectory, + listFiles, + readTextFile, + removeFileIfExists, + slugify, + toStableId, +} from "../utils/fileStore.js"; +import { + sanitizePersistentDocument, + sanitizePersistentLine, +} from "../utils/persistencePolicy.js"; + +const LEARNED_PATTERNS_MARKER = "## Learned Patterns"; +const SKILLS_ROOT_DIR = ".opencode/skills"; +const SKILLS_HISTORY_DIR = join(config.PERSISTENCE_HISTORY_DIR, "skills"); + +export type SkillPatternRecord = { + id: string; + content: string; +}; + +export class SkillStore { + private writeQueue: Promise = Promise.resolve(); + + async list(skillPath: string) { + const normalizedSkillPath = normalizeSkillPath(skillPath); + if (!normalizedSkillPath) { + return null; + } + const target = this.skillFilePath(normalizedSkillPath); + const current = (await readTextFile(target)) ?? defaultLearnedSkill(normalizedSkillPath); + return { + references: await this.listReferenceFiles(normalizedSkillPath), + skillPath: normalizedSkillPath, + target, + patterns: extractLearnedPatterns(current), + }; + } + + async appendPattern(skillPath: string, pattern: string) { + const normalizedSkillPath = normalizeSkillPath(skillPath); + if (!normalizedSkillPath) { + return { changed: false, detail: "invalid skill_path", target: "" }; + } + const sanitizedPattern = sanitizePersistentLine(pattern, 320); + if (!sanitizedPattern) { + return { changed: false, detail: "pattern rejected by persistence policy", target: "" }; + } + return this.serializeWrite(async () => { + const target = this.skillFilePath(normalizedSkillPath); + const current = (await readTextFile(target)) ?? defaultLearnedSkill(normalizedSkillPath); + const existingPatterns = extractLearnedPatterns(current); + if (existingPatterns.some((entry) => entry.content === sanitizedPattern)) { + return { changed: false, detail: "pattern already existed", target }; + } + const record: SkillPatternRecord = { + content: sanitizedPattern, + id: toStableId(normalizedSkillPath, sanitizedPattern.toLowerCase()), + }; + const next = current.includes(LEARNED_PATTERNS_MARKER) + ? current.replace( + LEARNED_PATTERNS_MARKER, + `${LEARNED_PATTERNS_MARKER}\n- [${record.id}] ${record.content}`, + ) + : `${current.trimEnd()}\n\n${LEARNED_PATTERNS_MARKER}\n- [${record.id}] ${record.content}\n`; + await ensureDirectory(join(SKILLS_ROOT_DIR, normalizedSkillPath)); + await atomicWriteFileWithHistory(target, next, { + historyDir: SKILLS_HISTORY_DIR, + rootDir: SKILLS_ROOT_DIR, + }); + return { changed: true, detail: "skill file updated", target }; + }); + } + + async removePattern(skillPath: string, targetId: string) { + const normalizedSkillPath = normalizeSkillPath(skillPath); + if (!normalizedSkillPath) { + return { changed: false, detail: "invalid skill_path", target: "" }; + } + return this.serializeWrite(async () => { + const target = this.skillFilePath(normalizedSkillPath); + const current = await readTextFile(target); + if (!current) { + return { changed: false, detail: "skill file not found", target }; + } + const patterns = extractLearnedPatterns(current); + const remaining = patterns.filter((entry) => entry.id !== targetId.trim()); + if (remaining.length === patterns.length) { + return { changed: false, detail: "pattern not found", target }; + } + const next = rewriteLearnedPatterns(current, remaining); + await atomicWriteFileWithHistory(target, next, { + historyDir: SKILLS_HISTORY_DIR, + rootDir: SKILLS_ROOT_DIR, + }); + return { changed: true, detail: "pattern removed", target }; + }); + } + + async writeReference(skillPath: string, filePath: string, content: string) { + const normalizedSkillPath = normalizeSkillPath(skillPath); + const normalizedReferencePath = normalizeReferencePath(filePath); + if (!normalizedSkillPath) { + return { changed: false, detail: "invalid skill_path", target: "" }; + } + if (!normalizedReferencePath) { + return { changed: false, detail: "invalid reference file_path", target: "" }; + } + const sanitizedContent = sanitizePersistentDocument(content, 5000); + if (!sanitizedContent) { + return { changed: false, detail: "reference content rejected by persistence policy", target: "" }; + } + return this.serializeWrite(async () => { + const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedReferencePath); + await ensureDirectory(dirname(target)); + await atomicWriteFileWithHistory(target, `${sanitizedContent}\n`, { + historyDir: SKILLS_HISTORY_DIR, + rootDir: SKILLS_ROOT_DIR, + }); + return { changed: true, detail: "reference written", target }; + }); + } + + async removeReference(skillPath: string, filePath: string) { + const normalizedSkillPath = normalizeSkillPath(skillPath); + const normalizedReferencePath = normalizeReferencePath(filePath); + if (!normalizedSkillPath) { + return { changed: false, detail: "invalid skill_path", target: "" }; + } + if (!normalizedReferencePath) { + return { changed: false, detail: "invalid reference file_path", target: "" }; + } + return this.serializeWrite(async () => { + const target = join(SKILLS_ROOT_DIR, normalizedSkillPath, normalizedReferencePath); + const previous = await readTextFile(target); + if (previous === null) { + return { changed: false, detail: "reference not found", target }; + } + await removeFileIfExists(target); + return { changed: true, detail: "reference removed", target }; + }); + } + + private async listReferenceFiles(skillPath: string) { + const referenceDir = join(SKILLS_ROOT_DIR, skillPath, "references"); + const files = await listFiles(referenceDir); + return files.map((file) => file.slice(referenceDir.length + 1)); + } + + private skillFilePath(skillPath: string) { + return join(SKILLS_ROOT_DIR, skillPath, "SKILL.md"); + } + + private async serializeWrite(task: () => Promise) { + const run = this.writeQueue.catch(() => undefined).then(task); + this.writeQueue = run.then( + () => undefined, + () => undefined, + ); + return run; + } +} + +export const normalizeSkillPath = (rawSkillPath: string) => { + const normalized = posix.normalize(rawSkillPath.trim().replace(/^\/+|\/+$/g, "")); + if (!normalized || normalized === "." || normalized.startsWith("..")) { + return null; + } + if (normalized === "SKILL.md" || normalized.endsWith("/SKILL.md")) { + return null; + } + if (!/^[a-z0-9._/-]+$/i.test(normalized)) { + return null; + } + return normalized; +}; + +const normalizeReferencePath = (rawFilePath: string) => { + const normalized = posix.normalize(rawFilePath.trim().replace(/^\/+|\/+$/g, "")); + if (!normalized || normalized.startsWith("..")) { + return null; + } + if (!normalized.startsWith("references/")) { + return null; + } + if (!normalized.endsWith(".md")) { + return null; + } + const segments = normalized.split("/"); + const last = segments.pop(); + if (!last) { + return null; + } + const stem = last.replace(/\.md$/i, ""); + const normalizedStem = slugify(stem); + return [...segments, `${normalizedStem}.md`].join("/"); +}; + +export const extractLearnedPatterns = (content: string): SkillPatternRecord[] => { + const section = extractLearnedPatternsSection(content); + if (!section) { + return []; + } + return section + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.startsWith("- ")) + .map((line) => line.slice(2).trim()) + .map((line) => { + const idMatch = line.match(/^\[([a-z0-9]{8,})\]\s+(.*)$/i); + if (idMatch) { + return { + content: idMatch[2], + id: idMatch[1], + }; + } + return { + content: line, + id: toStableId("skill-pattern", line.toLowerCase()), + }; + }) + .filter((entry) => entry.content); +}; + +const rewriteLearnedPatterns = (content: string, patterns: SkillPatternRecord[]) => { + const renderedSection = + patterns.length > 0 + ? `${LEARNED_PATTERNS_MARKER}\n${patterns.map((entry) => `- [${entry.id}] ${entry.content}`).join("\n")}` + : `${LEARNED_PATTERNS_MARKER}\n`; + if (!content.includes(LEARNED_PATTERNS_MARKER)) { + return `${content.trimEnd()}\n\n${renderedSection}\n`; + } + const markerIndex = content.indexOf(LEARNED_PATTERNS_MARKER); + const afterMarkerIndex = markerIndex + LEARNED_PATTERNS_MARKER.length; + const tail = content.slice(afterMarkerIndex); + const nextHeadingMatch = tail.match(/\n##\s+/); + const sectionEndOffset = nextHeadingMatch?.index ?? tail.length; + const head = content.slice(0, markerIndex).trimEnd(); + const suffix = tail.slice(sectionEndOffset).trimStart(); + return suffix + ? `${head}\n\n${renderedSection}\n\n${suffix}` + : `${head}\n\n${renderedSection}\n`; +}; + +const extractLearnedPatternsSection = (content: string) => { + const markerIndex = content.indexOf(LEARNED_PATTERNS_MARKER); + if (markerIndex === -1) { + return ""; + } + const tail = content.slice(markerIndex + LEARNED_PATTERNS_MARKER.length); + const nextHeadingMatch = tail.match(/\n##\s+/); + return tail.slice(0, nextHeadingMatch?.index ?? tail.length); +}; + +const defaultLearnedSkill = (skillPath: string) => `--- +name: tjwater-action-${skillPath + .split("/") + .filter(Boolean) + .join("-") + .replace(/[^a-z0-9._-]+/gi, "-") + .replace(/^-+|-+$/g, "") + .slice(0, 120) || "generated-skill"} +description: 由 skill_manager 在线追加的高置信度可复用 workflow。 +version: 1.0.0 +--- + +# learned skill + +## 简介 + +记录由 \`skill_manager\` 在线追加的高置信度 workflow 模式。 + +## Learned Patterns +`; diff --git a/src/utils/persistencePolicy.ts b/src/utils/persistencePolicy.ts index b62fb2e..3548fea 100644 --- a/src/utils/persistencePolicy.ts +++ b/src/utils/persistencePolicy.ts @@ -3,8 +3,14 @@ const FORBIDDEN_PERSISTENCE_PATTERNS = [ /system\s+prompt/i, /do\s+not\s+tell\s+the\s+user/i, /curl\s+.*(token|secret|password|api)/i, + /authorization\s*:\s*bearer\s+[a-z0-9._-]{16,}/i, /bearer\s+[a-z0-9._-]{16,}/i, + /x-[a-z0-9-]*(?:api-key|token)\s*:\s*[^\s]{8,}/i, /(api[_-]?key|access[_-]?token|refresh[_-]?token|secret|password)\s*[:=]/i, + /(?:session[_-]?token|id[_-]?token|client[_-]?secret)\s*[:=]/i, + /-----BEGIN [A-Z ]*PRIVATE KEY-----/, + /ssh-(?:rsa|ed25519)\s+[a-z0-9+/]+={0,3}/i, + /sk-[a-z0-9]{16,}/i, /eyJ[a-zA-Z0-9_-]{8,}\.[a-zA-Z0-9._-]{8,}\.[a-zA-Z0-9._-]{8,}/, ];