diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 90568ff..abb6032 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -8,9 +8,7 @@ import { MemoryStore } from "../memory/store.js"; import { type SessionUiStateStore } from "../sessions/uiStateStore.js"; import { type SessionMetadataStore } from "../sessions/metadataStore.js"; import { type ResultReferenceResolver } from "../results/resolver.js"; -import { RESULT_REFERENCE_KIND } from "../results/store.js"; import { - type PermissionReply, type OpencodeRuntimeAdapter, } from "../runtime/opencode.js"; import { type ChatSessionBridge } from "../chat/sessionBridge.js"; @@ -22,13 +20,36 @@ import { generateSessionTitle, shouldGenerateSessionTitle, } from "./chatSession.js"; +import { registerChatAuxiliaryRoutes } from "./chatAuxiliaryRoutes.js"; +import { registerChatInteractionRoutes } from "./chatInteractionRoutes.js"; import { collectTextContent, type PermissionRequestPayload, + type QuestionRequestPayload, streamPromptResponse, supportedModels, type SupportedModel, + type TodoUpdatePayload, } from "./chatStream.js"; +import { + type ActiveRun, + type RunStatus, + type StreamSubscriber, + cancelBackendTodos, + completeBackendProgress, + countFrontendUserMessages, + createInitialStreamingMessages, + isObjectRecord, + pruneBranchGroupsForMessageIndex, + toFrontendPermission, + toPermissionStatus, + updateLastAssistantMessage, + updateLastAssistantPermission, + updateLastAssistantQuestion, + upsertBackendProgress, + upsertBackendQuestion, + upsertBackendTodoUpdate, +} from "./chatUiState.js"; const payloadSchema = z.object({ message: z.string().min(1).max(10000), @@ -38,16 +59,6 @@ const payloadSchema = z.object({ regenerate_from_message_index: z.coerce.number().int().min(0).optional(), }); -const abortPayloadSchema = z.object({ - session_id: z.string().max(128), -}); - -const permissionReplyPayloadSchema = z.object({ - session_id: z.string().max(128), - reply: z.enum(["once", "always", "reject"]), - message: z.string().max(1000).optional(), -}); - const createSessionPayloadSchema = z.object({ session_id: z.string().max(128).optional(), parent_session_id: z.string().max(128).optional(), @@ -65,22 +76,6 @@ const sessionStateSchema = z.object({ branch_groups: z.array(z.unknown()).default([]), }); -type RunStatus = "running" | "completed" | "error" | "aborted"; - -type StreamSubscriber = { - write: (event: string, data: Record) => void; - close: () => void; -}; - -type ActiveRun = { - clientSessionId: string; - controller: AbortController; - messages: unknown[]; - pendingPermissions: Map; - status: RunStatus; - subscribers: Set; -}; - const activeRuns = new Map(); const lastRunStatuses = new Map(); @@ -91,174 +86,6 @@ const toSessionUiStateContext = (sessionRecord: SessionRecord) => ({ const getSessionRunStatus = (sessionId: string) => activeRuns.get(sessionId)?.status ?? lastRunStatuses.get(sessionId); -const isObjectRecord = (value: unknown): value is Record => - typeof value === "object" && value !== null && !Array.isArray(value); - -const createFrontendMessageId = () => - `msg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; - -const createInitialStreamingMessages = (existingMessages: unknown[], userContent: string) => { - const userMessage = { - id: createFrontendMessageId(), - role: "user", - content: userContent, - }; - return [ - ...existingMessages, - { - ...userMessage, - branchRootId: userMessage.id, - }, - { - id: createFrontendMessageId(), - role: "assistant", - content: "", - progress: [ - { - id: "request-received", - phase: "start", - status: "running", - title: "已收到请求,正在启动 Agent 分析", - detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。", - startedAt: Date.now(), - elapsedMs: 0, - elapsedSnapshotAt: Date.now(), - }, - ], - }, - ]; -}; - -const countFrontendUserMessages = (messages: unknown[]) => - messages.filter( - (message) => isObjectRecord(message) && message.role === "user", - ).length; - -const pruneBranchGroupsForMessageIndex = ( - branchGroups: unknown[], - messageIndex: number | undefined, -) => { - if (messageIndex === undefined) { - return branchGroups; - } - return branchGroups.filter( - (group) => - !isObjectRecord(group) || - typeof group.parentCount !== "number" || - group.parentCount < messageIndex, - ); -}; - -const upsertBackendProgress = ( - progress: unknown, - payload: Record, -) => { - const next = Array.isArray(progress) ? [...progress] : []; - const id = typeof payload.id === "string" ? payload.id : `progress-${Date.now()}`; - const index = next.findIndex((item) => isObjectRecord(item) && item.id === id); - const nextItem = { - id, - phase: typeof payload.phase === "string" ? payload.phase : "progress", - status: - payload.status === "completed" || payload.status === "error" - ? payload.status - : "running", - title: typeof payload.title === "string" ? payload.title : "正在处理", - detail: typeof payload.detail === "string" ? payload.detail : undefined, - startedAt: typeof payload.started_at === "number" ? payload.started_at : undefined, - endedAt: typeof payload.ended_at === "number" ? payload.ended_at : undefined, - elapsedMs: typeof payload.elapsed_ms === "number" ? payload.elapsed_ms : undefined, - elapsedSnapshotAt: - typeof payload.elapsed_ms === "number" ? Date.now() : undefined, - durationMs: typeof payload.duration_ms === "number" ? payload.duration_ms : undefined, - }; - if (index >= 0) { - next[index] = nextItem; - } else { - next.push(nextItem); - } - return next; -}; - -const completeBackendProgress = (progress: unknown) => - Array.isArray(progress) - ? progress.map((item) => { - if (!isObjectRecord(item) || item.status !== "running") { - return item; - } - const endedAt = Date.now(); - const startedAt = typeof item.startedAt === "number" ? item.startedAt : undefined; - return { - ...item, - status: "completed", - endedAt, - elapsedMs: undefined, - elapsedSnapshotAt: undefined, - durationMs: - typeof item.durationMs === "number" - ? item.durationMs - : startedAt !== undefined - ? Math.max(0, endedAt - startedAt) - : item.elapsedMs, - }; - }) - : progress; - -const updateLastAssistantMessage = ( - messages: unknown[], - updater: (message: Record) => Record, -) => { - for (let index = messages.length - 1; index >= 0; index -= 1) { - const message = messages[index]; - if (isObjectRecord(message) && message.role === "assistant") { - const next = [...messages]; - next[index] = updater(message); - return next; - } - } - return messages; -}; - -const updateLastAssistantPermission = ( - messages: unknown[], - requestId: string, - updater: (permission: Record) => Record, -) => - updateLastAssistantMessage(messages, (message) => { - const permissions = Array.isArray(message.permissions) - ? message.permissions - : []; - return { - ...message, - permissions: permissions.map((permission) => - isObjectRecord(permission) && permission.requestId === requestId - ? updater(permission) - : permission, - ), - }; - }); - -const toFrontendPermission = ( - payload: PermissionRequestPayload, - status: "pending" | "approved_once" | "approved_always" | "rejected" | "error" = "pending", -) => ({ - requestId: payload.request_id, - sessionId: payload.session_id, - permission: payload.permission, - patterns: payload.patterns, - metadata: payload.metadata, - always: payload.always, - tool: payload.tool, - createdAt: payload.created_at, - status, -}); - -const toPermissionStatus = (reply: PermissionReply) => { - if (reply === "always") return "approved_always"; - if (reply === "once") return "approved_once"; - return "rejected"; -}; - export const buildChatRouter = ( sessionBridge: ChatSessionBridge, runtime: OpencodeRuntimeAdapter, @@ -580,258 +407,20 @@ export const buildChatRouter = ( res.status(204).end(); }); - chatRouter.get("/render-ref/:renderRef", async (req, res) => { - const renderRef = req.params.renderRef?.trim(); - const userId = req.header("x-user-id")?.trim(); - const projectId = req.header("x-project-id") ?? undefined; - const clientSessionId = - typeof req.query.session_id === "string" - ? req.query.session_id.trim() - : undefined; - - if (!userId) { - res.status(400).json({ - message: "x-user-id is required", - }); - return; - } - - if (!renderRef) { - res.status(400).json({ - message: "render_ref is required", - }); - return; - } - - const result = await resultReferenceResolver.getFullAuthorized( - renderRef, - { - actorKey: toActorKey(userId), - clientSessionId, - projectId, - }, - { - expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload, - }, - ); - - if (!result) { - res.status(404).json({ message: "render_ref not found" }); - return; - } - - res.json(result); + registerChatAuxiliaryRoutes(chatRouter, { + activeRuns, + lastRunStatuses, + resultReferenceResolver, + sessionBridge, + sessionMetadataStore, + sessionUiStateStore, }); - chatRouter.post("/abort", async (req, res) => { - const parsed = abortPayloadSchema.safeParse(req.body); - if (!parsed.success) { - res.status(400).json({ - message: "invalid request payload", - detail: parsed.error.flatten(), - }); - return; - } - - try { - const projectId = req.header("x-project-id") ?? undefined; - const userId = req.header("x-user-id") ?? undefined; - const actorKey = toActorKey(userId); - const projectKey = toProjectKey(projectId); - const sessionRecord = await sessionMetadataStore.get( - { actorKey, projectId, projectKey, userId }, - parsed.data.session_id, - ); - const binding = sessionRecord - ? await sessionBridge.abort({ - clientSessionId: sessionRecord.sessionId, - sessionId: sessionRecord.sessionId, - }) - : null; - const run = activeRuns.get(parsed.data.session_id); - if (run && run.status === "running") { - run.status = "aborted"; - lastRunStatuses.set(parsed.data.session_id, "aborted"); - run.controller.abort(); - run.messages = updateLastAssistantMessage(run.messages, (message) => ({ - ...message, - content: - typeof message.content === "string" && message.content.trim() - ? message.content - : "⚠️ **请求已中断**", - isError: true, - progress: completeBackendProgress(message.progress), - })); - if (sessionRecord) { - const currentState = await sessionUiStateStore.read( - toSessionUiStateContext(sessionRecord), - ); - await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord), { - sessionId: sessionRecord.sessionId, - isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, - messages: run.messages, - branchGroups: currentState?.branchGroups ?? [], - }); - } - for (const subscriber of run.subscribers) { - subscriber.write("error", { - session_id: parsed.data.session_id, - message: "请求已中断", - }); - subscriber.close(); - } - run.subscribers.clear(); - } - - if (!binding && !run) { - res.status(204).end(); - return; - } - - logger.info( - { - clientSessionId: parsed.data.session_id, - sessionId: binding?.sessionId ?? parsed.data.session_id, - }, - "aborted chat session by client request", - ); - res.status(202).json({ - session_id: parsed.data.session_id, - aborted: true, - }); - } catch (error) { - const detail = error instanceof Error ? error.message : String(error); - logger.error({ err: error }, "chat abort failed"); - res.status(500).json({ - message: "chat abort failed", - detail, - }); - } - }); - - chatRouter.post("/permission/:requestId/reply", async (req, res) => { - const requestId = req.params.requestId?.trim(); - const parsed = permissionReplyPayloadSchema.safeParse(req.body); - if (!requestId) { - res.status(400).json({ message: "request_id is required" }); - return; - } - if (!parsed.success) { - res.status(400).json({ - message: "invalid request payload", - detail: parsed.error.flatten(), - }); - return; - } - - try { - const projectId = req.header("x-project-id") ?? undefined; - const userId = req.header("x-user-id") ?? undefined; - const actorKey = toActorKey(userId); - const projectKey = toProjectKey(projectId); - const sessionRecord = await sessionMetadataStore.get( - { actorKey, projectId, projectKey, userId }, - parsed.data.session_id, - ); - if (!sessionRecord) { - res.status(404).json({ message: "session not found" }); - return; - } - - const run = activeRuns.get(sessionRecord.sessionId); - if (!run || run.status !== "running") { - res.status(409).json({ message: "session is not waiting for permissions" }); - return; - } - - const pendingPermission = run.pendingPermissions.get(requestId); - if (!pendingPermission) { - res.status(404).json({ message: "permission request not found" }); - return; - } - const persistPermissionState = async () => { - const currentState = await sessionUiStateStore.read( - toSessionUiStateContext(sessionRecord), - ); - await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord), { - sessionId: sessionRecord.sessionId, - isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, - messages: run.messages, - branchGroups: currentState?.branchGroups ?? [], - }); - }; - - try { - await runtime.replyPermission({ - requestId, - sessionId: sessionRecord.sessionId, - reply: parsed.data.reply, - message: parsed.data.message, - }); - } catch (error) { - run.messages = updateLastAssistantPermission( - run.messages, - requestId, - (permission) => ({ - ...permission, - status: "error", - error: - error instanceof Error - ? error.message - : "failed to reply permission", - }), - ); - await persistPermissionState().catch((persistError) => { - logger.warn( - { err: persistError, sessionId: sessionRecord.sessionId }, - "failed to persist permission error state", - ); - }); - res.status(502).json({ - message: "permission reply failed", - detail: error instanceof Error ? error.message : String(error), - }); - return; - } - - run.pendingPermissions.delete(requestId); - const status = toPermissionStatus(parsed.data.reply); - run.messages = updateLastAssistantPermission( - run.messages, - requestId, - (permission) => ({ - ...permission, - status, - repliedAt: Date.now(), - }), - ); - await persistPermissionState().catch((persistError) => { - logger.warn( - { err: persistError, sessionId: sessionRecord.sessionId }, - "failed to persist permission reply state", - ); - }); - for (const subscriber of run.subscribers) { - subscriber.write("permission_response", { - session_id: sessionRecord.sessionId, - request_id: requestId, - reply: parsed.data.reply, - }); - } - - res.status(202).json({ - session_id: sessionRecord.sessionId, - request_id: requestId, - reply: parsed.data.reply, - }); - } catch (error) { - const detail = error instanceof Error ? error.message : String(error); - logger.error({ err: error }, "permission reply route failed"); - res.status(500).json({ - message: "permission reply route failed", - detail, - }); - } + registerChatInteractionRoutes(chatRouter, { + activeRuns, + runtime, + sessionMetadataStore, + sessionUiStateStore, }); chatRouter.post("/fork", async (req, res) => { @@ -1045,6 +634,7 @@ export const buildChatRouter = ( controller: abortController, messages: initialMessages, pendingPermissions: new Map(), + pendingQuestions: new Map(), status: "running", subscribers: new Set(), }; @@ -1129,6 +719,7 @@ export const buildChatRouter = ( : `⚠️ **错误:** ${typeof data.message === "string" ? data.message : "unknown error"}`, isError: true, progress: completeBackendProgress(message.progress), + todos: cancelBackendTodos(message.todos), })); } else if (event === "permission_request") { const payload = data as PermissionRequestPayload; @@ -1159,6 +750,60 @@ export const buildChatRouter = ( }), ); } + } else if (event === "question_request") { + const payload = data as QuestionRequestPayload; + let shouldTrackQuestion = true; + if (payload.tool?.callID) { + if (payload.request_id !== payload.tool.callID) { + activeRun.pendingQuestions.delete(payload.tool.callID); + } else { + const hasActionableQuestion = [...activeRun.pendingQuestions.values()].some( + (question) => + question.tool?.callID === payload.tool?.callID && + question.request_id !== payload.tool?.callID, + ); + if (hasActionableQuestion) { + activeRun.messages = updateLastAssistantMessage( + activeRun.messages, + (message) => ({ + ...message, + questions: upsertBackendQuestion(message.questions, payload), + }), + ); + shouldTrackQuestion = false; + } + } + } + if (shouldTrackQuestion) { + activeRun.pendingQuestions.set(payload.request_id, payload); + activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({ + ...message, + questions: upsertBackendQuestion(message.questions, payload), + })); + } + } else if (event === "question_response") { + const requestId = + typeof data.request_id === "string" ? data.request_id : undefined; + if (requestId) { + activeRun.pendingQuestions.delete(requestId); + activeRun.messages = updateLastAssistantQuestion( + activeRun.messages, + requestId, + (question) => ({ + ...question, + status: data.rejected === true ? "rejected" : "answered", + repliedAt: Date.now(), + answers: Array.isArray(data.answers) ? data.answers : question.answers, + error: undefined, + }), + ); + } + } else if (event === "todo_update") { + const payload = data as TodoUpdatePayload; + activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({ + ...message, + todos: upsertBackendTodoUpdate(message.todos, payload), + })); } for (const subscriber of activeRun.subscribers) { @@ -1257,6 +902,21 @@ export const buildChatRouter = ( } } } finally { + if (abortController.signal.aborted) { + activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({ + ...message, + content: + typeof message.content === "string" && message.content.trim() + ? message.content + : "⚠️ **请求已中断**", + isError: true, + progress: completeBackendProgress(message.progress), + todos: cancelBackendTodos(message.todos), + })); + void queueSessionUiStatePersist().catch((error) => { + logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist aborted chat stream state"); + }); + } await persistQueue.catch((error) => { logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state"); }); @@ -1273,7 +933,12 @@ export const buildChatRouter = ( subscriber.close(); } activeRun.subscribers.clear(); - activeRuns.delete(clientSessionId); + if ( + activeRun.pendingPermissions.size === 0 && + activeRun.pendingQuestions.size === 0 + ) { + activeRuns.delete(clientSessionId); + } streamClosed = true; req.off("close", handleClientClose); res.off("close", handleClientClose); @@ -1285,6 +950,16 @@ export const buildChatRouter = ( } catch (error) { const detail = error instanceof Error ? error.message : String(error); logger.error({ err: error }, "chat stream failed"); + if (res.headersSent) { + if (!res.writableEnded && !res.destroyed) { + res.write(toSse("error", { + message: "chat stream failed", + detail, + })); + res.end(); + } + return; + } res.status(500).json({ message: "chat stream failed", detail, diff --git a/src/routes/chatAuxiliaryRoutes.ts b/src/routes/chatAuxiliaryRoutes.ts new file mode 100644 index 0000000..2a25512 --- /dev/null +++ b/src/routes/chatAuxiliaryRoutes.ts @@ -0,0 +1,176 @@ +import { type Router } from "express"; +import { z } from "zod"; + +import { type ChatSessionBridge } from "../chat/sessionBridge.js"; +import { logger } from "../logger.js"; +import { type ResultReferenceResolver } from "../results/resolver.js"; +import { RESULT_REFERENCE_KIND } from "../results/store.js"; +import { type SessionMetadataStore } from "../sessions/metadataStore.js"; +import { type SessionUiStateStore } from "../sessions/uiStateStore.js"; +import { toActorKey, toProjectKey } from "../utils/fileStore.js"; +import { + type ActiveRun, + type RunStatus, + cancelBackendTodos, + completeBackendProgress, + updateLastAssistantMessage, +} from "./chatUiState.js"; + +const abortPayloadSchema = z.object({ + session_id: z.string().max(128), +}); + +type RegisterAuxiliaryRoutesOptions = { + activeRuns: Map; + lastRunStatuses: Map; + resultReferenceResolver: ResultReferenceResolver; + sessionBridge: ChatSessionBridge; + sessionMetadataStore: SessionMetadataStore; + sessionUiStateStore: SessionUiStateStore; +}; + +const toSessionUiStateContext = (sessionId: string) => ({ + sessionId, +}); + +export const registerChatAuxiliaryRoutes = ( + chatRouter: Router, + { + activeRuns, + lastRunStatuses, + resultReferenceResolver, + sessionBridge, + sessionMetadataStore, + sessionUiStateStore, + }: RegisterAuxiliaryRoutesOptions, +) => { + chatRouter.get("/render-ref/:renderRef", async (req, res) => { + const renderRef = req.params.renderRef?.trim(); + const userId = req.header("x-user-id")?.trim(); + const projectId = req.header("x-project-id") ?? undefined; + const clientSessionId = + typeof req.query.session_id === "string" + ? req.query.session_id.trim() + : undefined; + + if (!userId) { + res.status(400).json({ + message: "x-user-id is required", + }); + return; + } + + if (!renderRef) { + res.status(400).json({ + message: "render_ref is required", + }); + return; + } + + const result = await resultReferenceResolver.getFullAuthorized( + renderRef, + { + actorKey: toActorKey(userId), + clientSessionId, + projectId, + }, + { + expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload, + }, + ); + + if (!result) { + res.status(404).json({ message: "render_ref not found" }); + return; + } + + res.json(result); + }); + + chatRouter.post("/abort", async (req, res) => { + const parsed = abortPayloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const projectId = req.header("x-project-id") ?? undefined; + const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const sessionRecord = await sessionMetadataStore.get( + { actorKey, projectId, projectKey, userId }, + parsed.data.session_id, + ); + const binding = sessionRecord + ? await sessionBridge.abort({ + clientSessionId: sessionRecord.sessionId, + sessionId: sessionRecord.sessionId, + }) + : null; + const run = activeRuns.get(parsed.data.session_id); + if (run && run.status === "running") { + run.status = "aborted"; + lastRunStatuses.set(parsed.data.session_id, "aborted"); + run.controller.abort(); + run.messages = updateLastAssistantMessage(run.messages, (message) => ({ + ...message, + content: + typeof message.content === "string" && message.content.trim() + ? message.content + : "⚠️ **请求已中断**", + isError: true, + progress: completeBackendProgress(message.progress), + todos: cancelBackendTodos(message.todos), + })); + if (sessionRecord) { + const currentState = await sessionUiStateStore.read( + toSessionUiStateContext(sessionRecord.sessionId), + ); + await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), { + sessionId: sessionRecord.sessionId, + isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, + messages: run.messages, + branchGroups: currentState?.branchGroups ?? [], + }); + } + for (const subscriber of run.subscribers) { + subscriber.write("error", { + session_id: parsed.data.session_id, + message: "请求已中断", + }); + subscriber.close(); + } + run.subscribers.clear(); + } + + if (!binding && !run) { + res.status(204).end(); + return; + } + + logger.info( + { + clientSessionId: parsed.data.session_id, + sessionId: binding?.sessionId ?? parsed.data.session_id, + }, + "aborted chat session by client request", + ); + res.status(202).json({ + session_id: parsed.data.session_id, + aborted: true, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat abort failed"); + res.status(500).json({ + message: "chat abort failed", + detail, + }); + } + }); +}; diff --git a/src/routes/chatInteractionRoutes.ts b/src/routes/chatInteractionRoutes.ts new file mode 100644 index 0000000..26a1be9 --- /dev/null +++ b/src/routes/chatInteractionRoutes.ts @@ -0,0 +1,437 @@ +import { type Router } from "express"; +import { z } from "zod"; + +import { logger } from "../logger.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; +import { type SessionMetadataStore } from "../sessions/metadataStore.js"; +import { type SessionUiStateStore } from "../sessions/uiStateStore.js"; +import { toActorKey, toProjectKey } from "../utils/fileStore.js"; +import { + type ActiveRun, + toPermissionStatus, + updateLastAssistantPermission, + updateLastAssistantQuestion, +} from "./chatUiState.js"; + +const permissionReplyPayloadSchema = z.object({ + session_id: z.string().max(128), + reply: z.enum(["once", "always", "reject"]), + message: z.string().max(1000).optional(), +}); + +const questionReplyPayloadSchema = z.object({ + session_id: z.string().max(128), + answers: z.array(z.array(z.string().max(2000))).default([]), +}); + +const questionRejectPayloadSchema = z.object({ + session_id: z.string().max(128), +}); + +type RegisterInteractionRoutesOptions = { + activeRuns: Map; + runtime: OpencodeRuntimeAdapter; + sessionMetadataStore: SessionMetadataStore; + sessionUiStateStore: SessionUiStateStore; +}; + +const toSessionUiStateContext = (sessionId: string) => ({ + sessionId, +}); + +export const registerChatInteractionRoutes = ( + chatRouter: Router, + { + activeRuns, + runtime, + sessionMetadataStore, + sessionUiStateStore, + }: RegisterInteractionRoutesOptions, +) => { + chatRouter.post("/permission/:requestId/reply", async (req, res) => { + const requestId = req.params.requestId?.trim(); + const parsed = permissionReplyPayloadSchema.safeParse(req.body); + if (!requestId) { + res.status(400).json({ message: "request_id is required" }); + return; + } + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const projectId = req.header("x-project-id") ?? undefined; + const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const sessionRecord = await sessionMetadataStore.get( + { actorKey, projectId, projectKey, userId }, + parsed.data.session_id, + ); + if (!sessionRecord) { + res.status(404).json({ message: "session not found" }); + return; + } + + const run = activeRuns.get(sessionRecord.sessionId); + if (!run || run.status !== "running") { + res.status(409).json({ message: "session is not waiting for permissions" }); + return; + } + + const pendingPermission = run.pendingPermissions.get(requestId); + if (!pendingPermission) { + res.status(404).json({ message: "permission request not found" }); + return; + } + const persistPermissionState = async () => { + const currentState = await sessionUiStateStore.read( + toSessionUiStateContext(sessionRecord.sessionId), + ); + await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), { + sessionId: sessionRecord.sessionId, + isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, + messages: run.messages, + branchGroups: currentState?.branchGroups ?? [], + }); + }; + + try { + await runtime.replyPermission({ + requestId, + sessionId: sessionRecord.sessionId, + reply: parsed.data.reply, + message: parsed.data.message, + }); + } catch (error) { + run.messages = updateLastAssistantPermission( + run.messages, + requestId, + (permission) => ({ + ...permission, + status: "error", + error: + error instanceof Error + ? error.message + : "failed to reply permission", + }), + ); + await persistPermissionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist permission error state", + ); + }); + res.status(502).json({ + message: "permission reply failed", + detail: error instanceof Error ? error.message : String(error), + }); + return; + } + + run.pendingPermissions.delete(requestId); + const status = toPermissionStatus(parsed.data.reply); + run.messages = updateLastAssistantPermission( + run.messages, + requestId, + (permission) => ({ + ...permission, + status, + repliedAt: Date.now(), + }), + ); + await persistPermissionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist permission reply state", + ); + }); + for (const subscriber of run.subscribers) { + subscriber.write("permission_response", { + session_id: sessionRecord.sessionId, + request_id: requestId, + reply: parsed.data.reply, + }); + } + + res.status(202).json({ + session_id: sessionRecord.sessionId, + request_id: requestId, + reply: parsed.data.reply, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "permission reply route failed"); + res.status(500).json({ + message: "permission reply route failed", + detail, + }); + } + }); + + chatRouter.post("/question/:requestId/reply", async (req, res) => { + const requestId = req.params.requestId?.trim(); + const parsed = questionReplyPayloadSchema.safeParse(req.body); + if (!requestId) { + res.status(400).json({ message: "request_id is required" }); + return; + } + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const projectId = req.header("x-project-id") ?? undefined; + const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const sessionRecord = await sessionMetadataStore.get( + { actorKey, projectId, projectKey, userId }, + parsed.data.session_id, + ); + if (!sessionRecord) { + res.status(404).json({ message: "session not found" }); + return; + } + + const run = activeRuns.get(sessionRecord.sessionId); + if (!run) { + res.status(409).json({ message: "session is not waiting for questions" }); + return; + } + + const pendingQuestion = run.pendingQuestions.get(requestId); + if (!pendingQuestion) { + res.status(404).json({ message: "question request not found" }); + return; + } + const persistQuestionState = async () => { + const currentState = await sessionUiStateStore.read( + toSessionUiStateContext(sessionRecord.sessionId), + ); + await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), { + sessionId: sessionRecord.sessionId, + isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, + messages: run.messages, + branchGroups: currentState?.branchGroups ?? [], + }); + }; + + try { + await runtime.replyQuestion({ + requestId, + sessionId: sessionRecord.sessionId, + answers: parsed.data.answers, + }); + } catch (error) { + run.messages = updateLastAssistantQuestion( + run.messages, + requestId, + (question) => ({ + ...question, + status: "error", + error: + error instanceof Error + ? error.message + : "failed to reply question", + }), + ); + await persistQuestionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist question error state", + ); + }); + res.status(502).json({ + message: "question reply failed", + detail: error instanceof Error ? error.message : String(error), + }); + return; + } + + run.pendingQuestions.delete(requestId); + run.messages = updateLastAssistantQuestion( + run.messages, + requestId, + (question) => ({ + ...question, + status: "answered", + answers: parsed.data.answers, + repliedAt: Date.now(), + error: undefined, + }), + ); + await persistQuestionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist question reply state", + ); + }); + for (const subscriber of run.subscribers) { + subscriber.write("question_response", { + session_id: pendingQuestion.session_id, + request_id: requestId, + answers: parsed.data.answers, + }); + } + if ( + run.status !== "running" && + run.pendingPermissions.size === 0 && + run.pendingQuestions.size === 0 + ) { + activeRuns.delete(sessionRecord.sessionId); + } + + res.status(202).json({ + session_id: pendingQuestion.session_id, + request_id: requestId, + answers: parsed.data.answers, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "question reply route failed"); + res.status(500).json({ + message: "question reply route failed", + detail, + }); + } + }); + + chatRouter.post("/question/:requestId/reject", async (req, res) => { + const requestId = req.params.requestId?.trim(); + const parsed = questionRejectPayloadSchema.safeParse(req.body); + if (!requestId) { + res.status(400).json({ message: "request_id is required" }); + return; + } + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const projectId = req.header("x-project-id") ?? undefined; + const userId = req.header("x-user-id") ?? undefined; + const actorKey = toActorKey(userId); + const projectKey = toProjectKey(projectId); + const sessionRecord = await sessionMetadataStore.get( + { actorKey, projectId, projectKey, userId }, + parsed.data.session_id, + ); + if (!sessionRecord) { + res.status(404).json({ message: "session not found" }); + return; + } + + const run = activeRuns.get(sessionRecord.sessionId); + if (!run) { + res.status(409).json({ message: "session is not waiting for questions" }); + return; + } + + const pendingQuestion = run.pendingQuestions.get(requestId); + if (!pendingQuestion) { + res.status(404).json({ message: "question request not found" }); + return; + } + const persistQuestionState = async () => { + const currentState = await sessionUiStateStore.read( + toSessionUiStateContext(sessionRecord.sessionId), + ); + await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), { + sessionId: sessionRecord.sessionId, + isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false, + messages: run.messages, + branchGroups: currentState?.branchGroups ?? [], + }); + }; + + try { + await runtime.rejectQuestion({ + requestId, + sessionId: sessionRecord.sessionId, + }); + } catch (error) { + run.messages = updateLastAssistantQuestion( + run.messages, + requestId, + (question) => ({ + ...question, + status: "error", + error: + error instanceof Error + ? error.message + : "failed to reject question", + }), + ); + await persistQuestionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist question error state", + ); + }); + res.status(502).json({ + message: "question reject failed", + detail: error instanceof Error ? error.message : String(error), + }); + return; + } + + run.pendingQuestions.delete(requestId); + run.messages = updateLastAssistantQuestion( + run.messages, + requestId, + (question) => ({ + ...question, + status: "rejected", + repliedAt: Date.now(), + error: undefined, + }), + ); + await persistQuestionState().catch((persistError) => { + logger.warn( + { err: persistError, sessionId: sessionRecord.sessionId }, + "failed to persist question reject state", + ); + }); + for (const subscriber of run.subscribers) { + subscriber.write("question_response", { + session_id: pendingQuestion.session_id, + request_id: requestId, + rejected: true, + }); + } + if ( + run.status !== "running" && + run.pendingPermissions.size === 0 && + run.pendingQuestions.size === 0 + ) { + activeRuns.delete(sessionRecord.sessionId); + } + + res.status(202).json({ + session_id: pendingQuestion.session_id, + request_id: requestId, + rejected: true, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "question reject route failed"); + res.status(500).json({ + message: "question reject route failed", + detail, + }); + } + }); +}; diff --git a/src/routes/chatStream.ts b/src/routes/chatStream.ts index 3375421..0a93c96 100644 --- a/src/routes/chatStream.ts +++ b/src/routes/chatStream.ts @@ -2,7 +2,56 @@ import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js"; import { logger } from "../logger.js"; -import { type PermissionReply, type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; +import { + type PermissionReply, + type OpencodeRuntimeAdapter, +} from "../runtime/opencode.js"; +import { + buildPermissionDetail, + buildPermissionV2Detail, + buildReasoningProgressDetail, + buildSessionStatusDetail, + buildToolProgressDetail, + collectTextContent, + extractRequestReason, + extractSkillAuditInfo, + getErrorMessage, + getToolProgressTitle, + getUnknownErrorMessage, + hasToolParams, + isPermissionAskedEvent, + isPermissionRepliedEvent, + isPermissionV2AskedEvent, + isPermissionV2RepliedEvent, + isQuestionAskedEvent, + isQuestionRejectedEvent, + isQuestionRepliedEvent, + isQuestionV2AskedEvent, + isQuestionV2RejectedEvent, + isQuestionV2RepliedEvent, + isSessionEvent, + isSkillEvent, + logDevelopmentDebug, + normalizeQuestionAnswers, + normalizeQuestionPayload, + normalizeQuestionToolPayload, + normalizeTodoPriority, + normalizeTodoStatus, + normalizeToolParams, + normalizeToolStatus, + type PermissionRequestPayload, + type QuestionRequestPayload, + type TodoItemPayload, + type TodoUpdatePayload, +} from "./chatStreamEvents.js"; + +export { + collectTextContent, + type PermissionRequestPayload, + type QuestionRequestPayload, + type TodoItemPayload, + type TodoUpdatePayload, +} from "./chatStreamEvents.js"; export const supportedModels = [ "deepseek/deepseek-v4-flash", @@ -35,124 +84,6 @@ type ProgressPayload = { detail?: string; }; -export type PermissionRequestPayload = { - session_id: string; - request_id: string; - permission: string; - patterns: string[]; - metadata: Record; - always: string[]; - tool?: { - messageID: string; - callID: string; - }; - created_at: number; -}; - -const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development"; - -const toolLabels: Record = { - memory_manager: "记忆写入", - session_search: "历史会话检索", - skill_manager: "流程沉淀", - locate_features: "地图定位", - view_history: "历史数据面板", - view_scada: "SCADA 面板", - show_chart: "图表渲染", - render_junctions: "节点渲染", -}; - -const logDevelopmentDebug = ( - message: string, - metadata: Record, -) => { - if (!isDevelopmentDebugLoggingEnabled) { - return; - } - logger.info(metadata, message); -}; - -const getErrorMessage = (error: { - name: string; - data?: { message?: string }; -}) => error.data?.message ?? error.name; - -const getUnknownErrorMessage = (error: unknown) => { - if ( - typeof error === "object" && - error !== null && - "name" in error && - typeof error.name === "string" - ) { - const maybeData = "data" in error ? error.data : undefined; - return getErrorMessage({ - name: error.name, - data: - typeof maybeData === "object" && maybeData !== null && "message" in maybeData - ? { message: typeof maybeData.message === "string" ? maybeData.message : undefined } - : undefined, - }); - } - return error instanceof Error ? error.message : String(error); -}; - -const isObjectRecord = (value: unknown): value is Record => - typeof value === "object" && value !== null && !Array.isArray(value); - -const normalizeToolParams = (value: unknown): Record => { - if (isObjectRecord(value)) { - return value; - } - if (typeof value === "string") { - try { - const parsed = JSON.parse(value) as unknown; - return isObjectRecord(parsed) ? parsed : {}; - } catch { - return {}; - } - } - return {}; -}; - -const extractRequestReason = (params: Record) => { - const candidates = ["reason", "request_reason", "why", "purpose", "rationale"]; - for (const key of candidates) { - const value = params[key]; - if (typeof value === "string") { - const normalized = value.trim(); - if (normalized) { - return normalized; - } - } - } - return ""; -}; - -const isSkillEvent = (event: OpencodeEvent) => event.type.toLowerCase().includes("skill"); - -const extractSkillAuditInfo = (event: OpencodeEvent) => { - const payload = isObjectRecord(event.properties) - ? (event.properties as Record) - : {}; - const candidateName = - typeof payload.skill === "string" - ? payload.skill - : typeof payload.skillName === "string" - ? payload.skillName - : typeof payload.name === "string" - ? payload.name - : event.type; - const reason = extractRequestReason(payload); - return { - name: candidateName, - reason, - payload, - }; -}; - -const hasToolParams = (params: Record) => - Object.keys(params).length > 0; - const toRuntimeModel = (model?: SupportedModel) => { if (!model) { return undefined; @@ -167,55 +98,6 @@ const toRuntimeModel = (model?: SupportedModel) => { }; }; -const isSessionEvent = (event: OpencodeEvent, sessionId: string) => - "properties" in event && - typeof event.properties === "object" && - event.properties !== null && - "sessionID" in event.properties && - event.properties.sessionID === sessionId; - -const isPermissionAskedEvent = ( - event: OpencodeEvent, -): event is Extract => - event.type === "permission.asked"; - -const isPermissionV2AskedEvent = ( - event: OpencodeEvent, -): event is Extract => - event.type === "permission.v2.asked"; - -const isPermissionRepliedEvent = ( - event: OpencodeEvent, -): event is Extract => - event.type === "permission.replied"; - -const isPermissionV2RepliedEvent = ( - event: OpencodeEvent, -): event is Extract => - event.type === "permission.v2.replied"; - -const buildPermissionDetail = (event: Extract) => { - const patterns = event.properties.patterns.length - ? event.properties.patterns.join(", ") - : event.properties.permission; - return `需要用户确认权限:${event.properties.permission};匹配规则:${patterns}`; -}; - -const buildPermissionV2Detail = ( - event: Extract, -) => { - const resources = event.properties.resources.length - ? event.properties.resources.join(", ") - : event.properties.action; - return `需要用户确认权限:${event.properties.action};资源:${resources}`; -}; - -export const collectTextContent = (parts: Part[]) => - parts - .filter((part): part is Extract => part.type === "text") - .map((part) => part.text) - .join(""); - const emitFallbackMessage = async ( runtime: OpencodeRuntimeAdapter, sessionId: string, @@ -236,111 +118,6 @@ const emitFallbackMessage = async ( } }; -const normalizeToolStatus = (status: string) => { - if (status === "completed") return "completed"; - if (status === "error") return "error"; - return "running"; -}; - -const formatProgressValue = (value: unknown): string => { - if (typeof value === "string") { - return value.length > 120 ? `${value.slice(0, 117)}...` : value; - } - if ( - typeof value === "number" || - typeof value === "boolean" || - value === null || - value === undefined - ) { - return String(value); - } - try { - const serialized = JSON.stringify(value); - return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized; - } catch { - return "[unserializable]"; - } -}; - -const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim(); - -const truncateProgressText = (text: string, maxLength: number) => - text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text; - -const summarizeToolParams = (params: Record) => { - const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]); - const summary = Object.entries(params) - .filter(([key]) => !ignoredKeys.has(key)) - .slice(0, 4) - .map(([key, value]) => `${key}=${formatProgressValue(value)}`) - .join(", "); - - return summary || "无附加参数"; -}; - -const buildSessionStatusDetail = (status: { type: string; message?: string }) => { - if (status.type === "retry") { - return status.message - ? `模型请求需要重试,原因:${status.message}` - : "模型请求正在重试,等待下一次响应。"; - } - if (status.type === "busy") { - return status.message - ? `Agent 正在处理中:${status.message}` - : "Agent 正在执行推理、工具调用或结果整理。"; - } - if (status.type === "idle") { - return status.message - ? `Agent 已空闲:${status.message}` - : "当前会话暂时没有待处理任务。"; - } - return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`; -}; - -const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => { - const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800); - if (ended) { - return reasoningText - ? `推理过程:${reasoningText}` - : "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。"; - } - return reasoningText - ? `正在推理:${reasoningText}` - : "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。"; -}; - -const buildToolProgressDetail = ( - tool: string, - status: string, - params: Record, - reason: string, - error?: string, -) => { - const toolName = toolLabels[tool] ?? tool; - const reasonText = reason ? `;调用原因:${reason}` : ""; - const paramsText = `;关键参数:${summarizeToolParams(params)}`; - - if (status === "error") { - const errorText = error ? `;错误:${error}` : ""; - return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`; - } - if (status === "completed") { - return `${toolName} 已执行完成${reasonText}${paramsText}`; - } - if (status === "pending") { - return `${toolName} 已进入待执行状态${reasonText}${paramsText}`; - } - return `${toolName} 正在执行${reasonText}${paramsText}`; -}; - -const getToolProgressTitle = (tool: string, status: string) => { - const toolName = toolLabels[tool] ?? tool; - if (status === "completed") return `${toolName} 已完成`; - if (status === "error") return `${toolName} 执行失败`; - if (status === "pending") return `准备调用 ${toolName}`; - return `正在调用 ${toolName}`; -}; - export const streamPromptResponse = async ({ runtime, sessionId, @@ -364,6 +141,8 @@ export const streamPromptResponse = async ({ const progressStartedAtMap = new Map(); const finalizedProgressIds = new Set(); const emittedToolParts = new Set(); + const emittedQuestionToolParts = new Set(); + const emittedQuestionRequestIds = new Set(); const partTypes = new Map(); const pendingPartTextDeltas = new Map(); const reasoningDeltas = new Map(); @@ -734,6 +513,76 @@ export const streamPromptResponse = async ({ continue; } + if (isQuestionAskedEvent(event) || isQuestionV2AskedEvent(event)) { + sawResponseActivity = true; + logDevelopmentDebug("question request received", { + ...debugContext, + requestId: event.properties.id, + questionCount: event.properties.questions.length, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + emitProgress({ + id: `question-${event.properties.id}`, + phase: "question", + status: "running", + title: "等待用户补充信息", + detail: event.properties.questions + .map((question) => question.question) + .join("\n"), + }); + const payload = normalizeQuestionPayload(event, clientSessionId); + emittedQuestionRequestIds.add(payload.request_id); + write("question_request", payload); + continue; + } + + if (isQuestionRepliedEvent(event) || isQuestionV2RepliedEvent(event)) { + sawResponseActivity = true; + logDevelopmentDebug("question request replied", { + ...debugContext, + requestId: event.properties.requestID, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + emitProgress({ + id: `question-${event.properties.requestID}`, + phase: "question", + status: "completed", + title: "已收到补充信息", + detail: normalizeQuestionAnswers(event.properties.answers) + .map((answer) => answer.join("、")) + .filter(Boolean) + .join("\n"), + }); + write("question_response", { + session_id: clientSessionId, + request_id: event.properties.requestID, + answers: normalizeQuestionAnswers(event.properties.answers), + }); + continue; + } + + if (isQuestionRejectedEvent(event) || isQuestionV2RejectedEvent(event)) { + sawResponseActivity = true; + logDevelopmentDebug("question request rejected", { + ...debugContext, + requestId: event.properties.requestID, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + emitProgress({ + id: `question-${event.properties.requestID}`, + phase: "question", + status: "completed", + title: "已跳过补充信息", + detail: "用户选择跳过本次补充信息。", + }); + write("question_response", { + session_id: clientSessionId, + request_id: event.properties.requestID, + rejected: true, + }); + continue; + } + if (isSkillEvent(event)) { sawResponseActivity = true; const { name, reason, payload } = extractSkillAuditInfo(event); @@ -882,6 +731,36 @@ export const streamPromptResponse = async ({ }); } + const questionToolPayload = normalizeQuestionToolPayload( + part, + toolParams, + clientSessionId, + ); + if (questionToolPayload) { + if (!emittedQuestionToolParts.has(part.id)) { + emittedQuestionToolParts.add(part.id); + emittedQuestionRequestIds.add(questionToolPayload.request_id); + logDevelopmentDebug("question tool request received", { + ...debugContext, + requestId: questionToolPayload.request_id, + tool: part.tool, + questionCount: questionToolPayload.questions.length, + elapsedMs: Math.max(0, Date.now() - requestStartedAt), + }); + emitProgress({ + id: `question-${questionToolPayload.request_id}`, + phase: "question", + status: "running", + title: "等待用户补充信息", + detail: questionToolPayload.questions + .map((question) => question.question) + .join("\n"), + }); + write("question_request", questionToolPayload); + } + continue; + } + emitProgress({ id: part.id, phase: "tool", @@ -937,18 +816,35 @@ export const streamPromptResponse = async ({ if (event.type === "todo.updated") { sawResponseActivity = true; - const completed = event.properties.todos.filter( + const todos = event.properties.todos as Array<{ + content: string; + status: string; + priority: string; + }>; + const normalizedTodos = todos.map((todo, index) => ({ + id: `todo-${index}-${todo.content.slice(0, 24)}`, + content: todo.content, + status: normalizeTodoStatus(todo.status), + priority: normalizeTodoPriority(todo.priority), + updated_at: Date.now(), + })); + const completed = todos.filter( (todo) => todo.status === "completed", ).length; emitProgress({ id: "todo-progress", phase: "planning", - status: completed === event.properties.todos.length ? "completed" : "running", - title: `计划进度 ${completed}/${event.properties.todos.length}`, - detail: event.properties.todos + status: completed === todos.length ? "completed" : "running", + title: `计划进度 ${completed}/${todos.length}`, + detail: todos .map((todo) => `${todo.status}: ${todo.content}`) .join("\n"), }); + write("todo_update", { + session_id: clientSessionId, + todos: normalizedTodos, + created_at: Date.now(), + } satisfies TodoUpdatePayload); continue; } diff --git a/src/routes/chatStreamEvents.ts b/src/routes/chatStreamEvents.ts new file mode 100644 index 0000000..e0090b3 --- /dev/null +++ b/src/routes/chatStreamEvents.ts @@ -0,0 +1,457 @@ +import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; + +import { logger } from "../logger.js"; +import { type QuestionAnswers } from "../runtime/opencode.js"; + +export type PermissionRequestPayload = { + session_id: string; + request_id: string; + permission: string; + patterns: string[]; + metadata: Record; + always: string[]; + tool?: { + messageID: string; + callID: string; + }; + created_at: number; +}; + +type QuestionOptionPayload = { + label: string; + description: string; +}; + +type QuestionInfoPayload = { + header: string; + question: string; + options: QuestionOptionPayload[]; + multiple?: boolean; + custom?: boolean; +}; + +export type QuestionRequestPayload = { + session_id: string; + request_id: string; + questions: QuestionInfoPayload[]; + tool?: { + messageID: string; + callID: string; + }; + created_at: number; +}; + +export type TodoItemPayload = { + id: string; + content: string; + status: "pending" | "in_progress" | "completed" | "cancelled"; + priority?: "low" | "medium" | "high"; + created_at?: number; + updated_at?: number; +}; + +export type TodoUpdatePayload = { + session_id: string; + message_id?: string; + todos: TodoItemPayload[]; + created_at: number; +}; + +const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development"; + +const toolLabels: Record = { + memory_manager: "记忆写入", + session_search: "历史会话检索", + skill_manager: "流程沉淀", + locate_features: "地图定位", + view_history: "历史数据面板", + view_scada: "SCADA 面板", + show_chart: "图表渲染", + render_junctions: "节点渲染", +}; + +export const logDevelopmentDebug = ( + message: string, + metadata: Record, +) => { + if (!isDevelopmentDebugLoggingEnabled) { + return; + } + logger.info(metadata, message); +}; + +export const getErrorMessage = (error: { + name: string; + data?: { message?: string }; +}) => error.data?.message ?? error.name; + +export const getUnknownErrorMessage = (error: unknown) => { + if ( + typeof error === "object" && + error !== null && + "name" in error && + typeof error.name === "string" + ) { + const maybeData = "data" in error ? error.data : undefined; + return getErrorMessage({ + name: error.name, + data: + typeof maybeData === "object" && maybeData !== null && "message" in maybeData + ? { message: typeof maybeData.message === "string" ? maybeData.message : undefined } + : undefined, + }); + } + return error instanceof Error ? error.message : String(error); +}; + +export const isObjectRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +export const normalizeToolParams = (value: unknown): Record => { + if (isObjectRecord(value)) { + return value; + } + if (typeof value === "string") { + try { + const parsed = JSON.parse(value) as unknown; + return isObjectRecord(parsed) ? parsed : {}; + } catch { + return {}; + } + } + return {}; +}; + +export const extractRequestReason = (params: Record) => { + const candidates = ["reason", "request_reason", "why", "purpose", "rationale"]; + for (const key of candidates) { + const value = params[key]; + if (typeof value === "string") { + const normalized = value.trim(); + if (normalized) { + return normalized; + } + } + } + return ""; +}; + +export const isSkillEvent = (event: OpencodeEvent) => + event.type.toLowerCase().includes("skill"); + +export const extractSkillAuditInfo = (event: OpencodeEvent) => { + const payload = isObjectRecord(event.properties) + ? (event.properties as Record) + : {}; + const candidateName = + typeof payload.skill === "string" + ? payload.skill + : typeof payload.skillName === "string" + ? payload.skillName + : typeof payload.name === "string" + ? payload.name + : event.type; + const reason = extractRequestReason(payload); + return { + name: candidateName, + reason, + payload, + }; +}; + +export const hasToolParams = (params: Record) => + Object.keys(params).length > 0; + +export const isSessionEvent = (event: OpencodeEvent, sessionId: string) => + "properties" in event && + typeof event.properties === "object" && + event.properties !== null && + "sessionID" in event.properties && + event.properties.sessionID === sessionId; + +export const isPermissionAskedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "permission.asked"; + +export const isPermissionV2AskedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "permission.v2.asked"; + +export const isPermissionRepliedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "permission.replied"; + +export const isPermissionV2RepliedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "permission.v2.replied"; + +export const isQuestionAskedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.asked"; + +export const isQuestionV2AskedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.v2.asked"; + +export const isQuestionRepliedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.replied"; + +export const isQuestionV2RepliedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.v2.replied"; + +export const isQuestionRejectedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.rejected"; + +export const isQuestionV2RejectedEvent = ( + event: OpencodeEvent, +): event is Extract => + event.type === "question.v2.rejected"; + +export const buildPermissionDetail = ( + event: Extract, +) => { + const patterns = event.properties.patterns.length + ? event.properties.patterns.join(", ") + : event.properties.permission; + return `需要用户确认权限:${event.properties.permission};匹配规则:${patterns}`; +}; + +export const buildPermissionV2Detail = ( + event: Extract, +) => { + const resources = event.properties.resources.length + ? event.properties.resources.join(", ") + : event.properties.action; + return `需要用户确认权限:${event.properties.action};资源:${resources}`; +}; + +export const normalizeQuestionPayload = ( + event: Extract, + clientSessionId: string, +): QuestionRequestPayload => ({ + session_id: clientSessionId, + request_id: event.properties.id, + questions: event.properties.questions.map((question) => ({ + header: question.header, + question: question.question, + options: question.options.map((option) => ({ + label: option.label, + description: option.description, + })), + multiple: question.multiple, + custom: question.custom, + })), + tool: event.properties.tool, + created_at: Date.now(), +}); + +export const normalizeQuestionAnswers = (answers: QuestionAnswers | undefined) => + Array.isArray(answers) + ? answers.map((answer) => + Array.isArray(answer) + ? answer.filter((item): item is string => typeof item === "string") + : [], + ) + : []; + +const questionToolNames = new Set(["question", "request_user_input"]); + +const normalizeQuestionOptions = (value: unknown): QuestionOptionPayload[] => + Array.isArray(value) + ? value.filter(isObjectRecord).map((option) => ({ + label: typeof option.label === "string" ? option.label : "", + description: + typeof option.description === "string" ? option.description : "", + })).filter((option) => option.label.trim().length > 0) + : []; + +const normalizeToolQuestionInfo = (value: unknown): QuestionInfoPayload | undefined => { + if (!isObjectRecord(value) || typeof value.question !== "string") { + return undefined; + } + const question = value.question.trim(); + if (!question) { + return undefined; + } + return { + header: + typeof value.header === "string" && value.header.trim() + ? value.header + : "补充信息", + question, + options: normalizeQuestionOptions(value.options), + multiple: typeof value.multiple === "boolean" ? value.multiple : undefined, + custom: typeof value.custom === "boolean" ? value.custom : undefined, + }; +}; + +export const normalizeQuestionToolPayload = ( + part: Extract, + params: Record, + clientSessionId: string, +): QuestionRequestPayload | undefined => { + if (!questionToolNames.has(part.tool)) { + return undefined; + } + const questions = Array.isArray(params.questions) + ? params.questions + .map(normalizeToolQuestionInfo) + .filter((question): question is QuestionInfoPayload => Boolean(question)) + : []; + if (questions.length === 0) { + return undefined; + } + return { + session_id: clientSessionId, + request_id: part.callID || part.id, + questions, + tool: { + messageID: part.messageID, + callID: part.callID, + }, + created_at: Date.now(), + }; +}; + +export const normalizeTodoStatus = (status: string): TodoItemPayload["status"] => { + if (status === "in_progress" || status === "completed" || status === "cancelled") { + return status; + } + return "pending"; +}; + +export const normalizeTodoPriority = ( + priority: string, +): TodoItemPayload["priority"] | undefined => { + if (priority === "low" || priority === "medium" || priority === "high") { + return priority; + } + return undefined; +}; + +export const collectTextContent = (parts: Part[]) => + parts + .filter((part): part is Extract => part.type === "text") + .map((part) => part.text) + .join(""); + +export const normalizeToolStatus = (status: string) => { + if (status === "completed") return "completed"; + if (status === "error") return "error"; + return "running"; +}; + +const formatProgressValue = (value: unknown): string => { + if (typeof value === "string") { + return value.length > 120 ? `${value.slice(0, 117)}...` : value; + } + if ( + typeof value === "number" || + typeof value === "boolean" || + value === null || + value === undefined + ) { + return String(value); + } + try { + const serialized = JSON.stringify(value); + return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized; + } catch { + return "[unserializable]"; + } +}; + +const normalizeProgressText = (chunks: string[]) => + chunks.join("").replace(/\s+/g, " ").trim(); + +const truncateProgressText = (text: string, maxLength: number) => + text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text; + +const summarizeToolParams = (params: Record) => { + const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]); + const summary = Object.entries(params) + .filter(([key]) => !ignoredKeys.has(key)) + .slice(0, 4) + .map(([key, value]) => `${key}=${formatProgressValue(value)}`) + .join(", "); + + return summary || "无附加参数"; +}; + +export const buildSessionStatusDetail = (status: { type: string; message?: string }) => { + if (status.type === "retry") { + return status.message + ? `模型请求需要重试,原因:${status.message}` + : "模型请求正在重试,等待下一次响应。"; + } + if (status.type === "busy") { + return status.message + ? `Agent 正在处理中:${status.message}` + : "Agent 正在执行推理、工具调用或结果整理。"; + } + if (status.type === "idle") { + return status.message + ? `Agent 已空闲:${status.message}` + : "当前会话暂时没有待处理任务。"; + } + return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`; +}; + +export const buildReasoningProgressDetail = ( + chunks: string[], + ended?: string | number | Date | null, +) => { + const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800); + if (ended) { + return reasoningText + ? `推理过程:${reasoningText}` + : "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。"; + } + return reasoningText + ? `正在推理:${reasoningText}` + : "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。"; +}; + +export const buildToolProgressDetail = ( + tool: string, + status: string, + params: Record, + reason: string, + error?: string, +) => { + const toolName = toolLabels[tool] ?? tool; + const reasonText = reason ? `;调用原因:${reason}` : ""; + const paramsText = `;关键参数:${summarizeToolParams(params)}`; + + if (status === "error") { + const errorText = error ? `;错误:${error}` : ""; + return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`; + } + if (status === "completed") { + return `${toolName} 已执行完成${reasonText}${paramsText}`; + } + if (status === "pending") { + return `${toolName} 已进入待执行状态${reasonText}${paramsText}`; + } + return `${toolName} 正在执行${reasonText}${paramsText}`; +}; + +export const getToolProgressTitle = (tool: string, status: string) => { + const toolName = toolLabels[tool] ?? tool; + if (status === "completed") return `${toolName} 已完成`; + if (status === "error") return `${toolName} 执行失败`; + if (status === "pending") return `准备调用 ${toolName}`; + return `正在调用 ${toolName}`; +}; diff --git a/src/routes/chatUiState.ts b/src/routes/chatUiState.ts new file mode 100644 index 0000000..d8e7fe3 --- /dev/null +++ b/src/routes/chatUiState.ts @@ -0,0 +1,308 @@ +import { type PermissionReply } from "../runtime/opencode.js"; +import { + type PermissionRequestPayload, + type QuestionRequestPayload, + type TodoUpdatePayload, +} from "./chatStream.js"; + +export type RunStatus = "running" | "completed" | "error" | "aborted"; + +export type StreamSubscriber = { + write: (event: string, data: Record) => void; + close: () => void; +}; + +export type ActiveRun = { + clientSessionId: string; + controller: AbortController; + messages: unknown[]; + pendingPermissions: Map; + pendingQuestions: Map; + status: RunStatus; + subscribers: Set; +}; + +export const isObjectRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); + +const createFrontendMessageId = () => + `msg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; + +export const createInitialStreamingMessages = ( + existingMessages: unknown[], + userContent: string, +) => { + const userMessage = { + id: createFrontendMessageId(), + role: "user", + content: userContent, + }; + return [ + ...existingMessages, + { + ...userMessage, + branchRootId: userMessage.id, + }, + { + id: createFrontendMessageId(), + role: "assistant", + content: "", + progress: [ + { + id: "request-received", + phase: "start", + status: "running", + title: "已收到请求,正在启动 Agent 分析", + detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。", + startedAt: Date.now(), + elapsedMs: 0, + elapsedSnapshotAt: Date.now(), + }, + ], + }, + ]; +}; + +export const countFrontendUserMessages = (messages: unknown[]) => + messages.filter( + (message) => isObjectRecord(message) && message.role === "user", + ).length; + +export const pruneBranchGroupsForMessageIndex = ( + branchGroups: unknown[], + messageIndex: number | undefined, +) => { + if (messageIndex === undefined) { + return branchGroups; + } + return branchGroups.filter( + (group) => + !isObjectRecord(group) || + typeof group.parentCount !== "number" || + group.parentCount < messageIndex, + ); +}; + +export const upsertBackendProgress = ( + progress: unknown, + payload: Record, +) => { + const next = Array.isArray(progress) ? [...progress] : []; + const id = typeof payload.id === "string" ? payload.id : `progress-${Date.now()}`; + const index = next.findIndex((item) => isObjectRecord(item) && item.id === id); + const nextItem = { + id, + phase: typeof payload.phase === "string" ? payload.phase : "progress", + status: + payload.status === "completed" || payload.status === "error" + ? payload.status + : "running", + title: typeof payload.title === "string" ? payload.title : "正在处理", + detail: typeof payload.detail === "string" ? payload.detail : undefined, + startedAt: typeof payload.started_at === "number" ? payload.started_at : undefined, + endedAt: typeof payload.ended_at === "number" ? payload.ended_at : undefined, + elapsedMs: typeof payload.elapsed_ms === "number" ? payload.elapsed_ms : undefined, + elapsedSnapshotAt: + typeof payload.elapsed_ms === "number" ? Date.now() : undefined, + durationMs: typeof payload.duration_ms === "number" ? payload.duration_ms : undefined, + }; + if (index >= 0) { + next[index] = nextItem; + } else { + next.push(nextItem); + } + return next; +}; + +export const completeBackendProgress = (progress: unknown) => + Array.isArray(progress) + ? progress.map((item) => { + if (!isObjectRecord(item) || item.status !== "running") { + return item; + } + const endedAt = Date.now(); + const startedAt = typeof item.startedAt === "number" ? item.startedAt : undefined; + return { + ...item, + status: "completed", + endedAt, + elapsedMs: undefined, + elapsedSnapshotAt: undefined, + durationMs: + typeof item.durationMs === "number" + ? item.durationMs + : startedAt !== undefined + ? Math.max(0, endedAt - startedAt) + : item.elapsedMs, + }; + }) + : progress; + +export const cancelBackendTodos = (todos: unknown) => + Array.isArray(todos) + ? todos.map((todoUpdate) => { + if (!isObjectRecord(todoUpdate) || !Array.isArray(todoUpdate.todos)) { + return todoUpdate; + } + return { + ...todoUpdate, + todos: todoUpdate.todos.map((todo) => { + if (!isObjectRecord(todo)) { + return todo; + } + if (todo.status !== "pending" && todo.status !== "in_progress") { + return todo; + } + return { + ...todo, + status: "cancelled", + updatedAt: Date.now(), + }; + }), + }; + }) + : todos; + +export const updateLastAssistantMessage = ( + messages: unknown[], + updater: (message: Record) => Record, +) => { + for (let index = messages.length - 1; index >= 0; index -= 1) { + const message = messages[index]; + if (isObjectRecord(message) && message.role === "assistant") { + const next = [...messages]; + next[index] = updater(message); + return next; + } + } + return messages; +}; + +export const updateLastAssistantPermission = ( + messages: unknown[], + requestId: string, + updater: (permission: Record) => Record, +) => + updateLastAssistantMessage(messages, (message) => { + const permissions = Array.isArray(message.permissions) + ? message.permissions + : []; + return { + ...message, + permissions: permissions.map((permission) => + isObjectRecord(permission) && permission.requestId === requestId + ? updater(permission) + : permission, + ), + }; + }); + +export const updateLastAssistantQuestion = ( + messages: unknown[], + requestId: string, + updater: (question: Record) => Record, +) => + updateLastAssistantMessage(messages, (message) => { + const questions = Array.isArray(message.questions) + ? message.questions + : []; + return { + ...message, + questions: questions.map((question) => + isObjectRecord(question) && question.requestId === requestId + ? updater(question) + : question, + ), + }; + }); + +export const toFrontendPermission = ( + payload: PermissionRequestPayload, + status: "pending" | "approved_once" | "approved_always" | "rejected" | "error" = "pending", +) => ({ + requestId: payload.request_id, + sessionId: payload.session_id, + permission: payload.permission, + patterns: payload.patterns, + metadata: payload.metadata, + always: payload.always, + tool: payload.tool, + createdAt: payload.created_at, + status, +}); + +const toFrontendQuestion = ( + payload: QuestionRequestPayload, + status: "pending" | "submitting" | "answered" | "rejected" | "error" = "pending", +) => ({ + requestId: payload.request_id, + sessionId: payload.session_id, + questions: payload.questions, + tool: payload.tool, + createdAt: payload.created_at, + status, +}); + +export const toPermissionStatus = (reply: PermissionReply) => { + if (reply === "always") return "approved_always"; + if (reply === "once") return "approved_once"; + return "rejected"; +}; + +export const upsertBackendQuestion = ( + questions: unknown, + payload: QuestionRequestPayload, +) => { + const next = Array.isArray(questions) ? [...questions] : []; + const index = next.findIndex((item) => { + if (!isObjectRecord(item)) return false; + if (item.requestId === payload.request_id) return true; + const tool = isObjectRecord(item.tool) ? item.tool : undefined; + return Boolean( + payload.tool?.callID && + tool?.callID === payload.tool.callID, + ); + }); + const nextItem = toFrontendQuestion(payload); + if (index >= 0) { + const current = next[index]; + const currentRequestId = isObjectRecord(current) ? current.requestId : undefined; + const currentTool = isObjectRecord(current) && isObjectRecord(current.tool) + ? current.tool + : undefined; + const currentIsActionable = + typeof currentRequestId === "string" && + currentRequestId !== currentTool?.callID; + const payloadIsToolPlaceholder = + Boolean(payload.tool?.callID) && payload.request_id === payload.tool?.callID; + next[index] = { + ...(isObjectRecord(current) ? current : {}), + ...(currentIsActionable && payloadIsToolPlaceholder + ? { + questions: nextItem.questions, + tool: nextItem.tool, + createdAt: nextItem.createdAt, + } + : nextItem), + status: + isObjectRecord(current) && current.status === "submitting" + ? "submitting" + : nextItem.status, + }; + } else { + next.push(nextItem); + } + return next; +}; + +export const upsertBackendTodoUpdate = ( + _todos: unknown, + payload: TodoUpdatePayload, +) => [ + { + sessionId: payload.session_id, + messageId: payload.message_id, + todos: payload.todos, + createdAt: payload.created_at, + }, +]; diff --git a/src/runtime/opencode.ts b/src/runtime/opencode.ts index 15ae31c..d63185e 100644 --- a/src/runtime/opencode.ts +++ b/src/runtime/opencode.ts @@ -32,6 +32,7 @@ type RuntimeModelOverride = { }; export type PermissionReply = "once" | "always" | "reject"; +export type QuestionAnswers = string[][]; type RuntimeMessage = { info: { @@ -135,6 +136,13 @@ export class OpencodeRuntimeAdapter { const targetUserMessage = userMessages[options.userOrdinal - 1]; if (!targetUserMessage) { + if (messages.length === 0 && options.userOrdinal === 1) { + logger.warn( + { sessionId, userOrdinal: options.userOrdinal }, + "skipping opencode revert because runtime session has no messages", + ); + return; + } throw new Error("target user message not found to revert"); } @@ -221,6 +229,96 @@ export class OpencodeRuntimeAdapter { throw new Error("opencode permission reply API is unavailable"); } + async replyQuestion(options: { + requestId: string; + sessionId?: string; + answers: QuestionAnswers; + }) { + const client = await this.ensureClient(); + if ("question" in client && client.question?.reply) { + try { + const response = await client.question.reply({ + requestID: options.requestId, + answers: options.answers, + }); + return response.data; + } catch (error) { + if (!options.sessionId) { + throw error; + } + } + } + + const v2Question = (client as unknown as { + v2?: { + session?: { + question?: { + reply?: (parameters: { + sessionID: string; + requestID: string; + questionV2Reply: { answers: QuestionAnswers }; + }) => Promise<{ data: unknown }>; + }; + }; + }; + }).v2?.session?.question; + + if (v2Question?.reply && options.sessionId) { + const response = await v2Question.reply({ + sessionID: options.sessionId, + requestID: options.requestId, + questionV2Reply: { + answers: options.answers, + }, + }); + return response.data; + } + + throw new Error("opencode question reply API is unavailable"); + } + + async rejectQuestion(options: { + requestId: string; + sessionId?: string; + }) { + const client = await this.ensureClient(); + if ("question" in client && client.question?.reject) { + try { + const response = await client.question.reject({ + requestID: options.requestId, + }); + return response.data; + } catch (error) { + if (!options.sessionId) { + throw error; + } + } + } + + const v2Question = (client as unknown as { + v2?: { + session?: { + question?: { + reject?: (parameters: { + sessionID: string; + requestID: string; + }) => Promise<{ data: unknown }>; + }; + }; + }; + }).v2?.session?.question; + + if (v2Question?.reject && options.sessionId) { + const response = await v2Question.reject({ + sessionID: options.sessionId, + requestID: options.requestId, + }); + return response.data; + } + + throw new Error("opencode question reject API is unavailable"); + } + async dispose(): Promise { this.closeServer?.(); this.closeServer = null; diff --git a/tests/routes/chatStream.test.ts b/tests/routes/chatStream.test.ts index 875cbcb..521e74b 100644 --- a/tests/routes/chatStream.test.ts +++ b/tests/routes/chatStream.test.ts @@ -161,4 +161,217 @@ describe("streamPromptResponse", () => { always: ["/tmp"], } satisfies Partial); }); + + it("forwards opencode question requests and replies as SSE payloads", async () => { + const runtime = { + subscribeEvents: async () => + createEventStream([ + { + type: "question.asked", + properties: { + id: "question-1", + sessionID: "runtime-session-1", + questions: [ + { + header: "范围", + question: "选择分析范围", + options: [{ label: "城区", description: "中心城区" }], + multiple: false, + custom: true, + }, + ], + }, + }, + { + type: "question.replied", + properties: { + sessionID: "runtime-session-1", + requestID: "question-1", + answers: [["城区", "补充说明"]], + }, + }, + { + type: "session.idle", + properties: { + sessionID: "runtime-session-1", + }, + }, + ]), + prompt: async () => undefined, + messages: async () => [], + } as unknown as OpencodeRuntimeAdapter; + const events: Array<{ event: string; data: Record }> = []; + + await streamPromptResponse({ + runtime, + sessionId: "runtime-session-1", + clientSessionId: "client-session-1", + message: "ask", + write: (event, data) => events.push({ event, data }), + }); + + expect(events.find((item) => item.event === "question_request")?.data).toMatchObject({ + session_id: "client-session-1", + request_id: "question-1", + questions: [ + { + header: "范围", + question: "选择分析范围", + options: [{ label: "城区", description: "中心城区" }], + multiple: false, + custom: true, + }, + ], + }); + expect(events.find((item) => item.event === "question_response")?.data).toEqual({ + session_id: "client-session-1", + request_id: "question-1", + answers: [["城区", "补充说明"]], + }); + }); + + it("converts question tool parts into question request SSE payloads", async () => { + const runtime = { + subscribeEvents: async () => + createEventStream([ + { + type: "message.part.updated", + properties: { + sessionID: "runtime-session-1", + part: { + id: "tool-part-1", + sessionID: "runtime-session-1", + messageID: "message-1", + type: "tool", + callID: "call-1", + tool: "question", + state: { + status: "running", + input: { + questions: [ + { + question: "你觉得这个 question 工具好用吗?", + header: "测试问题", + options: [ + { + label: "非常好用", + description: "交互清晰,选项方便", + }, + ], + }, + ], + }, + time: { start: Date.now() }, + }, + }, + time: Date.now(), + }, + }, + { + type: "session.idle", + properties: { + sessionID: "runtime-session-1", + }, + }, + ]), + prompt: async () => undefined, + messages: async () => [], + } as unknown as OpencodeRuntimeAdapter; + const events: Array<{ event: string; data: Record }> = []; + + await streamPromptResponse({ + runtime, + sessionId: "runtime-session-1", + clientSessionId: "client-session-1", + message: "ask", + write: (event, data) => events.push({ event, data }), + }); + + expect(events.find((item) => item.event === "question_request")?.data).toMatchObject({ + session_id: "client-session-1", + request_id: "call-1", + questions: [ + { + header: "测试问题", + question: "你觉得这个 question 工具好用吗?", + options: [ + { + label: "非常好用", + description: "交互清晰,选项方便", + }, + ], + }, + ], + tool: { + messageID: "message-1", + callID: "call-1", + }, + }); + expect( + events.some( + (item) => item.event === "tool_call" && item.data.tool === "question", + ), + ).toBe(false); + }); + + it("forwards todo updates as structured SSE payloads and progress", async () => { + const runtime = { + subscribeEvents: async () => + createEventStream([ + { + type: "todo.updated", + properties: { + sessionID: "runtime-session-1", + todos: [ + { content: "分析水位", status: "completed", priority: "high" }, + { content: "生成建议", status: "in_progress", priority: "medium" }, + ], + }, + }, + { + type: "session.idle", + properties: { + sessionID: "runtime-session-1", + }, + }, + ]), + prompt: async () => undefined, + messages: async () => [], + } as unknown as OpencodeRuntimeAdapter; + const events: Array<{ event: string; data: Record }> = []; + + await streamPromptResponse({ + runtime, + sessionId: "runtime-session-1", + clientSessionId: "client-session-1", + message: "plan", + write: (event, data) => events.push({ event, data }), + }); + + expect( + events.find( + (item) => item.event === "progress" && item.data.id === "todo-progress", + )?.data, + ).toMatchObject({ + id: "todo-progress", + phase: "planning", + title: "计划进度 1/2", + }); + expect(events.find((item) => item.event === "todo_update")?.data).toMatchObject({ + session_id: "client-session-1", + todos: [ + expect.objectContaining({ + content: "分析水位", + status: "completed", + priority: "high", + }), + expect.objectContaining({ + content: "生成建议", + status: "in_progress", + priority: "medium", + }), + ], + }); + }); + }); diff --git a/tests/routes/chatUiState.test.ts b/tests/routes/chatUiState.test.ts new file mode 100644 index 0000000..d07877e --- /dev/null +++ b/tests/routes/chatUiState.test.ts @@ -0,0 +1,127 @@ +import { describe, expect, it } from "bun:test"; + +import { + cancelBackendTodos, + upsertBackendQuestion, +} from "../../src/routes/chatUiState.js"; + +describe("upsertBackendQuestion", () => { + it("replaces a tool-call placeholder with the actionable question request", () => { + const questions = upsertBackendQuestion( + [ + { + requestId: "call-1", + sessionId: "session-1", + questions: [ + { + header: "测试问题", + question: "你觉得这个 question 工具好用吗?", + options: [{ label: "非常好用", description: "交互清晰,选项方便" }], + }, + ], + tool: { messageID: "message-1", callID: "call-1" }, + createdAt: 123, + status: "pending", + }, + ], + { + session_id: "session-1", + request_id: "question-1", + questions: [ + { + header: "测试问题", + question: "你觉得这个 question 工具好用吗?", + options: [{ label: "非常好用", description: "交互清晰,选项方便" }], + }, + ], + tool: { messageID: "message-1", callID: "call-1" }, + created_at: 456, + }, + ); + + expect(questions).toHaveLength(1); + expect(questions[0]).toMatchObject({ + requestId: "question-1", + tool: { callID: "call-1" }, + status: "pending", + }); + }); + + it("does not replace an actionable question request with a later tool-call placeholder", () => { + const questions = upsertBackendQuestion( + [ + { + requestId: "question-1", + sessionId: "session-1", + questions: [ + { + header: "测试问题", + question: "你觉得这个 question 工具好用吗?", + options: [{ label: "非常好用", description: "交互清晰,选项方便" }], + }, + ], + tool: { messageID: "message-1", callID: "call-1" }, + createdAt: 123, + status: "pending", + }, + ], + { + session_id: "session-1", + request_id: "call-1", + questions: [ + { + header: "测试问题", + question: "你觉得这个 question 工具好用吗?", + options: [{ label: "非常好用", description: "交互清晰,选项方便" }], + }, + ], + tool: { messageID: "message-1", callID: "call-1" }, + created_at: 456, + }, + ); + + expect(questions).toHaveLength(1); + expect(questions[0]).toMatchObject({ + requestId: "question-1", + tool: { callID: "call-1" }, + status: "pending", + }); + }); +}); + +describe("cancelBackendTodos", () => { + it("marks pending and in-progress todos as cancelled", () => { + const cancelled = cancelBackendTodos([ + { + sessionId: "session-1", + todos: [ + { id: "todo-1", content: "分析水位", status: "in_progress" }, + { id: "todo-2", content: "生成建议", status: "pending" }, + { id: "todo-3", content: "完成报告", status: "completed" }, + ], + createdAt: 123, + }, + ]); + + expect(cancelled).toEqual([ + expect.objectContaining({ + todos: [ + expect.objectContaining({ + id: "todo-1", + status: "cancelled", + updatedAt: expect.any(Number), + }), + expect.objectContaining({ + id: "todo-2", + status: "cancelled", + updatedAt: expect.any(Number), + }), + expect.objectContaining({ + id: "todo-3", + status: "completed", + }), + ], + }), + ]); + }); +}); diff --git a/tests/runtime/opencode.test.ts b/tests/runtime/opencode.test.ts new file mode 100644 index 0000000..ffd9a0e --- /dev/null +++ b/tests/runtime/opencode.test.ts @@ -0,0 +1,62 @@ +import { describe, expect, it } from "bun:test"; + +import { OpencodeRuntimeAdapter } from "../../src/runtime/opencode.js"; + +const createRuntimeAdapter = ( + messages: unknown[], + calls: { + reverted: string[]; + removed: string[]; + } = { reverted: [], removed: [] }, +) => + Object.assign(Object.create(OpencodeRuntimeAdapter.prototype), { + messages: async () => messages, + revertMessage: async (_sessionId: string, messageId: string) => { + calls.reverted.push(messageId); + }, + removeMessage: async (_sessionId: string, messageId: string) => { + calls.removed.push(messageId); + }, + }) as OpencodeRuntimeAdapter; + +describe("OpencodeRuntimeAdapter.revertToUserMessage", () => { + it("skips reverting the first user message when the runtime session is empty", async () => { + const calls = { reverted: [] as string[], removed: [] as string[] }; + const runtime = createRuntimeAdapter([], calls); + + await runtime.revertToUserMessage("session-1", { userOrdinal: 1 }); + + expect(calls).toEqual({ reverted: [], removed: [] }); + }); + + it("keeps ordinal mismatches visible when runtime messages exist", async () => { + const runtime = createRuntimeAdapter([ + { info: { id: "user-1", role: "user" } }, + { info: { id: "assistant-1", role: "assistant" } }, + ]); + + await expect( + runtime.revertToUserMessage("session-1", { userOrdinal: 2 }), + ).rejects.toThrow("target user message not found to revert"); + }); + + it("reverts and removes messages from the target user message onward", async () => { + const calls = { reverted: [] as string[], removed: [] as string[] }; + const runtime = createRuntimeAdapter( + [ + { info: { id: "user-1", role: "user" } }, + { info: { id: "assistant-1", role: "assistant" } }, + { info: { id: "user-2", role: "user" } }, + { info: { id: "assistant-2", role: "assistant" } }, + ], + calls, + ); + + await runtime.revertToUserMessage("session-1", { userOrdinal: 2 }); + + expect(calls).toEqual({ + reverted: ["user-2"], + removed: ["assistant-2", "user-2"], + }); + }); +});