From 872570ac3af48318cd5fd42844bce3a921d5abbe Mon Sep 17 00:00:00 2001 From: Huarch Date: Wed, 20 May 2026 16:50:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=20chat.ts=20=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E6=98=8E=E7=A1=AE=E5=8A=9F=E8=83=BD=E8=BE=B9?= =?UTF-8?q?=E7=95=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/routes/chat.ts | 795 +------------------------------------- src/routes/chatSession.ts | 144 +++++++ src/routes/chatStream.ts | 786 +++++++++++++++++++++++++++++++++++++ 3 files changed, 947 insertions(+), 778 deletions(-) create mode 100644 src/routes/chatSession.ts create mode 100644 src/routes/chatStream.ts diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 72b3af3..f4fa0b0 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -1,4 +1,3 @@ -import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; import { Router } from "express"; import { z } from "zod"; @@ -8,15 +7,18 @@ import { MemoryStore } from "../memory/store.js"; import { type ResultReferenceStore } from "../results/store.js"; import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; import { type ChatSessionBridge } from "../chat/sessionBridge.js"; -import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js"; import { toActorKey } from "../utils/fileStore.js"; - -const supportedModels = [ - "deepseek/deepseek-v4-flash", - "deepseek/deepseek-v4-pro", -] as const; - -type SupportedModel = (typeof supportedModels)[number]; +import { + buildPromptWithLearningContext, + generateSessionTitle, + getConversationTurnStats, +} from "./chatSession.js"; +import { + collectTextContent, + streamPromptResponse, + supportedModels, + type SupportedModel, +} from "./chatStream.js"; const payloadSchema = z.object({ message: z.string().min(1).max(10000), @@ -246,12 +248,12 @@ export const buildChatRouter = ( req.on("close", handleClientClose); res.on("close", handleClientClose); - try { - const preparedMessage = await buildPromptWithLearningContext( - memoryStore, - requestContext.actorKey, - requestContext.projectKey, - parsed.data.message, + try { + const preparedMessage = await buildPromptWithLearningContext( + memoryStore, + requestContext.actorKey, + requestContext.projectKey, + parsed.data.message, ); const streamResult = await streamPromptResponse({ runtime, @@ -347,766 +349,3 @@ export const buildChatRouter = ( const toSse = (event: string, data: Record) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; - -const getErrorMessage = (error: { - name: string; - data?: { message?: string }; -}) => error.data?.message ?? error.name; - -const isObjectRecord = (value: unknown): value is Record => - typeof value === "object" && value !== null && !Array.isArray(value); - -const normalizeToolParams = (value: unknown): Record => { - if (isObjectRecord(value)) { - return value; - } - if (typeof value === "string") { - try { - const parsed = JSON.parse(value) as unknown; - return isObjectRecord(parsed) ? parsed : {}; - } catch { - return {}; - } - } - return {}; -}; - -const extractRequestReason = (params: Record) => { - const candidates = ["reason", "request_reason", "why", "purpose", "rationale"]; - for (const key of candidates) { - const value = params[key]; - if (typeof value === "string") { - const normalized = value.trim(); - if (normalized) { - return normalized; - } - } - } - return ""; -}; - -const isSkillEvent = (event: OpencodeEvent) => event.type.toLowerCase().includes("skill"); - -const extractSkillAuditInfo = (event: OpencodeEvent) => { - const payload = isObjectRecord(event.properties) - ? (event.properties as Record) - : {}; - const candidateName = - typeof payload.skill === "string" - ? payload.skill - : typeof payload.skillName === "string" - ? payload.skillName - : typeof payload.name === "string" - ? payload.name - : event.type; - const reason = extractRequestReason(payload); - return { - name: candidateName, - reason, - payload, - }; -}; - -const hasToolParams = (params: Record) => - Object.keys(params).length > 0; - -const toRuntimeModel = (model?: SupportedModel) => { - if (!model) { - return undefined; - } - const [providerID, modelID] = model.split("/"); - if (!providerID || !modelID) { - return undefined; - } - return { - providerID, - modelID, - }; -}; - -type StreamPromptOptions = { - runtime: OpencodeRuntimeAdapter; - opencodeSessionId: string; - clientSessionId: string; - message: string; - model?: SupportedModel; - traceId?: string; - projectId?: string; - signal?: AbortSignal; - write: (event: string, data: Record) => void; -}; - -type ProgressStatus = "running" | "completed" | "error"; - -type ProgressPayload = { - id: string; - phase: string; - status: ProgressStatus; - title: string; - detail?: string; -}; - -const streamPromptResponse = async ({ - runtime, - opencodeSessionId, - clientSessionId, - message, - model, - traceId, - projectId, - signal, - write, -}: StreamPromptOptions): Promise<{ - aborted: boolean; - failed: boolean; - toolCallCount: number; -}> => { - const eventStream = await runtime.subscribeEvents(); - const iterator = eventStream[Symbol.asyncIterator](); - const requestStartedAt = Date.now(); - const progressStartedAtMap = new Map(); - const finalizedProgressIds = new Set(); - const emittedToolParts = new Set(); - const partTypes = new Map(); - const pendingPartTextDeltas = new Map(); - const reasoningDeltas = new Map(); - let emittedText = false; - let toolCallCount = 0; - let done = false; - let promptSettled = false; - let aborted = signal?.aborted ?? false; - let failed = false; - - const abortPromise = signal - ? new Promise<{ type: "abort" }>((resolve) => { - if (signal.aborted) { - resolve({ type: "abort" }); - return; - } - signal.addEventListener("abort", () => resolve({ type: "abort" }), { - once: true, - }); - }) - : null; - - const emitProgress = ({ id, phase, status, title, detail }: ProgressPayload) => { - if (status === "running" && finalizedProgressIds.has(id)) { - return; - } - - const now = Date.now(); - const startedAt = progressStartedAtMap.get(id) ?? now; - if (!progressStartedAtMap.has(id)) { - progressStartedAtMap.set(id, startedAt); - } - - if (status === "running") { - write("progress", { - session_id: clientSessionId, - id, - phase, - status, - title, - detail, - started_at: startedAt, - elapsed_ms: Math.max(0, now - startedAt), - }); - return; - } - - const durationMs = Math.max(0, now - startedAt); - finalizedProgressIds.add(id); - progressStartedAtMap.delete(id); - write("progress", { - session_id: clientSessionId, - id, - phase, - status, - title, - detail, - started_at: startedAt, - ended_at: now, - duration_ms: durationMs, - }); - }; - - emitProgress({ - id: "request-received", - phase: "start", - status: "running", - title: "已收到请求,正在启动 Agent 分析", - detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。", - }); - - const promptPromise = runtime - .prompt(opencodeSessionId, message, toRuntimeModel(model)) - .then(() => { - promptSettled = true; - }) - .catch((error: unknown) => { - promptSettled = true; - throw error; - }); - - try { - while (!done) { - if (signal?.aborted) { - aborted = true; - break; - } - - const nextEvent = iterator - .next() - .then((result) => ({ type: "event" as const, result })); - const nextPrompt = promptSettled - ? null - : promptPromise.then( - () => ({ type: "prompt" as const }), - (error: unknown) => ({ type: "prompt-error" as const, error }), - ); - const next = await Promise.race( - [ - ...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]), - ...(abortPromise ? [abortPromise] : []), - ], - ); - - if (next.type === "abort") { - aborted = true; - break; - } - - if (next.type === "prompt-error") { - throw next.error; - } - if (next.type === "prompt") { - continue; - } - if (next.result.done) { - break; - } - - const event = next.result.value as OpencodeEvent; - if (!isSessionEvent(event, opencodeSessionId)) { - continue; - } - - if (event.type === "session.status") { - emitProgress({ - id: "session-status", - phase: "session", - status: event.properties.status.type === "idle" ? "completed" : "running", - title: - event.properties.status.type === "retry" - ? `模型请求重试中:${event.properties.status.message}` - : event.properties.status.type === "busy" - ? "Agent 正在处理请求" - : "Agent 已空闲", - detail: buildSessionStatusDetail(event.properties.status), - }); - continue; - } - - if (isSkillEvent(event)) { - const { name, reason, payload } = extractSkillAuditInfo(event); - void writeLlmRequestAuditLog({ - kind: "skill", - sessionId: opencodeSessionId, - clientSessionId, - traceId, - projectId, - target: name, - reason, - reasonProvided: Boolean(reason), - payload, - }).catch((error) => { - logger.warn({ err: error }, "failed to write skill audit log"); - }); - } - - if (event.type === "message.part.delta" && event.properties.field === "text") { - const partType = partTypes.get(event.properties.partID); - if (partType === "text") { - emittedText = true; - write("token", { - session_id: clientSessionId, - content: event.properties.delta, - }); - } else if (partType === "reasoning") { - const pending = reasoningDeltas.get(event.properties.partID) ?? []; - pending.push(event.properties.delta); - reasoningDeltas.set(event.properties.partID, pending); - } else if (!partType) { - const pending = pendingPartTextDeltas.get(event.properties.partID) ?? []; - pending.push(event.properties.delta); - pendingPartTextDeltas.set(event.properties.partID, pending); - } - continue; - } - - if (event.type === "message.part.updated") { - const part = event.properties.part; - partTypes.set(part.id, part.type); - if (part.type === "text") { - const pending = pendingPartTextDeltas.get(part.id) ?? []; - pendingPartTextDeltas.delete(part.id); - for (const content of pending) { - emittedText = true; - write("token", { - session_id: clientSessionId, - content, - }); - } - } else if (part.type === "reasoning") { - const pending = pendingPartTextDeltas.get(part.id) ?? []; - if (pending.length > 0) { - const existing = reasoningDeltas.get(part.id) ?? []; - reasoningDeltas.set(part.id, existing.concat(pending)); - } - pendingPartTextDeltas.delete(part.id); - const reasoningDetail = buildReasoningProgressDetail( - reasoningDeltas.get(part.id) ?? [], - part.time.end, - ); - emitProgress({ - id: part.id, - phase: "planning", - status: part.time.end ? "completed" : "running", - title: part.time.end ? "分析规划完成" : "正在规划分析步骤", - detail: reasoningDetail, - }); - } - if (part.type === "tool") { - const toolParams = normalizeToolParams(part.state.input); - const reason = extractRequestReason(toolParams); - const isToolFinalState = - part.state.status === "completed" || part.state.status === "error"; - - emitProgress({ - id: part.id, - phase: "tool", - status: normalizeToolStatus(part.state.status), - title: getToolProgressTitle(part.tool, part.state.status), - detail: buildToolProgressDetail( - part.tool, - part.state.status, - toolParams, - reason, - part.state.status === "error" ? part.state.error : undefined, - ), - }); - if ( - !emittedToolParts.has(part.id) && - (hasToolParams(toolParams) || isToolFinalState) - ) { - emittedToolParts.add(part.id); - toolCallCount += 1; - if (!reason) { - logger.warn( - { - tool: part.tool, - sessionId: opencodeSessionId, - clientSessionId, - }, - "llm tool request missing reason", - ); - } - void writeLlmRequestAuditLog({ - kind: "tool", - sessionId: opencodeSessionId, - clientSessionId, - traceId, - projectId, - target: part.tool, - reason, - reasonProvided: Boolean(reason), - payload: toolParams, - }).catch((error) => { - logger.warn({ err: error }, "failed to write tool audit log"); - }); - write("tool_call", { - session_id: clientSessionId, - tool: part.tool, - params: toolParams, - reason, - }); - } - } - continue; - } - - if (event.type === "todo.updated") { - const completed = event.properties.todos.filter( - (todo) => todo.status === "completed", - ).length; - emitProgress({ - id: "todo-progress", - phase: "planning", - status: completed === event.properties.todos.length ? "completed" : "running", - title: `计划进度 ${completed}/${event.properties.todos.length}`, - detail: event.properties.todos - .map((todo) => `${todo.status}: ${todo.content}`) - .join("\n"), - }); - continue; - } - - if (event.type === "session.error") { - write("error", { - session_id: clientSessionId, - message: event.properties.error - ? getErrorMessage(event.properties.error) - : "opencode session error", - detail: event.properties.error?.name, - total_duration_ms: Math.max(0, Date.now() - requestStartedAt), - }); - failed = true; - done = true; - continue; - } - - if (event.type === "session.idle") { - emitProgress({ - id: "session-status", - phase: "session", - status: "completed", - title: "Agent 已完成处理", - detail: "当前会话已无待执行任务,正在收尾并准备返回最终结果。", - }); - done = true; - } - } - - if (aborted) { - await runtime.abortSession(opencodeSessionId).catch((error) => { - logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session"); - }); - return { aborted: true, failed: false, toolCallCount }; - } - - if (failed) { - return { aborted: false, failed: true, toolCallCount }; - } - - await promptPromise; - if (!emittedText) { - await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write); - } - emitProgress({ - id: "request-received", - phase: "start", - status: "completed", - title: "请求处理完成", - detail: "本次请求的分析、工具执行和结果整理流程已经完成。", - }); - emitProgress({ - id: "request-completed", - phase: "complete", - status: "completed", - title: "分析完成", - detail: emittedText - ? "最终回答已生成并推送到前端。" - : "已完成分析,并通过兜底消息补发最终回答内容。", - }); - write("done", { - session_id: clientSessionId, - total_duration_ms: Math.max(0, Date.now() - requestStartedAt), - }); - return { aborted: false, failed: false, toolCallCount }; - } finally { - await iterator.return?.(undefined); - if (!promptSettled) { - await promptPromise.catch(() => undefined); - } - } -}; - -const isSessionEvent = (event: OpencodeEvent, sessionId: string) => - "properties" in event && - typeof event.properties === "object" && - event.properties !== null && - "sessionID" in event.properties && - event.properties.sessionID === sessionId; - -const emitFallbackMessage = async ( - runtime: OpencodeRuntimeAdapter, - opencodeSessionId: string, - clientSessionId: string, - write: (event: string, data: Record) => void, -) => { - const messages = await runtime.messages(opencodeSessionId); - const assistantMessage = [...messages] - .reverse() - .find((message) => message.info.role === "assistant"); - const parts = assistantMessage?.parts ?? []; - const text = collectTextContent(parts); - if (text) { - write("token", { - session_id: clientSessionId, - content: text, - }); - } -}; - -const collectTextContent = (parts: Part[]) => - parts - .filter((part): part is Extract => part.type === "text") - .map((part) => part.text) - .join(""); - -const normalizeToolStatus = (status: string) => { - if (status === "completed") return "completed"; - if (status === "error") return "error"; - return "running"; -}; - -const formatProgressValue = (value: unknown): string => { - if (typeof value === "string") { - return value.length > 120 ? `${value.slice(0, 117)}...` : value; - } - if ( - typeof value === "number" || - typeof value === "boolean" || - value === null || - value === undefined - ) { - return String(value); - } - try { - const serialized = JSON.stringify(value); - return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized; - } catch { - return "[unserializable]"; - } -}; - -const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim(); - -const truncateProgressText = (text: string, maxLength: number) => - text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text; - -const summarizeToolParams = (params: Record) => { - const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]); - const summary = Object.entries(params) - .filter(([key]) => !ignoredKeys.has(key)) - .slice(0, 4) - .map(([key, value]) => `${key}=${formatProgressValue(value)}`) - .join(", "); - - return summary || "无附加参数"; -}; - -const buildSessionStatusDetail = (status: { type: string; message?: string }) => { - if (status.type === "retry") { - return status.message - ? `模型请求需要重试,原因:${status.message}` - : "模型请求正在重试,等待下一次响应。"; - } - if (status.type === "busy") { - return status.message - ? `Agent 正在处理中:${status.message}` - : "Agent 正在执行推理、工具调用或结果整理。"; - } - if (status.type === "idle") { - return status.message - ? `Agent 已空闲:${status.message}` - : "当前会话暂时没有待处理任务。"; - } - return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`; -}; - -const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => { - const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800); - if (ended) { - return reasoningText - ? `推理过程:${reasoningText}` - : "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。"; - } - return reasoningText - ? `正在推理:${reasoningText}` - : "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。"; -}; - -const buildToolProgressDetail = ( - tool: string, - status: string, - params: Record, - reason: string, - error?: string, -) => { - const toolName = toolLabels[tool] ?? tool; - const reasonText = reason ? `;调用原因:${reason}` : ""; - const paramsText = `;关键参数:${summarizeToolParams(params)}`; - - if (status === "error") { - const errorText = error ? `;错误:${error}` : ""; - return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`; - } - if (status === "completed") { - return `${toolName} 已执行完成${reasonText}${paramsText}`; - } - if (status === "pending") { - return `${toolName} 已进入待执行状态${reasonText}${paramsText}`; - } - return `${toolName} 正在执行${reasonText}${paramsText}`; -}; - -const getToolProgressTitle = (tool: string, status: string) => { - const toolName = toolLabels[tool] ?? tool; - if (status === "completed") return `${toolName} 已完成`; - if (status === "error") return `${toolName} 执行失败`; - if (status === "pending") return `准备调用 ${toolName}`; - return `正在调用 ${toolName}`; -}; - -const buildSessionTitle = (message: string) => { - const normalized = message.replace(/\s+/g, " ").trim(); - if (!normalized) { - return "新对话"; - } - return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized; -}; - -const TITLE_PROMPT_TIMEOUT_MS = 2500; - -const generateSessionTitle = async ( - runtime: OpencodeRuntimeAdapter, - options: { - sessionId: string; - latestUserMessage: string; - fallbackTitle?: string; - }, -) => { - const fallback = options.fallbackTitle?.trim() || buildSessionTitle(options.latestUserMessage); - let titleSessionId: string | undefined; - try { - const conversation = await buildTitleConversationContext(runtime, options.sessionId); - if (!conversation) { - return fallback; - } - - const titleSession = await runtime.createSession(`title-${Date.now().toString(36)}`); - titleSessionId = titleSession.id; - const request = runtime - .prompt( - titleSession.id, - [ - "你是会话标题生成器。", - "请根据用户问题生成一个 8-16 字中文标题。", - "要求:简洁、可读、避免标点、不要引号、不要解释。", - "请优先概括最近这轮对话的核心任务或结论。", - "只输出标题本身。", - "", - conversation, - ].join("\n"), - ) - .then(async () => { - const messages = await runtime.messages(titleSession.id, 20); - const assistantMessage = [...messages] - .reverse() - .find((message) => message.info.role === "assistant"); - const title = collectTextContent(assistantMessage?.parts ?? []); - return normalizeGeneratedTitle(title, fallback); - }); - - const timeout = new Promise((resolve) => { - setTimeout(() => resolve(fallback), TITLE_PROMPT_TIMEOUT_MS); - }); - - return await Promise.race([request, timeout]); - } catch (error) { - logger.warn({ err: error }, "failed to generate session title, using fallback"); - return fallback; - } finally { - if (titleSessionId) { - await runtime.abortSession(titleSessionId).catch((error) => { - logger.debug({ sessionId: titleSessionId, err: error }, "failed to cleanup title session"); - }); - } - } -}; - -const buildTitleConversationContext = async ( - runtime: OpencodeRuntimeAdapter, - sessionId: string, -) => { - const messages = await runtime.messages(sessionId, 12); - const recentMessages = messages - .filter( - (message) => - message.info.role === "user" || message.info.role === "assistant", - ) - .map((message) => ({ - role: message.info.role, - content: collectTextContent(message.parts).replace(/\s+/g, " ").trim(), - })) - .filter((message) => message.content.length > 0) - .slice(-6); - - if (recentMessages.length === 0) { - return ""; - } - - return recentMessages - .map((message) => `${message.role === "user" ? "用户" : "助手"}:${message.content}`) - .join("\n") - .slice(0, 2400); -}; - -const getConversationTurnStats = async ( - runtime: OpencodeRuntimeAdapter, - sessionId: string, -) => { - const messages = await runtime.messages(sessionId, 12); - return messages.reduce( - (stats, message) => { - if (message.info.role === "user") { - stats.userMessageCount += 1; - } else if (message.info.role === "assistant") { - stats.assistantMessageCount += 1; - } - return stats; - }, - { - userMessageCount: 0, - assistantMessageCount: 0, - }, - ); -}; - -const normalizeGeneratedTitle = (rawTitle: string, fallback: string) => { - const normalized = rawTitle - .replace(/\s+/g, " ") - .replace(/["'“”‘’`]/g, "") - .trim(); - if (!normalized) { - return fallback; - } - return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized; -}; - -const toolLabels: Record = { - dynamic_http_call: "后端数据查询", - fetch_result_ref: "结果引用回读", - memory_manager: "记忆写入", - session_search: "历史会话检索", - skill_manager: "流程沉淀", - locate_features: "地图定位", - view_history: "历史数据面板", - view_scada: "SCADA 面板", - show_chart: "图表渲染", - render_junctions: "节点渲染", -}; - -const buildPromptWithLearningContext = async ( - memoryStore: MemoryStore, - actorKey: string, - projectKey: string, - message: string, -) => { - const snapshot = await memoryStore.buildPromptSnapshot({ actorKey, projectKey }); - if (!snapshot) { - return message; - } - return `${snapshot}\n\n[Current user request]\n${message}`; -}; diff --git a/src/routes/chatSession.ts b/src/routes/chatSession.ts new file mode 100644 index 0000000..03e90bf --- /dev/null +++ b/src/routes/chatSession.ts @@ -0,0 +1,144 @@ +import { logger } from "../logger.js"; +import { MemoryStore } from "../memory/store.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; + +import { collectTextContent } from "./chatStream.js"; + +const TITLE_PROMPT_TIMEOUT_MS = 2500; + +const buildSessionTitle = (message: string) => { + const normalized = message.replace(/\s+/g, " ").trim(); + if (!normalized) { + return "新对话"; + } + return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized; +}; + +const buildTitleConversationContext = async ( + runtime: OpencodeRuntimeAdapter, + sessionId: string, +) => { + const messages = await runtime.messages(sessionId, 12); + const recentMessages = messages + .filter( + (message) => + message.info.role === "user" || message.info.role === "assistant", + ) + .map((message) => ({ + role: message.info.role, + content: collectTextContent(message.parts).replace(/\s+/g, " ").trim(), + })) + .filter((message) => message.content.length > 0) + .slice(-6); + + if (recentMessages.length === 0) { + return ""; + } + + return recentMessages + .map((message) => `${message.role === "user" ? "用户" : "助手"}:${message.content}`) + .join("\n") + .slice(0, 2400); +}; + +const normalizeGeneratedTitle = (rawTitle: string, fallback: string) => { + const normalized = rawTitle + .replace(/\s+/g, " ") + .replace(/["'“”‘’`]/g, "") + .trim(); + if (!normalized) { + return fallback; + } + return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized; +}; + +export const generateSessionTitle = async ( + runtime: OpencodeRuntimeAdapter, + options: { + sessionId: string; + latestUserMessage: string; + fallbackTitle?: string; + }, +) => { + const fallback = options.fallbackTitle?.trim() || buildSessionTitle(options.latestUserMessage); + let titleSessionId: string | undefined; + try { + const conversation = await buildTitleConversationContext(runtime, options.sessionId); + if (!conversation) { + return fallback; + } + + const titleSession = await runtime.createSession(`title-${Date.now().toString(36)}`); + titleSessionId = titleSession.id; + const request = runtime + .prompt( + titleSession.id, + [ + "你是会话标题生成器。", + "请根据用户问题生成一个 8-16 字中文标题。", + "要求:简洁、可读、避免标点、不要引号、不要解释。", + "请优先概括最近这轮对话的核心任务或结论。", + "只输出标题本身。", + "", + conversation, + ].join("\n"), + ) + .then(async () => { + const messages = await runtime.messages(titleSession.id, 20); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const title = collectTextContent(assistantMessage?.parts ?? []); + return normalizeGeneratedTitle(title, fallback); + }); + + const timeout = new Promise((resolve) => { + setTimeout(() => resolve(fallback), TITLE_PROMPT_TIMEOUT_MS); + }); + + return await Promise.race([request, timeout]); + } catch (error) { + logger.warn({ err: error }, "failed to generate session title, using fallback"); + return fallback; + } finally { + if (titleSessionId) { + await runtime.abortSession(titleSessionId).catch((error) => { + logger.debug({ sessionId: titleSessionId, err: error }, "failed to cleanup title session"); + }); + } + } +}; + +export const getConversationTurnStats = async ( + runtime: OpencodeRuntimeAdapter, + sessionId: string, +) => { + const messages = await runtime.messages(sessionId, 12); + return messages.reduce( + (stats, message) => { + if (message.info.role === "user") { + stats.userMessageCount += 1; + } else if (message.info.role === "assistant") { + stats.assistantMessageCount += 1; + } + return stats; + }, + { + userMessageCount: 0, + assistantMessageCount: 0, + }, + ); +}; + +export const buildPromptWithLearningContext = async ( + memoryStore: MemoryStore, + actorKey: string, + projectKey: string, + message: string, +) => { + const snapshot = await memoryStore.buildPromptSnapshot({ actorKey, projectKey }); + if (!snapshot) { + return message; + } + return `${snapshot}\n\n[Current user request]\n${message}`; +}; \ No newline at end of file diff --git a/src/routes/chatStream.ts b/src/routes/chatStream.ts new file mode 100644 index 0000000..7d5f315 --- /dev/null +++ b/src/routes/chatStream.ts @@ -0,0 +1,786 @@ +import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; + +import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js"; +import { logger } from "../logger.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; + +export const supportedModels = [ + "deepseek/deepseek-v4-flash", + "deepseek/deepseek-v4-pro", +] as const; + +export type SupportedModel = (typeof supportedModels)[number]; + +type StreamPromptOptions = { + runtime: OpencodeRuntimeAdapter; + opencodeSessionId: string; + clientSessionId: string; + message: string; + model?: SupportedModel; + traceId?: string; + projectId?: string; + signal?: AbortSignal; + write: (event: string, data: Record) => void; +}; + +type ProgressStatus = "running" | "completed" | "error"; + +type ProgressPayload = { + id: string; + phase: string; + status: ProgressStatus; + title: string; + detail?: string; +}; + +const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development"; + +const toolLabels: Record = { + dynamic_http_call: "后端数据查询", + fetch_result_ref: "结果引用回读", + memory_manager: "记忆写入", + session_search: "历史会话检索", + skill_manager: "流程沉淀", + locate_features: "地图定位", + view_history: "历史数据面板", + view_scada: "SCADA 面板", + show_chart: "图表渲染", + render_junctions: "节点渲染", +}; + +const logDevelopmentDebug = ( + message: string, + metadata: Record, +) => { + if (!isDevelopmentDebugLoggingEnabled) { + return; + } + logger.info(metadata, message); +}; + +const getErrorMessage = (error: { + name: string; + data?: { message?: string }; +}) => error.data?.message ?? error.name; + +const getUnknownErrorMessage = (error: unknown) => { + if ( + typeof error === "object" && + error !== null && + "name" in error && + typeof error.name === "string" + ) { + const maybeData = "data" in error ? error.data : undefined; + return getErrorMessage({ + name: error.name, + data: + typeof maybeData === "object" && maybeData !== null && "message" in maybeData + ? { message: typeof maybeData.message === "string" ? maybeData.message : undefined } + : undefined, + }); + } + return error instanceof Error ? error.message : String(error); +}; + +const isObjectRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const normalizeToolParams = (value: unknown): Record => { + if (isObjectRecord(value)) { + return value; + } + if (typeof value === "string") { + try { + const parsed = JSON.parse(value) as unknown; + return isObjectRecord(parsed) ? parsed : {}; + } catch { + return {}; + } + } + return {}; +}; + +const extractRequestReason = (params: Record) => { + const candidates = ["reason", "request_reason", "why", "purpose", "rationale"]; + for (const key of candidates) { + const value = params[key]; + if (typeof value === "string") { + const normalized = value.trim(); + if (normalized) { + return normalized; + } + } + } + return ""; +}; + +const isSkillEvent = (event: OpencodeEvent) => event.type.toLowerCase().includes("skill"); + +const extractSkillAuditInfo = (event: OpencodeEvent) => { + const payload = isObjectRecord(event.properties) + ? (event.properties as Record) + : {}; + const candidateName = + typeof payload.skill === "string" + ? payload.skill + : typeof payload.skillName === "string" + ? payload.skillName + : typeof payload.name === "string" + ? payload.name + : event.type; + const reason = extractRequestReason(payload); + return { + name: candidateName, + reason, + payload, + }; +}; + +const hasToolParams = (params: Record) => + Object.keys(params).length > 0; + +const toRuntimeModel = (model?: SupportedModel) => { + if (!model) { + return undefined; + } + const [providerID, modelID] = model.split("/"); + if (!providerID || !modelID) { + return undefined; + } + return { + providerID, + modelID, + }; +}; + +const isSessionEvent = (event: OpencodeEvent, sessionId: string) => + "properties" in event && + typeof event.properties === "object" && + event.properties !== null && + "sessionID" in event.properties && + event.properties.sessionID === sessionId; + +export const collectTextContent = (parts: Part[]) => + parts + .filter((part): part is Extract => part.type === "text") + .map((part) => part.text) + .join(""); + +const emitFallbackMessage = async ( + runtime: OpencodeRuntimeAdapter, + opencodeSessionId: string, + clientSessionId: string, + write: (event: string, data: Record) => void, +) => { + const messages = await runtime.messages(opencodeSessionId); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const parts = assistantMessage?.parts ?? []; + const text = collectTextContent(parts); + if (text) { + write("token", { + session_id: clientSessionId, + content: text, + }); + } +}; + +const normalizeToolStatus = (status: string) => { + if (status === "completed") return "completed"; + if (status === "error") return "error"; + return "running"; +}; + +const formatProgressValue = (value: unknown): string => { + if (typeof value === "string") { + return value.length > 120 ? `${value.slice(0, 117)}...` : value; + } + if ( + typeof value === "number" || + typeof value === "boolean" || + value === null || + value === undefined + ) { + return String(value); + } + try { + const serialized = JSON.stringify(value); + return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized; + } catch { + return "[unserializable]"; + } +}; + +const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim(); + +const truncateProgressText = (text: string, maxLength: number) => + text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text; + +const summarizeToolParams = (params: Record) => { + const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]); + const summary = Object.entries(params) + .filter(([key]) => !ignoredKeys.has(key)) + .slice(0, 4) + .map(([key, value]) => `${key}=${formatProgressValue(value)}`) + .join(", "); + + return summary || "无附加参数"; +}; + +const buildSessionStatusDetail = (status: { type: string; message?: string }) => { + if (status.type === "retry") { + return status.message + ? `模型请求需要重试,原因:${status.message}` + : "模型请求正在重试,等待下一次响应。"; + } + if (status.type === "busy") { + return status.message + ? `Agent 正在处理中:${status.message}` + : "Agent 正在执行推理、工具调用或结果整理。"; + } + if (status.type === "idle") { + return status.message + ? `Agent 已空闲:${status.message}` + : "当前会话暂时没有待处理任务。"; + } + return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`; +}; + +const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => { + const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800); + if (ended) { + return reasoningText + ? `推理过程:${reasoningText}` + : "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。"; + } + return reasoningText + ? `正在推理:${reasoningText}` + : "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。"; +}; + +const buildToolProgressDetail = ( + tool: string, + status: string, + params: Record, + reason: string, + error?: string, +) => { + const toolName = toolLabels[tool] ?? tool; + const reasonText = reason ? `;调用原因:${reason}` : ""; + const paramsText = `;关键参数:${summarizeToolParams(params)}`; + + if (status === "error") { + const errorText = error ? `;错误:${error}` : ""; + return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`; + } + if (status === "completed") { + return `${toolName} 已执行完成${reasonText}${paramsText}`; + } + if (status === "pending") { + return `${toolName} 已进入待执行状态${reasonText}${paramsText}`; + } + return `${toolName} 正在执行${reasonText}${paramsText}`; +}; + +const getToolProgressTitle = (tool: string, status: string) => { + const toolName = toolLabels[tool] ?? tool; + if (status === "completed") return `${toolName} 已完成`; + if (status === "error") return `${toolName} 执行失败`; + if (status === "pending") return `准备调用 ${toolName}`; + return `正在调用 ${toolName}`; +}; + +export const streamPromptResponse = async ({ + runtime, + opencodeSessionId, + clientSessionId, + message, + model, + traceId, + projectId, + signal, + write, +}: StreamPromptOptions): Promise<{ + aborted: boolean; + failed: boolean; + toolCallCount: number; +}> => { + const eventStream = await runtime.subscribeEvents(); + const iterator = eventStream[Symbol.asyncIterator](); + const requestStartedAt = Date.now(); + const promptStartedAt = Date.now(); + const progressStartedAtMap = new Map(); + const finalizedProgressIds = new Set(); + const emittedToolParts = new Set(); + const partTypes = new Map(); + const pendingPartTextDeltas = new Map(); + const reasoningDeltas = new Map(); + const reasoningStatuses = new Map(); + const toolStatuses = new Map(); + let lastSessionStatus: string | null = null; + let lastSessionStatusMessage: string | null = null; + let emittedText = false; + let toolCallCount = 0; + let done = false; + let promptSettled = false; + let aborted = signal?.aborted ?? false; + let failed = false; + const debugContext = { + opencodeSessionId, + clientSessionId, + traceId, + projectId, + model: model ?? null, + }; + + logDevelopmentDebug("chat stream started", { + ...debugContext, + messageChars: message.length, + }); + + const abortPromise = signal + ? new Promise<{ type: "abort" }>((resolve) => { + if (signal.aborted) { + resolve({ type: "abort" }); + return; + } + signal.addEventListener("abort", () => resolve({ type: "abort" }), { + once: true, + }); + }) + : null; + + const emitProgress = ({ id, phase, status, title, detail }: ProgressPayload) => { + if (status === "running" && finalizedProgressIds.has(id)) { + return; + } + + const now = Date.now(); + const startedAt = progressStartedAtMap.get(id) ?? now; + if (!progressStartedAtMap.has(id)) { + progressStartedAtMap.set(id, startedAt); + } + + if (status === "running") { + write("progress", { + session_id: clientSessionId, + id, + phase, + status, + title, + detail, + started_at: startedAt, + elapsed_ms: Math.max(0, now - startedAt), + }); + return; + } + + const durationMs = Math.max(0, now - startedAt); + finalizedProgressIds.add(id); + progressStartedAtMap.delete(id); + write("progress", { + session_id: clientSessionId, + id, + phase, + status, + title, + detail, + started_at: startedAt, + ended_at: now, + duration_ms: durationMs, + }); + }; + + emitProgress({ + id: "request-received", + phase: "start", + status: "running", + title: "已收到请求,正在启动 Agent 分析", + detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。", + }); + + const promptPromise = runtime + .prompt(opencodeSessionId, message, toRuntimeModel(model)) + .then(() => { + promptSettled = true; + logDevelopmentDebug("runtime.prompt resolved", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - promptStartedAt), + }); + }) + .catch((error: unknown) => { + promptSettled = true; + logDevelopmentDebug("runtime.prompt failed", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - promptStartedAt), + error: getUnknownErrorMessage(error), + }); + throw error; + }); + + logDevelopmentDebug("runtime.prompt dispatched", { + ...debugContext, + }); + + try { + while (!done) { + if (signal?.aborted) { + aborted = true; + logDevelopmentDebug("chat stream noticed abort signal", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + break; + } + + const nextEvent = iterator + .next() + .then((result) => ({ type: "event" as const, result })); + const nextPrompt = promptSettled + ? null + : promptPromise.then( + () => ({ type: "prompt" as const }), + (error: unknown) => ({ type: "prompt-error" as const, error }), + ); + const next = await Promise.race( + [ + ...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]), + ...(abortPromise ? [abortPromise] : []), + ], + ); + + if (next.type === "abort") { + aborted = true; + break; + } + + if (next.type === "prompt-error") { + throw next.error; + } + if (next.type === "prompt") { + continue; + } + if (next.result.done) { + break; + } + + const event = next.result.value as OpencodeEvent; + if (!isSessionEvent(event, opencodeSessionId)) { + continue; + } + + if (event.type === "session.status") { + const nextStatus = event.properties.status.type; + const nextStatusMessage = + "message" in event.properties.status && + typeof event.properties.status.message === "string" + ? event.properties.status.message + : null; + if ( + nextStatus !== lastSessionStatus || + nextStatusMessage !== lastSessionStatusMessage + ) { + lastSessionStatus = nextStatus; + lastSessionStatusMessage = nextStatusMessage; + logDevelopmentDebug("session status updated", { + ...debugContext, + status: nextStatus, + statusMessage: nextStatusMessage, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + } + emitProgress({ + id: "session-status", + phase: "session", + status: event.properties.status.type === "idle" ? "completed" : "running", + title: + event.properties.status.type === "retry" + ? `模型请求重试中:${event.properties.status.message}` + : event.properties.status.type === "busy" + ? "Agent 正在处理请求" + : "Agent 已空闲", + detail: buildSessionStatusDetail(event.properties.status), + }); + continue; + } + + if (isSkillEvent(event)) { + const { name, reason, payload } = extractSkillAuditInfo(event); + logDevelopmentDebug("skill event received", { + ...debugContext, + skill: name, + reason: reason || null, + payloadKeys: Object.keys(payload).slice(0, 8), + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + void writeLlmRequestAuditLog({ + kind: "skill", + sessionId: opencodeSessionId, + clientSessionId, + traceId, + projectId, + target: name, + reason, + reasonProvided: Boolean(reason), + payload, + }).catch((error) => { + logger.warn({ err: error }, "failed to write skill audit log"); + }); + } + + if (event.type === "message.part.delta" && event.properties.field === "text") { + const partType = partTypes.get(event.properties.partID); + if (partType === "text") { + emittedText = true; + write("token", { + session_id: clientSessionId, + content: event.properties.delta, + }); + } else if (partType === "reasoning") { + const pending = reasoningDeltas.get(event.properties.partID) ?? []; + pending.push(event.properties.delta); + reasoningDeltas.set(event.properties.partID, pending); + } else if (!partType) { + const pending = pendingPartTextDeltas.get(event.properties.partID) ?? []; + pending.push(event.properties.delta); + pendingPartTextDeltas.set(event.properties.partID, pending); + } + continue; + } + + if (event.type === "message.part.updated") { + const part = event.properties.part; + partTypes.set(part.id, part.type); + if (part.type === "text") { + const pending = pendingPartTextDeltas.get(part.id) ?? []; + pendingPartTextDeltas.delete(part.id); + for (const content of pending) { + emittedText = true; + write("token", { + session_id: clientSessionId, + content, + }); + } + } else if (part.type === "reasoning") { + const pending = pendingPartTextDeltas.get(part.id) ?? []; + if (pending.length > 0) { + const existing = reasoningDeltas.get(part.id) ?? []; + reasoningDeltas.set(part.id, existing.concat(pending)); + } + pendingPartTextDeltas.delete(part.id); + const reasoningStatus = part.time.end ? "completed" : "running"; + if (reasoningStatuses.get(part.id) !== reasoningStatus) { + reasoningStatuses.set(part.id, reasoningStatus); + logDevelopmentDebug("reasoning part status changed", { + ...debugContext, + partId: part.id, + status: reasoningStatus, + chunkCount: (reasoningDeltas.get(part.id) ?? []).length, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + } + const reasoningDetail = buildReasoningProgressDetail( + reasoningDeltas.get(part.id) ?? [], + part.time.end, + ); + emitProgress({ + id: part.id, + phase: "planning", + status: part.time.end ? "completed" : "running", + title: part.time.end ? "分析规划完成" : "正在规划分析步骤", + detail: reasoningDetail, + }); + } + if (part.type === "tool") { + const toolParams = normalizeToolParams(part.state.input); + const reason = extractRequestReason(toolParams); + const isToolFinalState = + part.state.status === "completed" || part.state.status === "error"; + const nextToolStatus = String(part.state.status); + + if (toolStatuses.get(part.id) !== nextToolStatus) { + toolStatuses.set(part.id, nextToolStatus); + logDevelopmentDebug("tool part status changed", { + ...debugContext, + partId: part.id, + tool: part.tool, + status: nextToolStatus, + reason: reason || null, + inputKeys: Object.keys(toolParams).slice(0, 8), + error: + part.state.status === "error" ? (part.state.error ?? "unknown") : null, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + } + + emitProgress({ + id: part.id, + phase: "tool", + status: normalizeToolStatus(part.state.status), + title: getToolProgressTitle(part.tool, part.state.status), + detail: buildToolProgressDetail( + part.tool, + part.state.status, + toolParams, + reason, + part.state.status === "error" ? part.state.error : undefined, + ), + }); + if ( + !emittedToolParts.has(part.id) && + (hasToolParams(toolParams) || isToolFinalState) + ) { + emittedToolParts.add(part.id); + toolCallCount += 1; + if (!reason) { + logger.warn( + { + tool: part.tool, + sessionId: opencodeSessionId, + clientSessionId, + }, + "llm tool request missing reason", + ); + } + void writeLlmRequestAuditLog({ + kind: "tool", + sessionId: opencodeSessionId, + clientSessionId, + traceId, + projectId, + target: part.tool, + reason, + reasonProvided: Boolean(reason), + payload: toolParams, + }).catch((error) => { + logger.warn({ err: error }, "failed to write tool audit log"); + }); + write("tool_call", { + session_id: clientSessionId, + tool: part.tool, + params: toolParams, + reason, + }); + } + } + continue; + } + + if (event.type === "todo.updated") { + const completed = event.properties.todos.filter( + (todo) => todo.status === "completed", + ).length; + emitProgress({ + id: "todo-progress", + phase: "planning", + status: completed === event.properties.todos.length ? "completed" : "running", + title: `计划进度 ${completed}/${event.properties.todos.length}`, + detail: event.properties.todos + .map((todo) => `${todo.status}: ${todo.content}`) + .join("\n"), + }); + continue; + } + + if (event.type === "session.error") { + logDevelopmentDebug("session error received", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + error: event.properties.error + ? getErrorMessage(event.properties.error) + : "opencode session error", + }); + write("error", { + session_id: clientSessionId, + message: event.properties.error + ? getErrorMessage(event.properties.error) + : "opencode session error", + detail: event.properties.error?.name, + total_duration_ms: Math.max(0, Date.now() - requestStartedAt), + }); + failed = true; + done = true; + continue; + } + + if (event.type === "session.idle") { + logDevelopmentDebug("session idle received", { + ...debugContext, + emittedText, + toolCallCount, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + emitProgress({ + id: "session-status", + phase: "session", + status: "completed", + title: "Agent 已完成处理", + detail: "当前会话已无待执行任务,正在收尾并准备返回最终结果。", + }); + done = true; + } + } + + if (aborted) { + logDevelopmentDebug("chat stream aborting session", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + await runtime.abortSession(opencodeSessionId).catch((error) => { + logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session"); + }); + return { aborted: true, failed: false, toolCallCount }; + } + + if (failed) { + return { aborted: false, failed: true, toolCallCount }; + } + + await promptPromise; + if (!emittedText) { + logDevelopmentDebug("no streamed text emitted, falling back to messages()", { + ...debugContext, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write); + } + emitProgress({ + id: "request-received", + phase: "start", + status: "completed", + title: "请求处理完成", + detail: "本次请求的分析、工具执行和结果整理流程已经完成。", + }); + emitProgress({ + id: "request-completed", + phase: "complete", + status: "completed", + title: "分析完成", + detail: emittedText + ? "最终回答已生成并推送到前端。" + : "已完成分析,并通过兜底消息补发最终回答内容。", + }); + write("done", { + session_id: clientSessionId, + total_duration_ms: Math.max(0, Date.now() - requestStartedAt), + }); + logDevelopmentDebug("chat stream completed", { + ...debugContext, + emittedText, + toolCallCount, + totalDurationMs: Math.max(0, Date.now() - requestStartedAt), + }); + return { aborted: false, failed: false, toolCallCount }; + } finally { + await iterator.return?.(undefined); + if (!promptSettled) { + await promptPromise.catch(() => undefined); + } + logDevelopmentDebug("chat stream cleanup finished", { + ...debugContext, + promptSettled, + totalDurationMs: Math.max(0, Date.now() - requestStartedAt), + }); + } +}; \ No newline at end of file