import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2";
import { Router } from "express";
import { z } from "zod";
import { logger } from "../logger.js";
import { MemoryStore } from "../memory/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js";

/** Body of `POST /stream`: the user message and an optional client session id. */
const payloadSchema = z.object({
  message: z.string().min(1).max(10000),
  session_id: z.string().max(128).optional(),
});

/** Body of `POST /abort`: the client session to abort. */
const abortPayloadSchema = z.object({
  session_id: z.string().max(128),
});

/** Body of `POST /fork`: the source session and how many messages to keep. */
const forkPayloadSchema = z.object({
  session_id: z.string().max(128).optional(),
  keep_message_count: z.coerce.number().int().min(0),
});

/**
 * Builds the chat router exposing `POST /abort`, `POST /fork` and `POST /stream`.
 *
 * @param sessionBridge Resolves, forks and aborts session bindings and stores session titles.
 * @param runtime Adapter used to prompt sessions, subscribe to events and read messages.
 * @param memoryStore Source of the prompt snapshot prepended to the user message.
 * @returns The configured Express router.
 */
export const buildChatRouter = (
  sessionBridge: ChatSessionBridge,
  runtime: OpencodeRuntimeAdapter,
  memoryStore: MemoryStore,
) => {
  const chatRouter = Router();

  chatRouter.post("/abort", async (req, res) => {
    const parsed = abortPayloadSchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({
        message: "invalid request payload",
        detail: parsed.error.flatten(),
      });
      return;
    }
    try {
      const authHeader = req.header("authorization");
      // Strip the "Bearer " prefix when present; otherwise pass the header through unchanged.
      const accessToken = authHeader?.startsWith("Bearer ")
        ? authHeader.slice("Bearer ".length)
        : authHeader;
      const projectId = req.header("x-project-id") ?? undefined;
      const traceId = req.header("x-trace-id") ?? undefined;
      const userId = req.header("x-user-id") ?? undefined;
      const binding = await sessionBridge.abort({
        clientSessionId: parsed.data.session_id,
        accessToken,
        projectId,
        traceId,
        userId,
      });
      if (!binding) {
        // The bridge returned no binding for this session: answer 204 without a body.
        res.status(204).end();
        return;
      }
      logger.info(
        {
          clientSessionId: parsed.data.session_id,
          sessionId: binding.sessionId,
          traceId,
          projectId,
        },
        "aborted chat session by client request",
      );
      res.status(202).json({
        session_id: parsed.data.session_id,
        aborted: true,
      });
    } catch (error) {
      const detail = error instanceof Error ? error.message : String(error);
      logger.error({ err: error }, "chat abort failed");
      res.status(500).json({
        message: "chat abort failed",
        detail,
      });
    }
  });

  chatRouter.post("/fork", async (req, res) => {
    const parsed = forkPayloadSchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({
        message: "invalid request payload",
        detail: parsed.error.flatten(),
      });
      return;
    }
    try {
      const authHeader = req.header("authorization");
      const accessToken = authHeader?.startsWith("Bearer ")
        ? authHeader.slice("Bearer ".length)
        : authHeader;
      const projectId = req.header("x-project-id") ?? undefined;
      const traceId = req.header("x-trace-id") ?? undefined;
      const userId = req.header("x-user-id") ?? undefined;
      const { binding, requestContext } = await sessionBridge.fork({
        clientSessionId: parsed.data.session_id,
        accessToken,
        projectId,
        traceId,
        keepMessageCount: parsed.data.keep_message_count,
        userId,
      });
      logger.info(
        {
          sourceClientSessionId: parsed.data.session_id,
          clientSessionId: requestContext.clientSessionId,
          sessionId: binding.sessionId,
          traceId: requestContext.traceId,
          projectId: requestContext.projectId,
          keepMessageCount: parsed.data.keep_message_count,
        },
        "forked chat session",
      );
      res.status(200).json({
        session_id: requestContext.clientSessionId,
      });
    } catch (error) {
      const detail = error instanceof Error ? error.message : String(error);
      logger.error({ err: error }, "chat fork failed");
      res.status(500).json({
        message: "chat fork failed",
        detail,
      });
    }
  });

  chatRouter.post("/stream", async (req, res) => {
    const parsed = payloadSchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({
        message: "invalid request payload",
        detail: parsed.error.flatten(),
      });
      return;
    }
    // Session id used when a failure has to be reported in-band; starts as the
    // client-supplied id and is replaced by the resolved one once known.
    let reportedSessionId: string | undefined = parsed.data.session_id;
    try {
      const authHeader = req.header("authorization");
      const accessToken = authHeader?.startsWith("Bearer ")
        ? authHeader.slice("Bearer ".length)
        : authHeader;
      const projectId = req.header("x-project-id") ?? undefined;
      const traceId = req.header("x-trace-id") ?? undefined;
      const userId = req.header("x-user-id") ?? undefined;
      const { binding, requestContext, created } = await sessionBridge.resolve({
        clientSessionId: parsed.data.session_id,
        accessToken,
        projectId,
        traceId,
        userId,
      });
      logger.info(
        {
          clientSessionId: requestContext.clientSessionId,
          sessionId: binding.sessionId,
          created,
          traceId: requestContext.traceId,
          projectId: requestContext.projectId,
        },
        "processing chat request",
      );

      // From here on the response is a server-sent event stream.
      res.status(200);
      res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
      res.setHeader("Cache-Control", "no-cache");
      res.setHeader("Connection", "keep-alive");
      res.setHeader("X-Accel-Buffering", "no");
      res.flushHeaders?.();

      const clientSessionId = requestContext.clientSessionId;
      reportedSessionId = clientSessionId;
      let streamClosed = false;
      const abortController = new AbortController();
      // Signal the abort at most once, and only while the stream is still active.
      const handleClientClose = () => {
        if (streamClosed || abortController.signal.aborted) {
          return;
        }
        abortController.abort();
      };
      req.on("close", handleClientClose);
      res.on("close", handleClientClose);

      try {
        const preparedMessage = await buildPromptWithLearningContext(
          memoryStore,
          requestContext.actorKey,
          requestContext.projectKey,
          parsed.data.message,
        );
        const streamResult = await streamPromptResponse({
          runtime,
          opencodeSessionId: binding.sessionId,
          clientSessionId,
          message: preparedMessage,
          traceId: requestContext.traceId,
          projectId: requestContext.projectId,
          signal: abortController.signal,
          write: (event, data) => {
            if (streamClosed || res.writableEnded || res.destroyed) {
              return;
            }
            res.write(toSse(event, data));
          },
        });

        if (!streamResult.aborted && !streamResult.failed) {
          const existingSessionTitle = sessionBridge.getSessionTitle(binding.sessionId);
          let sessionTitle = existingSessionTitle;
          // Only generate a title when none is stored and this was the first exchange.
          const shouldGenerateTitle =
            !existingSessionTitle &&
            (await isFirstRoundConversation(runtime, binding.sessionId));
          if (shouldGenerateTitle) {
            sessionTitle = await generateSessionTitle(runtime, {
              sessionId: binding.sessionId,
              latestUserMessage: parsed.data.message,
            });
            sessionBridge.setSessionTitle(binding.sessionId, sessionTitle);
          }
          if (!streamClosed && !res.writableEnded && !res.destroyed) {
            if (shouldGenerateTitle && sessionTitle) {
              res.write(
                toSse("session_title", {
                  session_id: clientSessionId,
                  title: sessionTitle,
                }),
              );
            }
          }
        }
      } finally {
        streamClosed = true;
        req.off("close", handleClientClose);
        res.off("close", handleClientClose);
      }

      if (!res.writableEnded && !res.destroyed) {
        res.end();
      }
    } catch (error) {
      const detail = error instanceof Error ? error.message : String(error);
      logger.error({ err: error }, "chat stream failed");
      if (res.headersSent) {
        // The SSE headers are already flushed, so a JSON 500 would throw
        // ERR_HTTP_HEADERS_SENT. Report the failure as an SSE event instead.
        if (!res.writableEnded && !res.destroyed) {
          res.write(
            toSse("error", {
              session_id: reportedSessionId,
              message: "chat stream failed",
              detail,
            }),
          );
          res.end();
        }
        return;
      }
      res.status(500).json({
        message: "chat stream failed",
        detail,
      });
    }
  });

  return chatRouter;
};

/** Serializes one server-sent event frame with a JSON-encoded data line. */
const toSse = (event: string, data: Record<string, unknown>) =>
  `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;

/** Returns the error's `data.message` when present, otherwise its `name`. */
const getErrorMessage = (error: {
  name: string;
  data?: { message?: string };
}) => error.data?.message ?? error.name;

/** Type guard for non-null, non-array objects. */
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === "object" && value !== null && !Array.isArray(value);

/**
 * Coerces tool input into a plain object. Objects pass through, JSON strings
 * are parsed, and anything else (including unparsable JSON) becomes `{}`.
 */
const normalizeToolParams = (value: unknown): Record<string, unknown> => {
  if (isObjectRecord(value)) {
    return value;
  }
  if (typeof value === "string") {
    try {
      const parsed = JSON.parse(value) as unknown;
      return isObjectRecord(parsed) ? parsed : {};
    } catch {
      return {};
    }
  }
  return {};
};

/**
 * Returns the first non-blank string found under one of the known reason keys,
 * trimmed, or `""` when none is present.
 */
const extractRequestReason = (params: Record<string, unknown>) => {
  const candidates = ["reason", "request_reason", "why", "purpose", "rationale"];
  for (const key of candidates) {
    const value = params[key];
    if (typeof value === "string") {
      const normalized = value.trim();
      if (normalized) {
        return normalized;
      }
    }
  }
  return "";
};

/** True when the event type contains "skill" (case-insensitive). */
const isSkillEvent = (event: OpencodeEvent) =>
  event.type.toLowerCase().includes("skill");

/**
 * Extracts the audit fields for a skill event: a display name (first string
 * among `skill`, `skillName`, `name`, else the event type), the request
 * reason, and the raw properties payload.
 */
const extractSkillAuditInfo = (event: OpencodeEvent) => {
  const payload = isObjectRecord(event.properties)
    ? (event.properties as Record<string, unknown>)
    : {};
  const candidateName =
    typeof payload.skill === "string"
      ? payload.skill
      : typeof payload.skillName === "string"
        ? payload.skillName
        : typeof payload.name === "string"
          ? payload.name
          : event.type;
  const reason = extractRequestReason(payload);
  return {
    name: candidateName,
    reason,
    payload,
  };
};

/** True when the params object has at least one own enumerable key. */
const hasToolParams = (params: Record<string, unknown>) =>
  Object.keys(params).length > 0;

type StreamPromptOptions = {
  runtime: OpencodeRuntimeAdapter;
  opencodeSessionId: string;
  clientSessionId: string;
  message: string;
  traceId?: string;
  projectId?: string;
  signal?: AbortSignal;
  write: (event: string, data: Record<string, unknown>) => void;
};

type ProgressStatus = "running" | "completed" | "error";

type ProgressPayload = {
  id: string;
  phase: string;
  status: ProgressStatus;
  title: string;
  detail?: string;
};

/**
 * Sends `message` to the session and relays the resulting runtime events to
 * `write` as `progress`, `token`, `tool_call`, `error` and `done` events.
 *
 * The loop ends on `session.idle`, `session.error`, end of the event stream,
 * or an abort signal.
 *
 * @returns `aborted: true` after an abort (the runtime session is asked to
 * abort as well), `failed: true` after a `session.error` event, otherwise both false.
 * @throws Whatever `runtime.prompt` rejected with.
 */
const streamPromptResponse = async ({
  runtime,
  opencodeSessionId,
  clientSessionId,
  message,
  traceId,
  projectId,
  signal,
  write,
}: StreamPromptOptions): Promise<{ aborted: boolean; failed: boolean }> => {
  const eventStream = await runtime.subscribeEvents();
  const iterator = eventStream[Symbol.asyncIterator]();
  const requestStartedAt = Date.now();
  const progressStartedAtMap = new Map<string, number>();
  const finalizedProgressIds = new Set<string>();
  const emittedToolParts = new Set<string>();
  const partTypes = new Map<string, Part["type"]>();
  // Text deltas that arrived before the part's type was known.
  const pendingPartTextDeltas = new Map<string, string[]>();
  const reasoningDeltas = new Map<string, string[]>();
  let emittedText = false;
  let done = false;
  let promptSettled = false;
  let aborted = signal?.aborted ?? false;
  let failed = false;

  const abortPromise = signal
    ? new Promise<{ type: "abort" }>((resolve) => {
        if (signal.aborted) {
          resolve({ type: "abort" });
          return;
        }
        signal.addEventListener("abort", () => resolve({ type: "abort" }), {
          once: true,
        });
      })
    : null;

  // Emits a progress event. The first call for an id records its start time;
  // a terminal status finalizes the id so later "running" updates are ignored.
  const emitProgress = ({ id, phase, status, title, detail }: ProgressPayload) => {
    if (status === "running" && finalizedProgressIds.has(id)) {
      return;
    }
    const now = Date.now();
    const startedAt = progressStartedAtMap.get(id) ?? now;
    if (!progressStartedAtMap.has(id)) {
      progressStartedAtMap.set(id, startedAt);
    }
    if (status === "running") {
      write("progress", {
        session_id: clientSessionId,
        id,
        phase,
        status,
        title,
        detail,
        started_at: startedAt,
        elapsed_ms: Math.max(0, now - startedAt),
      });
      return;
    }
    const durationMs = Math.max(0, now - startedAt);
    finalizedProgressIds.add(id);
    progressStartedAtMap.delete(id);
    write("progress", {
      session_id: clientSessionId,
      id,
      phase,
      status,
      title,
      detail,
      started_at: startedAt,
      ended_at: now,
      duration_ms: durationMs,
    });
  };

  emitProgress({
    id: "request-received",
    phase: "start",
    status: "running",
    title: "已收到请求,正在启动 Agent 分析",
    detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
  });

  const promptPromise = runtime
    .prompt(opencodeSessionId, message)
    .then(() => {
      promptSettled = true;
    })
    .catch((error: unknown) => {
      promptSettled = true;
      throw error;
    });

  // One shared view of the prompt outcome. It stays in the race until the loop
  // has actually observed it, so a rejection cannot be skipped when an event
  // happens to win the same race tick.
  const promptOutcome = promptPromise.then(
    () => ({ type: "prompt" as const }),
    (error: unknown) => ({ type: "prompt-error" as const, error }),
  );
  let promptObserved = false;

  const readNextEvent = () =>
    iterator.next().then((result) => ({ type: "event" as const, result }));
  // The outstanding iterator read. It is kept across iterations so that an
  // iteration won by the prompt or abort promise does not issue a second
  // `next()` and drop the event delivered to the first.
  let pendingEvent: ReturnType<typeof readNextEvent> | null = null;

  type RaceOutcome =
    | Awaited<ReturnType<typeof readNextEvent>>
    | Awaited<typeof promptOutcome>
    | { type: "abort" };

  try {
    while (!done) {
      if (signal?.aborted) {
        aborted = true;
        break;
      }
      if (!pendingEvent) {
        pendingEvent = readNextEvent();
      }
      const contenders: Promise<RaceOutcome>[] = [pendingEvent];
      if (!promptObserved) {
        contenders.push(promptOutcome);
      }
      if (abortPromise) {
        contenders.push(abortPromise);
      }
      const next = await Promise.race(contenders);

      if (next.type === "abort") {
        aborted = true;
        break;
      }
      if (next.type === "prompt-error") {
        throw next.error;
      }
      if (next.type === "prompt") {
        // The prompt call returned; keep draining events until the session is idle.
        promptObserved = true;
        continue;
      }

      // The outstanding read has been consumed; the next iteration issues a new one.
      pendingEvent = null;
      if (next.result.done) {
        break;
      }
      const event = next.result.value as OpencodeEvent;
      if (!isSessionEvent(event, opencodeSessionId)) {
        continue;
      }

      if (event.type === "session.status") {
        emitProgress({
          id: "session-status",
          phase: "session",
          status: event.properties.status.type === "idle" ? "completed" : "running",
          title:
            event.properties.status.type === "retry"
              ? `模型请求重试中:${event.properties.status.message}`
              : event.properties.status.type === "busy"
                ? "Agent 正在处理请求"
                : "Agent 已空闲",
          detail: buildSessionStatusDetail(event.properties.status),
        });
        continue;
      }

      if (isSkillEvent(event)) {
        const { name, reason, payload } = extractSkillAuditInfo(event);
        // Fire-and-forget: an audit-log failure is logged but does not stop the stream.
        void writeLlmRequestAuditLog({
          kind: "skill",
          sessionId: opencodeSessionId,
          clientSessionId,
          traceId,
          projectId,
          target: name,
          reason,
          reasonProvided: Boolean(reason),
          payload,
        }).catch((error) => {
          logger.warn({ err: error }, "failed to write skill audit log");
        });
      }

      if (event.type === "message.part.delta" && event.properties.field === "text") {
        const partType = partTypes.get(event.properties.partID);
        if (partType === "text") {
          emittedText = true;
          write("token", {
            session_id: clientSessionId,
            content: event.properties.delta,
          });
        } else if (partType === "reasoning") {
          const pending = reasoningDeltas.get(event.properties.partID) ?? [];
          pending.push(event.properties.delta);
          reasoningDeltas.set(event.properties.partID, pending);
        } else if (!partType) {
          // Part type not known yet: buffer until its `message.part.updated` arrives.
          const pending = pendingPartTextDeltas.get(event.properties.partID) ?? [];
          pending.push(event.properties.delta);
          pendingPartTextDeltas.set(event.properties.partID, pending);
        }
        continue;
      }

      if (event.type === "message.part.updated") {
        const part = event.properties.part;
        partTypes.set(part.id, part.type);

        if (part.type === "text") {
          // Flush deltas buffered before the part was known to be text.
          const pending = pendingPartTextDeltas.get(part.id) ?? [];
          pendingPartTextDeltas.delete(part.id);
          for (const content of pending) {
            emittedText = true;
            write("token", {
              session_id: clientSessionId,
              content,
            });
          }
        } else if (part.type === "reasoning") {
          const pending = pendingPartTextDeltas.get(part.id) ?? [];
          if (pending.length > 0) {
            const existing = reasoningDeltas.get(part.id) ?? [];
            reasoningDeltas.set(part.id, existing.concat(pending));
          }
          pendingPartTextDeltas.delete(part.id);
          const reasoningDetail = buildReasoningProgressDetail(
            reasoningDeltas.get(part.id) ?? [],
            part.time.end,
          );
          emitProgress({
            id: part.id,
            phase: "planning",
            status: part.time.end ? "completed" : "running",
            title: part.time.end ? "分析规划完成" : "正在规划分析步骤",
            detail: reasoningDetail,
          });
        }

        if (part.type === "tool") {
          const toolParams = normalizeToolParams(part.state.input);
          const reason = extractRequestReason(toolParams);
          const isToolFinalState =
            part.state.status === "completed" || part.state.status === "error";
          emitProgress({
            id: part.id,
            phase: "tool",
            status: normalizeToolStatus(part.state.status),
            title: getToolProgressTitle(part.tool, part.state.status),
            detail: buildToolProgressDetail(
              part.tool,
              part.state.status,
              toolParams,
              reason,
              part.state.status === "error" ? part.state.error : undefined,
            ),
          });
          // Audit and announce each tool part once: as soon as it has params,
          // or when it reaches a final state without any.
          if (
            !emittedToolParts.has(part.id) &&
            (hasToolParams(toolParams) || isToolFinalState)
          ) {
            emittedToolParts.add(part.id);
            if (!reason) {
              logger.warn(
                {
                  tool: part.tool,
                  sessionId: opencodeSessionId,
                  clientSessionId,
                },
                "llm tool request missing reason",
              );
            }
            void writeLlmRequestAuditLog({
              kind: "tool",
              sessionId: opencodeSessionId,
              clientSessionId,
              traceId,
              projectId,
              target: part.tool,
              reason,
              reasonProvided: Boolean(reason),
              payload: toolParams,
            }).catch((error) => {
              logger.warn({ err: error }, "failed to write tool audit log");
            });
            write("tool_call", {
              session_id: clientSessionId,
              tool: part.tool,
              params: toolParams,
              reason,
            });
          }
        }
        continue;
      }

      if (event.type === "todo.updated") {
        const completed = event.properties.todos.filter(
          (todo) => todo.status === "completed",
        ).length;
        emitProgress({
          id: "todo-progress",
          phase: "planning",
          status: completed === event.properties.todos.length ? "completed" : "running",
          title: `计划进度 ${completed}/${event.properties.todos.length}`,
          detail: event.properties.todos
            .map((todo) => `${todo.status}: ${todo.content}`)
            .join("\n"),
        });
        continue;
      }

      if (event.type === "session.error") {
        write("error", {
          session_id: clientSessionId,
          message: event.properties.error
            ? getErrorMessage(event.properties.error)
            : "opencode session error",
          detail: event.properties.error?.name,
          total_duration_ms: Math.max(0, Date.now() - requestStartedAt),
        });
        failed = true;
        done = true;
        continue;
      }

      if (event.type === "session.idle") {
        emitProgress({
          id: "session-status",
          phase: "session",
          status: "completed",
          title: "Agent 已完成处理",
          detail: "当前会话已无待执行任务,正在收尾并准备返回最终结果。",
        });
        done = true;
      }
    }

    if (aborted) {
      await runtime.abortSession(opencodeSessionId).catch((error) => {
        logger.warn(
          { sessionId: opencodeSessionId, err: error },
          "failed to abort opencode session",
        );
      });
      return { aborted: true, failed: false };
    }
    if (failed) {
      return { aborted: false, failed: true };
    }

    // Surfaces a prompt rejection that the loop did not observe.
    await promptPromise;

    if (!emittedText) {
      await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write);
    }
    emitProgress({
      id: "request-received",
      phase: "start",
      status: "completed",
      title: "请求处理完成",
      detail: "本次请求的分析、工具执行和结果整理流程已经完成。",
    });
    emitProgress({
      id: "request-completed",
      phase: "complete",
      status: "completed",
      title: "分析完成",
      detail: emittedText
        ? "最终回答已生成并推送到前端。"
        : "已完成分析,并通过兜底消息补发最终回答内容。",
    });
    write("done", {
      session_id: clientSessionId,
      total_duration_ms: Math.max(0, Date.now() - requestStartedAt),
    });
    return { aborted: false, failed: false };
  } finally {
    await iterator.return?.(undefined);
    if (!promptSettled) {
      // Wait for the in-flight prompt to settle; its outcome is ignored here.
      await promptPromise.catch(() => undefined);
    }
  }
};

/** True when the event carries a `properties.sessionID` equal to `sessionId`. */
const isSessionEvent = (event: OpencodeEvent, sessionId: string) =>
  "properties" in event &&
  typeof event.properties === "object" &&
  event.properties !== null &&
  "sessionID" in event.properties &&
  event.properties.sessionID === sessionId;

/**
 * Writes the text of the most recent assistant message as a single `token`
 * event. Writes nothing when that message has no text.
 */
const emitFallbackMessage = async (
  runtime: OpencodeRuntimeAdapter,
  opencodeSessionId: string,
  clientSessionId: string,
  write: (event: string, data: Record<string, unknown>) => void,
) => {
  const messages = await runtime.messages(opencodeSessionId);
  const assistantMessage = [...messages]
    .reverse()
    .find((message) => message.info.role === "assistant");
  const parts = assistantMessage?.parts ?? [];
  const text = collectTextContent(parts);
  if (text) {
    write("token", {
      session_id: clientSessionId,
      content: text,
    });
  }
};

/** Concatenates the text of all `text` parts, in order. */
const collectTextContent = (parts: Part[]) =>
  parts
    .filter((part): part is Extract<Part, { type: "text" }> => part.type === "text")
    .map((part) => part.text)
    .join("");

/** Maps a tool state to a progress status; anything not final counts as running. */
const normalizeToolStatus = (status: string): ProgressStatus => {
  if (status === "completed") return "completed";
  if (status === "error") return "error";
  return "running";
};

/**
 * Renders a value for progress text, truncated to 120 characters. Values that
 * cannot be JSON-serialized render as "[unserializable]".
 */
const formatProgressValue = (value: unknown): string => {
  if (typeof value === "string") {
    return value.length > 120 ? `${value.slice(0, 117)}...` : value;
  }
  if (
    typeof value === "number" ||
    typeof value === "boolean" ||
    value === null ||
    value === undefined
  ) {
    return String(value);
  }
  try {
    const serialized = JSON.stringify(value);
    return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized;
  } catch {
    return "[unserializable]";
  }
};

/** Joins chunks and collapses every whitespace run into a single space. */
const normalizeProgressText = (chunks: string[]) =>
  chunks.join("").replace(/\s+/g, " ").trim();

/** Truncates to `maxLength` characters, ending with "..." when shortened. */
const truncateProgressText = (text: string, maxLength: number) =>
  text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text;

/** Summarizes up to four non-reason params as `key=value` pairs. */
const summarizeToolParams = (params: Record<string, unknown>) => {
  const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]);
  const summary = Object.entries(params)
    .filter(([key]) => !ignoredKeys.has(key))
    .slice(0, 4)
    .map(([key, value]) => `${key}=${formatProgressValue(value)}`)
    .join(", ");
  return summary || "无附加参数";
};

/** Builds the progress detail line for a session status update. */
const buildSessionStatusDetail = (status: { type: string; message?: string }) => {
  if (status.type === "retry") {
    return status.message
      ? `模型请求需要重试,原因:${status.message}`
      : "模型请求正在重试,等待下一次响应。";
  }
  if (status.type === "busy") {
    return status.message
      ? `Agent 正在处理中:${status.message}`
      : "Agent 正在执行推理、工具调用或结果整理。";
  }
  if (status.type === "idle") {
    return status.message
      ? `Agent 已空闲:${status.message}`
      : "当前会话暂时没有待处理任务。";
  }
  return status.message
    ? `会话状态更新:${status.message}`
    : `会话状态更新:${status.type}`;
};

/**
 * Builds the progress detail for a reasoning part from its accumulated deltas
 * (capped at 800 characters); `ended` selects the finished or in-progress wording.
 */
const buildReasoningProgressDetail = (
  chunks: string[],
  ended?: string | number | Date | null,
) => {
  const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800);
  if (ended) {
    return reasoningText
      ? `推理过程:${reasoningText}`
      : "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。";
  }
  return reasoningText
    ? `正在推理:${reasoningText}`
    : "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。";
};

/** Builds the progress detail line for a tool call in the given state. */
const buildToolProgressDetail = (
  tool: string,
  status: string,
  params: Record<string, unknown>,
  reason: string,
  error?: string,
) => {
  const toolName = toolLabels[tool] ?? tool;
  const reasonText = reason ? `;调用原因:${reason}` : "";
  const paramsText = `;关键参数:${summarizeToolParams(params)}`;
  if (status === "error") {
    const errorText = error ? `;错误:${error}` : "";
    return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`;
  }
  if (status === "completed") {
    return `${toolName} 已执行完成${reasonText}${paramsText}`;
  }
  if (status === "pending") {
    return `${toolName} 已进入待执行状态${reasonText}${paramsText}`;
  }
  return `${toolName} 正在执行${reasonText}${paramsText}`;
};

/** Builds the progress title for a tool call in the given state. */
const getToolProgressTitle = (tool: string, status: string) => {
  const toolName = toolLabels[tool] ?? tool;
  if (status === "completed") return `${toolName} 已完成`;
  if (status === "error") return `${toolName} 执行失败`;
  if (status === "pending") return `准备调用 ${toolName}`;
  return `正在调用 ${toolName}`;
};

/** Derives a fallback title from the message: whitespace-collapsed, capped at 24 characters. */
const buildSessionTitle = (message: string) => {
  const normalized = message.replace(/\s+/g, " ").trim();
  if (!normalized) {
    return "新对话";
  }
  return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized;
};

/** Upper bound on how long title generation may delay the response. */
const TITLE_PROMPT_TIMEOUT_MS = 2500;

/**
 * Generates a short session title by prompting a separate title session with
 * the recent conversation. Returns the fallback title when there is no
 * conversation text, when generation fails, or when it exceeds
 * `TITLE_PROMPT_TIMEOUT_MS`. Never throws.
 */
const generateSessionTitle = async (
  runtime: OpencodeRuntimeAdapter,
  options: {
    sessionId: string;
    latestUserMessage: string;
    fallbackTitle?: string;
  },
) => {
  const fallback =
    options.fallbackTitle?.trim() || buildSessionTitle(options.latestUserMessage);
  let titleSessionId: string | undefined;
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  try {
    const conversation = await buildTitleConversationContext(runtime, options.sessionId);
    if (!conversation) {
      return fallback;
    }
    const titleSession = await runtime.createSession(`title-${Date.now().toString(36)}`);
    titleSessionId = titleSession.id;
    const request = runtime
      .prompt(
        titleSession.id,
        [
          "你是会话标题生成器。",
          "请根据用户问题生成一个 8-16 字中文标题。",
          "要求:简洁、可读、避免标点、不要引号、不要解释。",
          "请优先概括最近这轮对话的核心任务或结论。",
          "只输出标题本身。",
          "",
          conversation,
        ].join("\n"),
      )
      .then(async () => {
        const messages = await runtime.messages(titleSession.id, 20);
        const assistantMessage = [...messages]
          .reverse()
          .find((message) => message.info.role === "assistant");
        const title = collectTextContent(assistantMessage?.parts ?? []);
        return normalizeGeneratedTitle(title, fallback);
      });
    const timeout = new Promise<string>((resolve) => {
      timeoutHandle = setTimeout(() => resolve(fallback), TITLE_PROMPT_TIMEOUT_MS);
    });
    return await Promise.race([request, timeout]);
  } catch (error) {
    logger.warn({ err: error }, "failed to generate session title, using fallback");
    return fallback;
  } finally {
    // Release the timer when the request wins the race.
    clearTimeout(timeoutHandle);
    if (titleSessionId) {
      await runtime.abortSession(titleSessionId).catch((error) => {
        logger.debug(
          { sessionId: titleSessionId, err: error },
          "failed to cleanup title session",
        );
      });
    }
  }
};

/**
 * Builds a transcript of the last six non-empty user/assistant messages for
 * the title prompt, capped at 2400 characters. Returns `""` when there is none.
 */
const buildTitleConversationContext = async (
  runtime: OpencodeRuntimeAdapter,
  sessionId: string,
) => {
  const messages = await runtime.messages(sessionId, 12);
  const recentMessages = messages
    .filter(
      (message) => message.info.role === "user" || message.info.role === "assistant",
    )
    .map((message) => ({
      role: message.info.role,
      content: collectTextContent(message.parts).replace(/\s+/g, " ").trim(),
    }))
    .filter((message) => message.content.length > 0)
    .slice(-6);
  if (recentMessages.length === 0) {
    return "";
  }
  return recentMessages
    .map((message) => `${message.role === "user" ? "用户" : "助手"}:${message.content}`)
    .join("\n")
    .slice(0, 2400);
};

/** True when the session holds exactly two user/assistant messages. */
const isFirstRoundConversation = async (
  runtime: OpencodeRuntimeAdapter,
  sessionId: string,
) => {
  const messages = await runtime.messages(sessionId, 12);
  const chatMessageCount = messages.filter(
    (message) => message.info.role === "user" || message.info.role === "assistant",
  ).length;
  return chatMessageCount === 2;
};

/**
 * Cleans a generated title: collapses whitespace, strips quote characters and
 * caps it at 24 characters. Returns `fallback` when nothing remains.
 */
const normalizeGeneratedTitle = (rawTitle: string, fallback: string) => {
  const normalized = rawTitle
    .replace(/\s+/g, " ")
    .replace(/["'“”‘’`]/g, "")
    .trim();
  if (!normalized) {
    return fallback;
  }
  return normalized.length > 24 ? `${normalized.slice(0, 24)}...` : normalized;
};

/** Display labels for known tools; unknown tools fall back to their raw name. */
const toolLabels: Record<string, string> = {
  dynamic_http_call: "后端数据查询",
  fetch_result_ref: "结果引用回读",
  memory_manager: "记忆写入",
  skill_manager: "流程沉淀",
  locate_features: "地图定位",
  view_history: "历史数据面板",
  view_scada: "SCADA 面板",
  show_chart: "图表渲染",
};

/**
 * Prepends the memory store's prompt snapshot to the user message. Returns the
 * message unchanged when the store yields no snapshot.
 */
const buildPromptWithLearningContext = async (
  memoryStore: MemoryStore,
  actorKey: string,
  projectKey: string,
  message: string,
) => {
  const snapshot = await memoryStore.buildPromptSnapshot({ actorKey, projectKey });
  if (!snapshot) {
    return message;
  }
  return `${snapshot}\n\n[Current user request]\n${message}`;
};