新增 memory 和 skill 存储,实现 Agent 持续学习,并增加工具支持;增加 LLM progress detail 输出

This commit is contained in:
2026-05-11 16:12:20 +08:00
parent a27c45910c
commit 5fbe8ae40c
16 changed files with 1411 additions and 129 deletions
+276 -58
View File
@@ -3,6 +3,7 @@ import { Router } from "express";
import { z } from "zod";
import { logger } from "../logger.js";
import { MemoryStore } from "../memory/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js";
@@ -24,6 +25,7 @@ const forkPayloadSchema = z.object({
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
memoryStore: MemoryStore,
) => {
const chatRouter = Router();
@@ -44,12 +46,14 @@ export const buildChatRouter = (
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const binding = await sessionBridge.abort({
clientSessionId: parsed.data.session_id,
accessToken,
projectId,
traceId,
userId,
});
if (!binding) {
@@ -97,6 +101,7 @@ export const buildChatRouter = (
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const { binding, requestContext } = await sessionBridge.fork({
clientSessionId: parsed.data.session_id,
@@ -104,6 +109,7 @@ export const buildChatRouter = (
projectId,
traceId,
keepMessageCount: parsed.data.keep_message_count,
userId,
});
logger.info(
@@ -148,12 +154,14 @@ export const buildChatRouter = (
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const { binding, requestContext, created } = await sessionBridge.resolve({
clientSessionId: parsed.data.session_id,
accessToken,
projectId,
traceId,
userId,
});
logger.info(
@@ -175,12 +183,6 @@ export const buildChatRouter = (
res.flushHeaders?.();
const clientSessionId = requestContext.clientSessionId;
const existingSessionTitle = sessionBridge.getSessionTitle(binding.sessionId);
const sessionTitle = existingSessionTitle
?? (await generateSessionTitle(runtime, parsed.data.message));
if (!existingSessionTitle) {
sessionBridge.setSessionTitle(binding.sessionId, sessionTitle);
}
let streamClosed = false;
const abortController = new AbortController();
const handleClientClose = () => {
@@ -193,18 +195,18 @@ export const buildChatRouter = (
req.on("close", handleClientClose);
res.on("close", handleClientClose);
try {
res.write(
toSse("session_title", {
session_id: clientSessionId,
title: sessionTitle,
}),
try {
const preparedMessage = await buildPromptWithLearningContext(
memoryStore,
requestContext.actorKey,
requestContext.projectKey,
parsed.data.message,
);
await streamPromptResponse({
const streamResult = await streamPromptResponse({
runtime,
opencodeSessionId: binding.sessionId,
clientSessionId,
message: parsed.data.message,
message: preparedMessage,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
signal: abortController.signal,
@@ -215,6 +217,32 @@ export const buildChatRouter = (
res.write(toSse(event, data));
},
});
if (!streamResult.aborted && !streamResult.failed) {
const existingSessionTitle = sessionBridge.getSessionTitle(binding.sessionId);
let sessionTitle = existingSessionTitle;
const shouldGenerateTitle =
!existingSessionTitle &&
(await isFirstRoundConversation(runtime, binding.sessionId));
if (shouldGenerateTitle) {
sessionTitle = await generateSessionTitle(runtime, {
sessionId: binding.sessionId,
latestUserMessage: parsed.data.message,
});
sessionBridge.setSessionTitle(binding.sessionId, sessionTitle);
}
if (!streamClosed && !res.writableEnded && !res.destroyed) {
if (shouldGenerateTitle && sessionTitle) {
res.write(
toSse("session_title", {
session_id: clientSessionId,
title: sessionTitle,
}),
);
}
res.write(toSse("done", { session_id: clientSessionId }));
}
}
} finally {
streamClosed = true;
req.off("close", handleClientClose);
@@ -322,16 +350,18 @@ const streamPromptResponse = async ({
projectId,
signal,
write,
}: StreamPromptOptions) => {
}: StreamPromptOptions): Promise<{ aborted: boolean; failed: boolean }> => {
const eventStream = await runtime.subscribeEvents();
const iterator = eventStream[Symbol.asyncIterator]();
const emittedToolParts = new Set<string>();
const partTypes = new Map<string, Part["type"]>();
const pendingTextDeltas = new Map<string, string[]>();
const pendingPartTextDeltas = new Map<string, string[]>();
const reasoningDeltas = new Map<string, string[]>();
let emittedText = false;
let done = false;
let promptSettled = false;
let aborted = signal?.aborted ?? false;
let failed = false;
const abortPromise = signal
? new Promise<{ type: "abort" }>((resolve) => {
@@ -351,6 +381,7 @@ const streamPromptResponse = async ({
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
});
const promptPromise = runtime
@@ -418,6 +449,7 @@ const streamPromptResponse = async ({
: event.properties.status.type === "busy"
? "Agent 正在处理请求"
: "Agent 已空闲",
detail: buildSessionStatusDetail(event.properties.status),
});
continue;
}
@@ -447,10 +479,14 @@ const streamPromptResponse = async ({
session_id: clientSessionId,
content: event.properties.delta,
});
} else if (!partType) {
const pending = pendingTextDeltas.get(event.properties.partID) ?? [];
} else if (partType === "reasoning") {
const pending = reasoningDeltas.get(event.properties.partID) ?? [];
pending.push(event.properties.delta);
pendingTextDeltas.set(event.properties.partID, pending);
reasoningDeltas.set(event.properties.partID, pending);
} else if (!partType) {
const pending = pendingPartTextDeltas.get(event.properties.partID) ?? [];
pending.push(event.properties.delta);
pendingPartTextDeltas.set(event.properties.partID, pending);
}
continue;
}
@@ -459,8 +495,8 @@ const streamPromptResponse = async ({
const part = event.properties.part;
partTypes.set(part.id, part.type);
if (part.type === "text") {
const pending = pendingTextDeltas.get(part.id) ?? [];
pendingTextDeltas.delete(part.id);
const pending = pendingPartTextDeltas.get(part.id) ?? [];
pendingPartTextDeltas.delete(part.id);
for (const content of pending) {
emittedText = true;
write("token", {
@@ -469,13 +505,23 @@ const streamPromptResponse = async ({
});
}
} else if (part.type === "reasoning") {
pendingTextDeltas.delete(part.id);
const pending = pendingPartTextDeltas.get(part.id) ?? [];
if (pending.length > 0) {
const existing = reasoningDeltas.get(part.id) ?? [];
reasoningDeltas.set(part.id, existing.concat(pending));
}
pendingPartTextDeltas.delete(part.id);
const reasoningDetail = buildReasoningProgressDetail(
reasoningDeltas.get(part.id) ?? [],
part.time.end,
);
write("progress", {
session_id: clientSessionId,
id: part.id,
phase: "planning",
status: part.time.end ? "completed" : "running",
title: part.time.end ? "分析规划完成" : "正在规划分析步骤",
detail: reasoningDetail,
});
}
if (part.type === "tool") {
@@ -490,7 +536,13 @@ const streamPromptResponse = async ({
phase: "tool",
status: normalizeToolStatus(part.state.status),
title: getToolProgressTitle(part.tool, part.state.status),
detail: part.state.status === "error" ? part.state.error : undefined,
detail: buildToolProgressDetail(
part.tool,
part.state.status,
toolParams,
reason,
part.state.status === "error" ? part.state.error : undefined,
),
});
if (
!emittedToolParts.has(part.id) &&
@@ -556,6 +608,7 @@ const streamPromptResponse = async ({
: "opencode session error",
detail: event.properties.error?.name,
});
failed = true;
done = true;
continue;
}
@@ -567,6 +620,7 @@ const streamPromptResponse = async ({
phase: "session",
status: "completed",
title: "Agent 已完成处理",
detail: "当前会话已无待执行任务,正在收尾并准备返回最终结果。",
});
done = true;
}
@@ -576,7 +630,11 @@ const streamPromptResponse = async ({
await runtime.abortSession(opencodeSessionId).catch((error) => {
logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session");
});
return;
return { aborted: true, failed: false };
}
if (failed) {
return { aborted: false, failed: true };
}
await promptPromise;
@@ -589,6 +647,7 @@ const streamPromptResponse = async ({
phase: "start",
status: "completed",
title: "请求处理完成",
detail: "本次请求的分析、工具执行和结果整理流程已经完成。",
});
write("progress", {
session_id: clientSessionId,
@@ -596,8 +655,11 @@ const streamPromptResponse = async ({
phase: "complete",
status: "completed",
title: "分析完成",
detail: emittedText
? "最终回答已生成并推送到前端。"
: "已完成分析,并通过兜底消息补发最终回答内容。",
});
write("done", { session_id: clientSessionId });
return { aborted: false, failed: false };
} finally {
await iterator.return?.(undefined);
if (!promptSettled) {
@@ -645,6 +707,97 @@ const normalizeToolStatus = (status: string) => {
return "running";
};
const formatProgressValue = (value: unknown): string => {
if (typeof value === "string") {
return value.length > 120 ? `${value.slice(0, 117)}...` : value;
}
if (
typeof value === "number" ||
typeof value === "boolean" ||
value === null ||
value === undefined
) {
return String(value);
}
try {
const serialized = JSON.stringify(value);
return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized;
} catch {
return "[unserializable]";
}
};
const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim();
const truncateProgressText = (text: string, maxLength: number) =>
text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text;
const summarizeToolParams = (params: Record<string, unknown>) => {
const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]);
const summary = Object.entries(params)
.filter(([key]) => !ignoredKeys.has(key))
.slice(0, 4)
.map(([key, value]) => `${key}=${formatProgressValue(value)}`)
.join(", ");
return summary || "无附加参数";
};
const buildSessionStatusDetail = (status: { type: string; message?: string }) => {
if (status.type === "retry") {
return status.message
? `模型请求需要重试,原因:${status.message}`
: "模型请求正在重试,等待下一次响应。";
}
if (status.type === "busy") {
return status.message
? `Agent 正在处理中:${status.message}`
: "Agent 正在执行推理、工具调用或结果整理。";
}
if (status.type === "idle") {
return status.message
? `Agent 已空闲:${status.message}`
: "当前会话暂时没有待处理任务。";
}
return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`;
};
const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => {
const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800);
if (ended) {
return reasoningText
? `推理过程:${reasoningText}`
: "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。";
}
return reasoningText
? `正在推理:${reasoningText}`
: "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。";
};
const buildToolProgressDetail = (
tool: string,
status: string,
params: Record<string, unknown>,
reason: string,
error?: string,
) => {
const toolName = toolLabels[tool] ?? tool;
const reasonText = reason ? `;调用原因:${reason}` : "";
const paramsText = `;关键参数:${summarizeToolParams(params)}`;
if (status === "error") {
const errorText = error ? `;错误:${error}` : "";
return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`;
}
if (status === "completed") {
return `${toolName} 已执行完成${reasonText}${paramsText}`;
}
if (status === "pending") {
return `${toolName} 已进入待执行状态${reasonText}${paramsText}`;
}
return `${toolName} 正在执行${reasonText}${paramsText}`;
};
const getToolProgressTitle = (tool: string, status: string) => {
const toolName = toolLabels[tool] ?? tool;
if (status === "completed") return `${toolName} 已完成`;
@@ -665,51 +818,100 @@ const TITLE_PROMPT_TIMEOUT_MS = 2500;
const generateSessionTitle = async (
runtime: OpencodeRuntimeAdapter,
userMessage: string,
options: {
sessionId: string;
latestUserMessage: string;
fallbackTitle?: string;
},
) => {
const fallback = buildSessionTitle(userMessage);
const normalized = userMessage.replace(/\s+/g, " ").trim();
if (!normalized) {
return fallback;
}
const fallback = options.fallbackTitle?.trim() || buildSessionTitle(options.latestUserMessage);
let titleSessionId: string | undefined;
try {
const conversation = await buildTitleConversationContext(runtime, options.sessionId);
if (!conversation) {
return fallback;
}
const titleSession = await runtime.createSession(`title-${Date.now().toString(36)}`);
const request = runtime
.prompt(
titleSession.id,
[
"你是会话标题生成器。",
"请根据用户问题生成一个 8-16 字中文标题。",
"要求:简洁、可读、避免标点、不要引号、不要解释。",
"只输出标题本身。",
`用户问题:${normalized}`,
].join("\n"),
)
.then(async () => {
const messages = await runtime.messages(titleSession.id, 20);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const title = collectTextContent(assistantMessage?.parts ?? []);
return normalizeGeneratedTitle(title, fallback);
const titleSession = await runtime.createSession(`title-${Date.now().toString(36)}`);
titleSessionId = titleSession.id;
const request = runtime
.prompt(
titleSession.id,
[
"你是会话标题生成器。",
"请根据用户问题生成一个 8-16 字中文标题。",
"要求:简洁、可读、避免标点、不要引号、不要解释。",
"请优先概括最近这轮对话的核心任务或结论。",
"只输出标题本身。",
"",
conversation,
].join("\n"),
)
.then(async () => {
const messages = await runtime.messages(titleSession.id, 20);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const title = collectTextContent(assistantMessage?.parts ?? []);
return normalizeGeneratedTitle(title, fallback);
});
const timeout = new Promise<string>((resolve) => {
setTimeout(() => resolve(fallback), TITLE_PROMPT_TIMEOUT_MS);
});
const timeout = new Promise<string>((resolve) => {
setTimeout(() => resolve(fallback), TITLE_PROMPT_TIMEOUT_MS);
});
try {
return await Promise.race([request, timeout]);
} catch (error) {
logger.warn({ err: error }, "failed to generate session title, using fallback");
return fallback;
} finally {
await runtime.abortSession(titleSession.id).catch((error) => {
logger.debug({ sessionId: titleSession.id, err: error }, "failed to cleanup title session");
});
if (titleSessionId) {
await runtime.abortSession(titleSessionId).catch((error) => {
logger.debug({ sessionId: titleSessionId, err: error }, "failed to cleanup title session");
});
}
}
};
const buildTitleConversationContext = async (
runtime: OpencodeRuntimeAdapter,
sessionId: string,
) => {
const messages = await runtime.messages(sessionId, 12);
const recentMessages = messages
.filter(
(message) =>
message.info.role === "user" || message.info.role === "assistant",
)
.map((message) => ({
role: message.info.role,
content: collectTextContent(message.parts).replace(/\s+/g, " ").trim(),
}))
.filter((message) => message.content.length > 0)
.slice(-6);
if (recentMessages.length === 0) {
return "";
}
return recentMessages
.map((message) => `${message.role === "user" ? "用户" : "助手"}${message.content}`)
.join("\n")
.slice(0, 2400);
};
const isFirstRoundConversation = async (
runtime: OpencodeRuntimeAdapter,
sessionId: string,
) => {
const messages = await runtime.messages(sessionId, 12);
const chatMessageCount = messages.filter(
(message) =>
message.info.role === "user" || message.info.role === "assistant",
).length;
return chatMessageCount === 2;
};
const normalizeGeneratedTitle = (rawTitle: string, fallback: string) => {
const normalized = rawTitle
.replace(/\s+/g, " ")
@@ -723,8 +925,24 @@ const normalizeGeneratedTitle = (rawTitle: string, fallback: string) => {
const toolLabels: Record<string, string> = {
dynamic_http_call: "后端数据查询",
fetch_result_ref: "结果引用回读",
memory_manager: "记忆写入",
skill_manager: "流程沉淀",
locate_features: "地图定位",
view_history: "历史数据面板",
view_scada: "SCADA 面板",
show_chart: "图表渲染",
};
const buildPromptWithLearningContext = async (
memoryStore: MemoryStore,
actorKey: string,
projectKey: string,
message: string,
) => {
const snapshot = await memoryStore.buildPromptSnapshot({ actorKey, projectKey });
if (!snapshot) {
return message;
}
return `${snapshot}\n\n[Current user request]\n${message}`;
};