fix(chat): handle question and todo state

This commit is contained in:
2026-06-08 18:10:28 +08:00
parent f20847399a
commit 15c3263369
10 changed files with 2173 additions and 724 deletions
+122 -447
View File
@@ -8,9 +8,7 @@ import { MemoryStore } from "../memory/store.js";
import { type SessionUiStateStore } from "../sessions/uiStateStore.js";
import { type SessionMetadataStore } from "../sessions/metadataStore.js";
import { type ResultReferenceResolver } from "../results/resolver.js";
import { RESULT_REFERENCE_KIND } from "../results/store.js";
import {
type PermissionReply,
type OpencodeRuntimeAdapter,
} from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
@@ -22,13 +20,36 @@ import {
generateSessionTitle,
shouldGenerateSessionTitle,
} from "./chatSession.js";
import { registerChatAuxiliaryRoutes } from "./chatAuxiliaryRoutes.js";
import { registerChatInteractionRoutes } from "./chatInteractionRoutes.js";
import {
collectTextContent,
type PermissionRequestPayload,
type QuestionRequestPayload,
streamPromptResponse,
supportedModels,
type SupportedModel,
type TodoUpdatePayload,
} from "./chatStream.js";
import {
type ActiveRun,
type RunStatus,
type StreamSubscriber,
cancelBackendTodos,
completeBackendProgress,
countFrontendUserMessages,
createInitialStreamingMessages,
isObjectRecord,
pruneBranchGroupsForMessageIndex,
toFrontendPermission,
toPermissionStatus,
updateLastAssistantMessage,
updateLastAssistantPermission,
updateLastAssistantQuestion,
upsertBackendProgress,
upsertBackendQuestion,
upsertBackendTodoUpdate,
} from "./chatUiState.js";
const payloadSchema = z.object({
message: z.string().min(1).max(10000),
@@ -38,16 +59,6 @@ const payloadSchema = z.object({
regenerate_from_message_index: z.coerce.number().int().min(0).optional(),
});
const abortPayloadSchema = z.object({
session_id: z.string().max(128),
});
const permissionReplyPayloadSchema = z.object({
session_id: z.string().max(128),
reply: z.enum(["once", "always", "reject"]),
message: z.string().max(1000).optional(),
});
const createSessionPayloadSchema = z.object({
session_id: z.string().max(128).optional(),
parent_session_id: z.string().max(128).optional(),
@@ -65,22 +76,6 @@ const sessionStateSchema = z.object({
branch_groups: z.array(z.unknown()).default([]),
});
type RunStatus = "running" | "completed" | "error" | "aborted";
type StreamSubscriber = {
write: (event: string, data: Record<string, unknown>) => void;
close: () => void;
};
type ActiveRun = {
clientSessionId: string;
controller: AbortController;
messages: unknown[];
pendingPermissions: Map<string, PermissionRequestPayload>;
status: RunStatus;
subscribers: Set<StreamSubscriber>;
};
const activeRuns = new Map<string, ActiveRun>();
const lastRunStatuses = new Map<string, RunStatus>();
@@ -91,174 +86,6 @@ const toSessionUiStateContext = (sessionRecord: SessionRecord) => ({
const getSessionRunStatus = (sessionId: string) =>
activeRuns.get(sessionId)?.status ?? lastRunStatuses.get(sessionId);
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const createFrontendMessageId = () =>
`msg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
const createInitialStreamingMessages = (existingMessages: unknown[], userContent: string) => {
const userMessage = {
id: createFrontendMessageId(),
role: "user",
content: userContent,
};
return [
...existingMessages,
{
...userMessage,
branchRootId: userMessage.id,
},
{
id: createFrontendMessageId(),
role: "assistant",
content: "",
progress: [
{
id: "request-received",
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
startedAt: Date.now(),
elapsedMs: 0,
elapsedSnapshotAt: Date.now(),
},
],
},
];
};
const countFrontendUserMessages = (messages: unknown[]) =>
messages.filter(
(message) => isObjectRecord(message) && message.role === "user",
).length;
const pruneBranchGroupsForMessageIndex = (
branchGroups: unknown[],
messageIndex: number | undefined,
) => {
if (messageIndex === undefined) {
return branchGroups;
}
return branchGroups.filter(
(group) =>
!isObjectRecord(group) ||
typeof group.parentCount !== "number" ||
group.parentCount < messageIndex,
);
};
const upsertBackendProgress = (
progress: unknown,
payload: Record<string, unknown>,
) => {
const next = Array.isArray(progress) ? [...progress] : [];
const id = typeof payload.id === "string" ? payload.id : `progress-${Date.now()}`;
const index = next.findIndex((item) => isObjectRecord(item) && item.id === id);
const nextItem = {
id,
phase: typeof payload.phase === "string" ? payload.phase : "progress",
status:
payload.status === "completed" || payload.status === "error"
? payload.status
: "running",
title: typeof payload.title === "string" ? payload.title : "正在处理",
detail: typeof payload.detail === "string" ? payload.detail : undefined,
startedAt: typeof payload.started_at === "number" ? payload.started_at : undefined,
endedAt: typeof payload.ended_at === "number" ? payload.ended_at : undefined,
elapsedMs: typeof payload.elapsed_ms === "number" ? payload.elapsed_ms : undefined,
elapsedSnapshotAt:
typeof payload.elapsed_ms === "number" ? Date.now() : undefined,
durationMs: typeof payload.duration_ms === "number" ? payload.duration_ms : undefined,
};
if (index >= 0) {
next[index] = nextItem;
} else {
next.push(nextItem);
}
return next;
};
const completeBackendProgress = (progress: unknown) =>
Array.isArray(progress)
? progress.map((item) => {
if (!isObjectRecord(item) || item.status !== "running") {
return item;
}
const endedAt = Date.now();
const startedAt = typeof item.startedAt === "number" ? item.startedAt : undefined;
return {
...item,
status: "completed",
endedAt,
elapsedMs: undefined,
elapsedSnapshotAt: undefined,
durationMs:
typeof item.durationMs === "number"
? item.durationMs
: startedAt !== undefined
? Math.max(0, endedAt - startedAt)
: item.elapsedMs,
};
})
: progress;
const updateLastAssistantMessage = (
messages: unknown[],
updater: (message: Record<string, unknown>) => Record<string, unknown>,
) => {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
if (isObjectRecord(message) && message.role === "assistant") {
const next = [...messages];
next[index] = updater(message);
return next;
}
}
return messages;
};
const updateLastAssistantPermission = (
messages: unknown[],
requestId: string,
updater: (permission: Record<string, unknown>) => Record<string, unknown>,
) =>
updateLastAssistantMessage(messages, (message) => {
const permissions = Array.isArray(message.permissions)
? message.permissions
: [];
return {
...message,
permissions: permissions.map((permission) =>
isObjectRecord(permission) && permission.requestId === requestId
? updater(permission)
: permission,
),
};
});
const toFrontendPermission = (
payload: PermissionRequestPayload,
status: "pending" | "approved_once" | "approved_always" | "rejected" | "error" = "pending",
) => ({
requestId: payload.request_id,
sessionId: payload.session_id,
permission: payload.permission,
patterns: payload.patterns,
metadata: payload.metadata,
always: payload.always,
tool: payload.tool,
createdAt: payload.created_at,
status,
});
const toPermissionStatus = (reply: PermissionReply) => {
if (reply === "always") return "approved_always";
if (reply === "once") return "approved_once";
return "rejected";
};
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
@@ -580,258 +407,20 @@ export const buildChatRouter = (
res.status(204).end();
});
chatRouter.get("/render-ref/:renderRef", async (req, res) => {
const renderRef = req.params.renderRef?.trim();
const userId = req.header("x-user-id")?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const clientSessionId =
typeof req.query.session_id === "string"
? req.query.session_id.trim()
: undefined;
if (!userId) {
res.status(400).json({
message: "x-user-id is required",
});
return;
}
if (!renderRef) {
res.status(400).json({
message: "render_ref is required",
});
return;
}
const result = await resultReferenceResolver.getFullAuthorized(
renderRef,
{
actorKey: toActorKey(userId),
clientSessionId,
projectId,
},
{
expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload,
},
);
if (!result) {
res.status(404).json({ message: "render_ref not found" });
return;
}
res.json(result);
registerChatAuxiliaryRoutes(chatRouter, {
activeRuns,
lastRunStatuses,
resultReferenceResolver,
sessionBridge,
sessionMetadataStore,
sessionUiStateStore,
});
chatRouter.post("/abort", async (req, res) => {
const parsed = abortPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
const binding = sessionRecord
? await sessionBridge.abort({
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
})
: null;
const run = activeRuns.get(parsed.data.session_id);
if (run && run.status === "running") {
run.status = "aborted";
lastRunStatuses.set(parsed.data.session_id, "aborted");
run.controller.abort();
run.messages = updateLastAssistantMessage(run.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: "⚠️ **请求已中断**",
isError: true,
progress: completeBackendProgress(message.progress),
}));
if (sessionRecord) {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
}
for (const subscriber of run.subscribers) {
subscriber.write("error", {
session_id: parsed.data.session_id,
message: "请求已中断",
});
subscriber.close();
}
run.subscribers.clear();
}
if (!binding && !run) {
res.status(204).end();
return;
}
logger.info(
{
clientSessionId: parsed.data.session_id,
sessionId: binding?.sessionId ?? parsed.data.session_id,
},
"aborted chat session by client request",
);
res.status(202).json({
session_id: parsed.data.session_id,
aborted: true,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat abort failed");
res.status(500).json({
message: "chat abort failed",
detail,
});
}
});
chatRouter.post("/permission/:requestId/reply", async (req, res) => {
const requestId = req.params.requestId?.trim();
const parsed = permissionReplyPayloadSchema.safeParse(req.body);
if (!requestId) {
res.status(400).json({ message: "request_id is required" });
return;
}
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const run = activeRuns.get(sessionRecord.sessionId);
if (!run || run.status !== "running") {
res.status(409).json({ message: "session is not waiting for permissions" });
return;
}
const pendingPermission = run.pendingPermissions.get(requestId);
if (!pendingPermission) {
res.status(404).json({ message: "permission request not found" });
return;
}
const persistPermissionState = async () => {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
};
try {
await runtime.replyPermission({
requestId,
sessionId: sessionRecord.sessionId,
reply: parsed.data.reply,
message: parsed.data.message,
});
} catch (error) {
run.messages = updateLastAssistantPermission(
run.messages,
requestId,
(permission) => ({
...permission,
status: "error",
error:
error instanceof Error
? error.message
: "failed to reply permission",
}),
);
await persistPermissionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist permission error state",
);
});
res.status(502).json({
message: "permission reply failed",
detail: error instanceof Error ? error.message : String(error),
});
return;
}
run.pendingPermissions.delete(requestId);
const status = toPermissionStatus(parsed.data.reply);
run.messages = updateLastAssistantPermission(
run.messages,
requestId,
(permission) => ({
...permission,
status,
repliedAt: Date.now(),
}),
);
await persistPermissionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist permission reply state",
);
});
for (const subscriber of run.subscribers) {
subscriber.write("permission_response", {
session_id: sessionRecord.sessionId,
request_id: requestId,
reply: parsed.data.reply,
});
}
res.status(202).json({
session_id: sessionRecord.sessionId,
request_id: requestId,
reply: parsed.data.reply,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "permission reply route failed");
res.status(500).json({
message: "permission reply route failed",
detail,
});
}
registerChatInteractionRoutes(chatRouter, {
activeRuns,
runtime,
sessionMetadataStore,
sessionUiStateStore,
});
chatRouter.post("/fork", async (req, res) => {
@@ -1045,6 +634,7 @@ export const buildChatRouter = (
controller: abortController,
messages: initialMessages,
pendingPermissions: new Map(),
pendingQuestions: new Map(),
status: "running",
subscribers: new Set(),
};
@@ -1129,6 +719,7 @@ export const buildChatRouter = (
: `⚠️ **错误:** ${typeof data.message === "string" ? data.message : "unknown error"}`,
isError: true,
progress: completeBackendProgress(message.progress),
todos: cancelBackendTodos(message.todos),
}));
} else if (event === "permission_request") {
const payload = data as PermissionRequestPayload;
@@ -1159,6 +750,60 @@ export const buildChatRouter = (
}),
);
}
} else if (event === "question_request") {
const payload = data as QuestionRequestPayload;
let shouldTrackQuestion = true;
if (payload.tool?.callID) {
if (payload.request_id !== payload.tool.callID) {
activeRun.pendingQuestions.delete(payload.tool.callID);
} else {
const hasActionableQuestion = [...activeRun.pendingQuestions.values()].some(
(question) =>
question.tool?.callID === payload.tool?.callID &&
question.request_id !== payload.tool?.callID,
);
if (hasActionableQuestion) {
activeRun.messages = updateLastAssistantMessage(
activeRun.messages,
(message) => ({
...message,
questions: upsertBackendQuestion(message.questions, payload),
}),
);
shouldTrackQuestion = false;
}
}
}
if (shouldTrackQuestion) {
activeRun.pendingQuestions.set(payload.request_id, payload);
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
questions: upsertBackendQuestion(message.questions, payload),
}));
}
} else if (event === "question_response") {
const requestId =
typeof data.request_id === "string" ? data.request_id : undefined;
if (requestId) {
activeRun.pendingQuestions.delete(requestId);
activeRun.messages = updateLastAssistantQuestion(
activeRun.messages,
requestId,
(question) => ({
...question,
status: data.rejected === true ? "rejected" : "answered",
repliedAt: Date.now(),
answers: Array.isArray(data.answers) ? data.answers : question.answers,
error: undefined,
}),
);
}
} else if (event === "todo_update") {
const payload = data as TodoUpdatePayload;
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
todos: upsertBackendTodoUpdate(message.todos, payload),
}));
}
for (const subscriber of activeRun.subscribers) {
@@ -1257,6 +902,21 @@ export const buildChatRouter = (
}
}
} finally {
if (abortController.signal.aborted) {
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: "⚠️ **请求已中断**",
isError: true,
progress: completeBackendProgress(message.progress),
todos: cancelBackendTodos(message.todos),
}));
void queueSessionUiStatePersist().catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist aborted chat stream state");
});
}
await persistQueue.catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state");
});
@@ -1273,7 +933,12 @@ export const buildChatRouter = (
subscriber.close();
}
activeRun.subscribers.clear();
activeRuns.delete(clientSessionId);
if (
activeRun.pendingPermissions.size === 0 &&
activeRun.pendingQuestions.size === 0
) {
activeRuns.delete(clientSessionId);
}
streamClosed = true;
req.off("close", handleClientClose);
res.off("close", handleClientClose);
@@ -1285,6 +950,16 @@ export const buildChatRouter = (
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat stream failed");
if (res.headersSent) {
if (!res.writableEnded && !res.destroyed) {
res.write(toSse("error", {
message: "chat stream failed",
detail,
}));
res.end();
}
return;
}
res.status(500).json({
message: "chat stream failed",
detail,
+176
View File
@@ -0,0 +1,176 @@
import { type Router } from "express";
import { z } from "zod";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { logger } from "../logger.js";
import { type ResultReferenceResolver } from "../results/resolver.js";
import { RESULT_REFERENCE_KIND } from "../results/store.js";
import { type SessionMetadataStore } from "../sessions/metadataStore.js";
import { type SessionUiStateStore } from "../sessions/uiStateStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
import {
type ActiveRun,
type RunStatus,
cancelBackendTodos,
completeBackendProgress,
updateLastAssistantMessage,
} from "./chatUiState.js";
const abortPayloadSchema = z.object({
session_id: z.string().max(128),
});
type RegisterAuxiliaryRoutesOptions = {
activeRuns: Map<string, ActiveRun>;
lastRunStatuses: Map<string, RunStatus>;
resultReferenceResolver: ResultReferenceResolver;
sessionBridge: ChatSessionBridge;
sessionMetadataStore: SessionMetadataStore;
sessionUiStateStore: SessionUiStateStore;
};
const toSessionUiStateContext = (sessionId: string) => ({
sessionId,
});
export const registerChatAuxiliaryRoutes = (
chatRouter: Router,
{
activeRuns,
lastRunStatuses,
resultReferenceResolver,
sessionBridge,
sessionMetadataStore,
sessionUiStateStore,
}: RegisterAuxiliaryRoutesOptions,
) => {
chatRouter.get("/render-ref/:renderRef", async (req, res) => {
const renderRef = req.params.renderRef?.trim();
const userId = req.header("x-user-id")?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const clientSessionId =
typeof req.query.session_id === "string"
? req.query.session_id.trim()
: undefined;
if (!userId) {
res.status(400).json({
message: "x-user-id is required",
});
return;
}
if (!renderRef) {
res.status(400).json({
message: "render_ref is required",
});
return;
}
const result = await resultReferenceResolver.getFullAuthorized(
renderRef,
{
actorKey: toActorKey(userId),
clientSessionId,
projectId,
},
{
expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload,
},
);
if (!result) {
res.status(404).json({ message: "render_ref not found" });
return;
}
res.json(result);
});
chatRouter.post("/abort", async (req, res) => {
const parsed = abortPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
const binding = sessionRecord
? await sessionBridge.abort({
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
})
: null;
const run = activeRuns.get(parsed.data.session_id);
if (run && run.status === "running") {
run.status = "aborted";
lastRunStatuses.set(parsed.data.session_id, "aborted");
run.controller.abort();
run.messages = updateLastAssistantMessage(run.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: "⚠️ **请求已中断**",
isError: true,
progress: completeBackendProgress(message.progress),
todos: cancelBackendTodos(message.todos),
}));
if (sessionRecord) {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord.sessionId),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
}
for (const subscriber of run.subscribers) {
subscriber.write("error", {
session_id: parsed.data.session_id,
message: "请求已中断",
});
subscriber.close();
}
run.subscribers.clear();
}
if (!binding && !run) {
res.status(204).end();
return;
}
logger.info(
{
clientSessionId: parsed.data.session_id,
sessionId: binding?.sessionId ?? parsed.data.session_id,
},
"aborted chat session by client request",
);
res.status(202).json({
session_id: parsed.data.session_id,
aborted: true,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat abort failed");
res.status(500).json({
message: "chat abort failed",
detail,
});
}
});
};
+437
View File
@@ -0,0 +1,437 @@
import { type Router } from "express";
import { z } from "zod";
import { logger } from "../logger.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type SessionMetadataStore } from "../sessions/metadataStore.js";
import { type SessionUiStateStore } from "../sessions/uiStateStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
import {
type ActiveRun,
toPermissionStatus,
updateLastAssistantPermission,
updateLastAssistantQuestion,
} from "./chatUiState.js";
const permissionReplyPayloadSchema = z.object({
session_id: z.string().max(128),
reply: z.enum(["once", "always", "reject"]),
message: z.string().max(1000).optional(),
});
const questionReplyPayloadSchema = z.object({
session_id: z.string().max(128),
answers: z.array(z.array(z.string().max(2000))).default([]),
});
const questionRejectPayloadSchema = z.object({
session_id: z.string().max(128),
});
type RegisterInteractionRoutesOptions = {
activeRuns: Map<string, ActiveRun>;
runtime: OpencodeRuntimeAdapter;
sessionMetadataStore: SessionMetadataStore;
sessionUiStateStore: SessionUiStateStore;
};
const toSessionUiStateContext = (sessionId: string) => ({
sessionId,
});
export const registerChatInteractionRoutes = (
chatRouter: Router,
{
activeRuns,
runtime,
sessionMetadataStore,
sessionUiStateStore,
}: RegisterInteractionRoutesOptions,
) => {
chatRouter.post("/permission/:requestId/reply", async (req, res) => {
const requestId = req.params.requestId?.trim();
const parsed = permissionReplyPayloadSchema.safeParse(req.body);
if (!requestId) {
res.status(400).json({ message: "request_id is required" });
return;
}
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const run = activeRuns.get(sessionRecord.sessionId);
if (!run || run.status !== "running") {
res.status(409).json({ message: "session is not waiting for permissions" });
return;
}
const pendingPermission = run.pendingPermissions.get(requestId);
if (!pendingPermission) {
res.status(404).json({ message: "permission request not found" });
return;
}
const persistPermissionState = async () => {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord.sessionId),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
};
try {
await runtime.replyPermission({
requestId,
sessionId: sessionRecord.sessionId,
reply: parsed.data.reply,
message: parsed.data.message,
});
} catch (error) {
run.messages = updateLastAssistantPermission(
run.messages,
requestId,
(permission) => ({
...permission,
status: "error",
error:
error instanceof Error
? error.message
: "failed to reply permission",
}),
);
await persistPermissionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist permission error state",
);
});
res.status(502).json({
message: "permission reply failed",
detail: error instanceof Error ? error.message : String(error),
});
return;
}
run.pendingPermissions.delete(requestId);
const status = toPermissionStatus(parsed.data.reply);
run.messages = updateLastAssistantPermission(
run.messages,
requestId,
(permission) => ({
...permission,
status,
repliedAt: Date.now(),
}),
);
await persistPermissionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist permission reply state",
);
});
for (const subscriber of run.subscribers) {
subscriber.write("permission_response", {
session_id: sessionRecord.sessionId,
request_id: requestId,
reply: parsed.data.reply,
});
}
res.status(202).json({
session_id: sessionRecord.sessionId,
request_id: requestId,
reply: parsed.data.reply,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "permission reply route failed");
res.status(500).json({
message: "permission reply route failed",
detail,
});
}
});
chatRouter.post("/question/:requestId/reply", async (req, res) => {
const requestId = req.params.requestId?.trim();
const parsed = questionReplyPayloadSchema.safeParse(req.body);
if (!requestId) {
res.status(400).json({ message: "request_id is required" });
return;
}
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const run = activeRuns.get(sessionRecord.sessionId);
if (!run) {
res.status(409).json({ message: "session is not waiting for questions" });
return;
}
const pendingQuestion = run.pendingQuestions.get(requestId);
if (!pendingQuestion) {
res.status(404).json({ message: "question request not found" });
return;
}
const persistQuestionState = async () => {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord.sessionId),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
};
try {
await runtime.replyQuestion({
requestId,
sessionId: sessionRecord.sessionId,
answers: parsed.data.answers,
});
} catch (error) {
run.messages = updateLastAssistantQuestion(
run.messages,
requestId,
(question) => ({
...question,
status: "error",
error:
error instanceof Error
? error.message
: "failed to reply question",
}),
);
await persistQuestionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist question error state",
);
});
res.status(502).json({
message: "question reply failed",
detail: error instanceof Error ? error.message : String(error),
});
return;
}
run.pendingQuestions.delete(requestId);
run.messages = updateLastAssistantQuestion(
run.messages,
requestId,
(question) => ({
...question,
status: "answered",
answers: parsed.data.answers,
repliedAt: Date.now(),
error: undefined,
}),
);
await persistQuestionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist question reply state",
);
});
for (const subscriber of run.subscribers) {
subscriber.write("question_response", {
session_id: pendingQuestion.session_id,
request_id: requestId,
answers: parsed.data.answers,
});
}
if (
run.status !== "running" &&
run.pendingPermissions.size === 0 &&
run.pendingQuestions.size === 0
) {
activeRuns.delete(sessionRecord.sessionId);
}
res.status(202).json({
session_id: pendingQuestion.session_id,
request_id: requestId,
answers: parsed.data.answers,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "question reply route failed");
res.status(500).json({
message: "question reply route failed",
detail,
});
}
});
chatRouter.post("/question/:requestId/reject", async (req, res) => {
const requestId = req.params.requestId?.trim();
const parsed = questionRejectPayloadSchema.safeParse(req.body);
if (!requestId) {
res.status(400).json({ message: "request_id is required" });
return;
}
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const run = activeRuns.get(sessionRecord.sessionId);
if (!run) {
res.status(409).json({ message: "session is not waiting for questions" });
return;
}
const pendingQuestion = run.pendingQuestions.get(requestId);
if (!pendingQuestion) {
res.status(404).json({ message: "question request not found" });
return;
}
const persistQuestionState = async () => {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord.sessionId),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord.sessionId), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
};
try {
await runtime.rejectQuestion({
requestId,
sessionId: sessionRecord.sessionId,
});
} catch (error) {
run.messages = updateLastAssistantQuestion(
run.messages,
requestId,
(question) => ({
...question,
status: "error",
error:
error instanceof Error
? error.message
: "failed to reject question",
}),
);
await persistQuestionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist question error state",
);
});
res.status(502).json({
message: "question reject failed",
detail: error instanceof Error ? error.message : String(error),
});
return;
}
run.pendingQuestions.delete(requestId);
run.messages = updateLastAssistantQuestion(
run.messages,
requestId,
(question) => ({
...question,
status: "rejected",
repliedAt: Date.now(),
error: undefined,
}),
);
await persistQuestionState().catch((persistError) => {
logger.warn(
{ err: persistError, sessionId: sessionRecord.sessionId },
"failed to persist question reject state",
);
});
for (const subscriber of run.subscribers) {
subscriber.write("question_response", {
session_id: pendingQuestion.session_id,
request_id: requestId,
rejected: true,
});
}
if (
run.status !== "running" &&
run.pendingPermissions.size === 0 &&
run.pendingQuestions.size === 0
) {
activeRuns.delete(sessionRecord.sessionId);
}
res.status(202).json({
session_id: pendingQuestion.session_id,
request_id: requestId,
rejected: true,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "question reject route failed");
res.status(500).json({
message: "question reject route failed",
detail,
});
}
});
};
+173 -277
View File
@@ -2,7 +2,56 @@ import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2";
import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js";
import { logger } from "../logger.js";
import { type PermissionReply, type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import {
type PermissionReply,
type OpencodeRuntimeAdapter,
} from "../runtime/opencode.js";
import {
buildPermissionDetail,
buildPermissionV2Detail,
buildReasoningProgressDetail,
buildSessionStatusDetail,
buildToolProgressDetail,
collectTextContent,
extractRequestReason,
extractSkillAuditInfo,
getErrorMessage,
getToolProgressTitle,
getUnknownErrorMessage,
hasToolParams,
isPermissionAskedEvent,
isPermissionRepliedEvent,
isPermissionV2AskedEvent,
isPermissionV2RepliedEvent,
isQuestionAskedEvent,
isQuestionRejectedEvent,
isQuestionRepliedEvent,
isQuestionV2AskedEvent,
isQuestionV2RejectedEvent,
isQuestionV2RepliedEvent,
isSessionEvent,
isSkillEvent,
logDevelopmentDebug,
normalizeQuestionAnswers,
normalizeQuestionPayload,
normalizeQuestionToolPayload,
normalizeTodoPriority,
normalizeTodoStatus,
normalizeToolParams,
normalizeToolStatus,
type PermissionRequestPayload,
type QuestionRequestPayload,
type TodoItemPayload,
type TodoUpdatePayload,
} from "./chatStreamEvents.js";
export {
collectTextContent,
type PermissionRequestPayload,
type QuestionRequestPayload,
type TodoItemPayload,
type TodoUpdatePayload,
} from "./chatStreamEvents.js";
export const supportedModels = [
"deepseek/deepseek-v4-flash",
@@ -35,124 +84,6 @@ type ProgressPayload = {
detail?: string;
};
export type PermissionRequestPayload = {
session_id: string;
request_id: string;
permission: string;
patterns: string[];
metadata: Record<string, unknown>;
always: string[];
tool?: {
messageID: string;
callID: string;
};
created_at: number;
};
const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development";
const toolLabels: Record<string, string> = {
memory_manager: "记忆写入",
session_search: "历史会话检索",
skill_manager: "流程沉淀",
locate_features: "地图定位",
view_history: "历史数据面板",
view_scada: "SCADA 面板",
show_chart: "图表渲染",
render_junctions: "节点渲染",
};
const logDevelopmentDebug = (
message: string,
metadata: Record<string, unknown>,
) => {
if (!isDevelopmentDebugLoggingEnabled) {
return;
}
logger.info(metadata, message);
};
const getErrorMessage = (error: {
name: string;
data?: { message?: string };
}) => error.data?.message ?? error.name;
const getUnknownErrorMessage = (error: unknown) => {
if (
typeof error === "object" &&
error !== null &&
"name" in error &&
typeof error.name === "string"
) {
const maybeData = "data" in error ? error.data : undefined;
return getErrorMessage({
name: error.name,
data:
typeof maybeData === "object" && maybeData !== null && "message" in maybeData
? { message: typeof maybeData.message === "string" ? maybeData.message : undefined }
: undefined,
});
}
return error instanceof Error ? error.message : String(error);
};
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const normalizeToolParams = (value: unknown): Record<string, unknown> => {
if (isObjectRecord(value)) {
return value;
}
if (typeof value === "string") {
try {
const parsed = JSON.parse(value) as unknown;
return isObjectRecord(parsed) ? parsed : {};
} catch {
return {};
}
}
return {};
};
const extractRequestReason = (params: Record<string, unknown>) => {
const candidates = ["reason", "request_reason", "why", "purpose", "rationale"];
for (const key of candidates) {
const value = params[key];
if (typeof value === "string") {
const normalized = value.trim();
if (normalized) {
return normalized;
}
}
}
return "";
};
const isSkillEvent = (event: OpencodeEvent) => event.type.toLowerCase().includes("skill");
const extractSkillAuditInfo = (event: OpencodeEvent) => {
const payload = isObjectRecord(event.properties)
? (event.properties as Record<string, unknown>)
: {};
const candidateName =
typeof payload.skill === "string"
? payload.skill
: typeof payload.skillName === "string"
? payload.skillName
: typeof payload.name === "string"
? payload.name
: event.type;
const reason = extractRequestReason(payload);
return {
name: candidateName,
reason,
payload,
};
};
const hasToolParams = (params: Record<string, unknown>) =>
Object.keys(params).length > 0;
const toRuntimeModel = (model?: SupportedModel) => {
if (!model) {
return undefined;
@@ -167,55 +98,6 @@ const toRuntimeModel = (model?: SupportedModel) => {
};
};
const isSessionEvent = (event: OpencodeEvent, sessionId: string) =>
"properties" in event &&
typeof event.properties === "object" &&
event.properties !== null &&
"sessionID" in event.properties &&
event.properties.sessionID === sessionId;
const isPermissionAskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.asked" }> =>
event.type === "permission.asked";
const isPermissionV2AskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.asked" }> =>
event.type === "permission.v2.asked";
const isPermissionRepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.replied" }> =>
event.type === "permission.replied";
const isPermissionV2RepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.replied" }> =>
event.type === "permission.v2.replied";
const buildPermissionDetail = (event: Extract<OpencodeEvent, { type: "permission.asked" }>) => {
const patterns = event.properties.patterns.length
? event.properties.patterns.join(", ")
: event.properties.permission;
return `需要用户确认权限:${event.properties.permission};匹配规则:${patterns}`;
};
const buildPermissionV2Detail = (
event: Extract<OpencodeEvent, { type: "permission.v2.asked" }>,
) => {
const resources = event.properties.resources.length
? event.properties.resources.join(", ")
: event.properties.action;
return `需要用户确认权限:${event.properties.action};资源:${resources}`;
};
export const collectTextContent = (parts: Part[]) =>
parts
.filter((part): part is Extract<Part, { type: "text" }> => part.type === "text")
.map((part) => part.text)
.join("");
const emitFallbackMessage = async (
runtime: OpencodeRuntimeAdapter,
sessionId: string,
@@ -236,111 +118,6 @@ const emitFallbackMessage = async (
}
};
const normalizeToolStatus = (status: string) => {
if (status === "completed") return "completed";
if (status === "error") return "error";
return "running";
};
const formatProgressValue = (value: unknown): string => {
if (typeof value === "string") {
return value.length > 120 ? `${value.slice(0, 117)}...` : value;
}
if (
typeof value === "number" ||
typeof value === "boolean" ||
value === null ||
value === undefined
) {
return String(value);
}
try {
const serialized = JSON.stringify(value);
return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized;
} catch {
return "[unserializable]";
}
};
const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim();
const truncateProgressText = (text: string, maxLength: number) =>
text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text;
const summarizeToolParams = (params: Record<string, unknown>) => {
const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]);
const summary = Object.entries(params)
.filter(([key]) => !ignoredKeys.has(key))
.slice(0, 4)
.map(([key, value]) => `${key}=${formatProgressValue(value)}`)
.join(", ");
return summary || "无附加参数";
};
const buildSessionStatusDetail = (status: { type: string; message?: string }) => {
if (status.type === "retry") {
return status.message
? `模型请求需要重试,原因:${status.message}`
: "模型请求正在重试,等待下一次响应。";
}
if (status.type === "busy") {
return status.message
? `Agent 正在处理中:${status.message}`
: "Agent 正在执行推理、工具调用或结果整理。";
}
if (status.type === "idle") {
return status.message
? `Agent 已空闲:${status.message}`
: "当前会话暂时没有待处理任务。";
}
return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`;
};
const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => {
const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800);
if (ended) {
return reasoningText
? `推理过程:${reasoningText}`
: "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。";
}
return reasoningText
? `正在推理:${reasoningText}`
: "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。";
};
const buildToolProgressDetail = (
tool: string,
status: string,
params: Record<string, unknown>,
reason: string,
error?: string,
) => {
const toolName = toolLabels[tool] ?? tool;
const reasonText = reason ? `;调用原因:${reason}` : "";
const paramsText = `;关键参数:${summarizeToolParams(params)}`;
if (status === "error") {
const errorText = error ? `;错误:${error}` : "";
return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`;
}
if (status === "completed") {
return `${toolName} 已执行完成${reasonText}${paramsText}`;
}
if (status === "pending") {
return `${toolName} 已进入待执行状态${reasonText}${paramsText}`;
}
return `${toolName} 正在执行${reasonText}${paramsText}`;
};
const getToolProgressTitle = (tool: string, status: string) => {
const toolName = toolLabels[tool] ?? tool;
if (status === "completed") return `${toolName} 已完成`;
if (status === "error") return `${toolName} 执行失败`;
if (status === "pending") return `准备调用 ${toolName}`;
return `正在调用 ${toolName}`;
};
export const streamPromptResponse = async ({
runtime,
sessionId,
@@ -364,6 +141,8 @@ export const streamPromptResponse = async ({
const progressStartedAtMap = new Map<string, number>();
const finalizedProgressIds = new Set<string>();
const emittedToolParts = new Set<string>();
const emittedQuestionToolParts = new Set<string>();
const emittedQuestionRequestIds = new Set<string>();
const partTypes = new Map<string, Part["type"]>();
const pendingPartTextDeltas = new Map<string, string[]>();
const reasoningDeltas = new Map<string, string[]>();
@@ -734,6 +513,76 @@ export const streamPromptResponse = async ({
continue;
}
if (isQuestionAskedEvent(event) || isQuestionV2AskedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("question request received", {
...debugContext,
requestId: event.properties.id,
questionCount: event.properties.questions.length,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `question-${event.properties.id}`,
phase: "question",
status: "running",
title: "等待用户补充信息",
detail: event.properties.questions
.map((question) => question.question)
.join("\n"),
});
const payload = normalizeQuestionPayload(event, clientSessionId);
emittedQuestionRequestIds.add(payload.request_id);
write("question_request", payload);
continue;
}
if (isQuestionRepliedEvent(event) || isQuestionV2RepliedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("question request replied", {
...debugContext,
requestId: event.properties.requestID,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `question-${event.properties.requestID}`,
phase: "question",
status: "completed",
title: "已收到补充信息",
detail: normalizeQuestionAnswers(event.properties.answers)
.map((answer) => answer.join("、"))
.filter(Boolean)
.join("\n"),
});
write("question_response", {
session_id: clientSessionId,
request_id: event.properties.requestID,
answers: normalizeQuestionAnswers(event.properties.answers),
});
continue;
}
if (isQuestionRejectedEvent(event) || isQuestionV2RejectedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("question request rejected", {
...debugContext,
requestId: event.properties.requestID,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `question-${event.properties.requestID}`,
phase: "question",
status: "completed",
title: "已跳过补充信息",
detail: "用户选择跳过本次补充信息。",
});
write("question_response", {
session_id: clientSessionId,
request_id: event.properties.requestID,
rejected: true,
});
continue;
}
if (isSkillEvent(event)) {
sawResponseActivity = true;
const { name, reason, payload } = extractSkillAuditInfo(event);
@@ -882,6 +731,36 @@ export const streamPromptResponse = async ({
});
}
const questionToolPayload = normalizeQuestionToolPayload(
part,
toolParams,
clientSessionId,
);
if (questionToolPayload) {
if (!emittedQuestionToolParts.has(part.id)) {
emittedQuestionToolParts.add(part.id);
emittedQuestionRequestIds.add(questionToolPayload.request_id);
logDevelopmentDebug("question tool request received", {
...debugContext,
requestId: questionToolPayload.request_id,
tool: part.tool,
questionCount: questionToolPayload.questions.length,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `question-${questionToolPayload.request_id}`,
phase: "question",
status: "running",
title: "等待用户补充信息",
detail: questionToolPayload.questions
.map((question) => question.question)
.join("\n"),
});
write("question_request", questionToolPayload);
}
continue;
}
emitProgress({
id: part.id,
phase: "tool",
@@ -937,18 +816,35 @@ export const streamPromptResponse = async ({
if (event.type === "todo.updated") {
sawResponseActivity = true;
const completed = event.properties.todos.filter(
const todos = event.properties.todos as Array<{
content: string;
status: string;
priority: string;
}>;
const normalizedTodos = todos.map((todo, index) => ({
id: `todo-${index}-${todo.content.slice(0, 24)}`,
content: todo.content,
status: normalizeTodoStatus(todo.status),
priority: normalizeTodoPriority(todo.priority),
updated_at: Date.now(),
}));
const completed = todos.filter(
(todo) => todo.status === "completed",
).length;
emitProgress({
id: "todo-progress",
phase: "planning",
status: completed === event.properties.todos.length ? "completed" : "running",
title: `计划进度 ${completed}/${event.properties.todos.length}`,
detail: event.properties.todos
status: completed === todos.length ? "completed" : "running",
title: `计划进度 ${completed}/${todos.length}`,
detail: todos
.map((todo) => `${todo.status}: ${todo.content}`)
.join("\n"),
});
write("todo_update", {
session_id: clientSessionId,
todos: normalizedTodos,
created_at: Date.now(),
} satisfies TodoUpdatePayload);
continue;
}
+457
View File
@@ -0,0 +1,457 @@
import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2";
import { logger } from "../logger.js";
import { type QuestionAnswers } from "../runtime/opencode.js";
export type PermissionRequestPayload = {
session_id: string;
request_id: string;
permission: string;
patterns: string[];
metadata: Record<string, unknown>;
always: string[];
tool?: {
messageID: string;
callID: string;
};
created_at: number;
};
type QuestionOptionPayload = {
label: string;
description: string;
};
type QuestionInfoPayload = {
header: string;
question: string;
options: QuestionOptionPayload[];
multiple?: boolean;
custom?: boolean;
};
export type QuestionRequestPayload = {
session_id: string;
request_id: string;
questions: QuestionInfoPayload[];
tool?: {
messageID: string;
callID: string;
};
created_at: number;
};
export type TodoItemPayload = {
id: string;
content: string;
status: "pending" | "in_progress" | "completed" | "cancelled";
priority?: "low" | "medium" | "high";
created_at?: number;
updated_at?: number;
};
export type TodoUpdatePayload = {
session_id: string;
message_id?: string;
todos: TodoItemPayload[];
created_at: number;
};
const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development";
const toolLabels: Record<string, string> = {
memory_manager: "记忆写入",
session_search: "历史会话检索",
skill_manager: "流程沉淀",
locate_features: "地图定位",
view_history: "历史数据面板",
view_scada: "SCADA 面板",
show_chart: "图表渲染",
render_junctions: "节点渲染",
};
export const logDevelopmentDebug = (
message: string,
metadata: Record<string, unknown>,
) => {
if (!isDevelopmentDebugLoggingEnabled) {
return;
}
logger.info(metadata, message);
};
export const getErrorMessage = (error: {
name: string;
data?: { message?: string };
}) => error.data?.message ?? error.name;
export const getUnknownErrorMessage = (error: unknown) => {
if (
typeof error === "object" &&
error !== null &&
"name" in error &&
typeof error.name === "string"
) {
const maybeData = "data" in error ? error.data : undefined;
return getErrorMessage({
name: error.name,
data:
typeof maybeData === "object" && maybeData !== null && "message" in maybeData
? { message: typeof maybeData.message === "string" ? maybeData.message : undefined }
: undefined,
});
}
return error instanceof Error ? error.message : String(error);
};
export const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
export const normalizeToolParams = (value: unknown): Record<string, unknown> => {
if (isObjectRecord(value)) {
return value;
}
if (typeof value === "string") {
try {
const parsed = JSON.parse(value) as unknown;
return isObjectRecord(parsed) ? parsed : {};
} catch {
return {};
}
}
return {};
};
export const extractRequestReason = (params: Record<string, unknown>) => {
const candidates = ["reason", "request_reason", "why", "purpose", "rationale"];
for (const key of candidates) {
const value = params[key];
if (typeof value === "string") {
const normalized = value.trim();
if (normalized) {
return normalized;
}
}
}
return "";
};
export const isSkillEvent = (event: OpencodeEvent) =>
event.type.toLowerCase().includes("skill");
export const extractSkillAuditInfo = (event: OpencodeEvent) => {
const payload = isObjectRecord(event.properties)
? (event.properties as Record<string, unknown>)
: {};
const candidateName =
typeof payload.skill === "string"
? payload.skill
: typeof payload.skillName === "string"
? payload.skillName
: typeof payload.name === "string"
? payload.name
: event.type;
const reason = extractRequestReason(payload);
return {
name: candidateName,
reason,
payload,
};
};
export const hasToolParams = (params: Record<string, unknown>) =>
Object.keys(params).length > 0;
export const isSessionEvent = (event: OpencodeEvent, sessionId: string) =>
"properties" in event &&
typeof event.properties === "object" &&
event.properties !== null &&
"sessionID" in event.properties &&
event.properties.sessionID === sessionId;
export const isPermissionAskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.asked" }> =>
event.type === "permission.asked";
export const isPermissionV2AskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.asked" }> =>
event.type === "permission.v2.asked";
export const isPermissionRepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.replied" }> =>
event.type === "permission.replied";
export const isPermissionV2RepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.replied" }> =>
event.type === "permission.v2.replied";
export const isQuestionAskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.asked" }> =>
event.type === "question.asked";
export const isQuestionV2AskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.v2.asked" }> =>
event.type === "question.v2.asked";
export const isQuestionRepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.replied" }> =>
event.type === "question.replied";
export const isQuestionV2RepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.v2.replied" }> =>
event.type === "question.v2.replied";
export const isQuestionRejectedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.rejected" }> =>
event.type === "question.rejected";
export const isQuestionV2RejectedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "question.v2.rejected" }> =>
event.type === "question.v2.rejected";
export const buildPermissionDetail = (
event: Extract<OpencodeEvent, { type: "permission.asked" }>,
) => {
const patterns = event.properties.patterns.length
? event.properties.patterns.join(", ")
: event.properties.permission;
return `需要用户确认权限:${event.properties.permission};匹配规则:${patterns}`;
};
export const buildPermissionV2Detail = (
event: Extract<OpencodeEvent, { type: "permission.v2.asked" }>,
) => {
const resources = event.properties.resources.length
? event.properties.resources.join(", ")
: event.properties.action;
return `需要用户确认权限:${event.properties.action};资源:${resources}`;
};
export const normalizeQuestionPayload = (
event: Extract<OpencodeEvent, { type: "question.asked" | "question.v2.asked" }>,
clientSessionId: string,
): QuestionRequestPayload => ({
session_id: clientSessionId,
request_id: event.properties.id,
questions: event.properties.questions.map((question) => ({
header: question.header,
question: question.question,
options: question.options.map((option) => ({
label: option.label,
description: option.description,
})),
multiple: question.multiple,
custom: question.custom,
})),
tool: event.properties.tool,
created_at: Date.now(),
});
export const normalizeQuestionAnswers = (answers: QuestionAnswers | undefined) =>
Array.isArray(answers)
? answers.map((answer) =>
Array.isArray(answer)
? answer.filter((item): item is string => typeof item === "string")
: [],
)
: [];
const questionToolNames = new Set(["question", "request_user_input"]);
const normalizeQuestionOptions = (value: unknown): QuestionOptionPayload[] =>
Array.isArray(value)
? value.filter(isObjectRecord).map((option) => ({
label: typeof option.label === "string" ? option.label : "",
description:
typeof option.description === "string" ? option.description : "",
})).filter((option) => option.label.trim().length > 0)
: [];
const normalizeToolQuestionInfo = (value: unknown): QuestionInfoPayload | undefined => {
if (!isObjectRecord(value) || typeof value.question !== "string") {
return undefined;
}
const question = value.question.trim();
if (!question) {
return undefined;
}
return {
header:
typeof value.header === "string" && value.header.trim()
? value.header
: "补充信息",
question,
options: normalizeQuestionOptions(value.options),
multiple: typeof value.multiple === "boolean" ? value.multiple : undefined,
custom: typeof value.custom === "boolean" ? value.custom : undefined,
};
};
export const normalizeQuestionToolPayload = (
part: Extract<Part, { type: "tool" }>,
params: Record<string, unknown>,
clientSessionId: string,
): QuestionRequestPayload | undefined => {
if (!questionToolNames.has(part.tool)) {
return undefined;
}
const questions = Array.isArray(params.questions)
? params.questions
.map(normalizeToolQuestionInfo)
.filter((question): question is QuestionInfoPayload => Boolean(question))
: [];
if (questions.length === 0) {
return undefined;
}
return {
session_id: clientSessionId,
request_id: part.callID || part.id,
questions,
tool: {
messageID: part.messageID,
callID: part.callID,
},
created_at: Date.now(),
};
};
export const normalizeTodoStatus = (status: string): TodoItemPayload["status"] => {
if (status === "in_progress" || status === "completed" || status === "cancelled") {
return status;
}
return "pending";
};
export const normalizeTodoPriority = (
priority: string,
): TodoItemPayload["priority"] | undefined => {
if (priority === "low" || priority === "medium" || priority === "high") {
return priority;
}
return undefined;
};
export const collectTextContent = (parts: Part[]) =>
parts
.filter((part): part is Extract<Part, { type: "text" }> => part.type === "text")
.map((part) => part.text)
.join("");
export const normalizeToolStatus = (status: string) => {
if (status === "completed") return "completed";
if (status === "error") return "error";
return "running";
};
const formatProgressValue = (value: unknown): string => {
if (typeof value === "string") {
return value.length > 120 ? `${value.slice(0, 117)}...` : value;
}
if (
typeof value === "number" ||
typeof value === "boolean" ||
value === null ||
value === undefined
) {
return String(value);
}
try {
const serialized = JSON.stringify(value);
return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized;
} catch {
return "[unserializable]";
}
};
const normalizeProgressText = (chunks: string[]) =>
chunks.join("").replace(/\s+/g, " ").trim();
const truncateProgressText = (text: string, maxLength: number) =>
text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text;
const summarizeToolParams = (params: Record<string, unknown>) => {
const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]);
const summary = Object.entries(params)
.filter(([key]) => !ignoredKeys.has(key))
.slice(0, 4)
.map(([key, value]) => `${key}=${formatProgressValue(value)}`)
.join(", ");
return summary || "无附加参数";
};
export const buildSessionStatusDetail = (status: { type: string; message?: string }) => {
if (status.type === "retry") {
return status.message
? `模型请求需要重试,原因:${status.message}`
: "模型请求正在重试,等待下一次响应。";
}
if (status.type === "busy") {
return status.message
? `Agent 正在处理中:${status.message}`
: "Agent 正在执行推理、工具调用或结果整理。";
}
if (status.type === "idle") {
return status.message
? `Agent 已空闲:${status.message}`
: "当前会话暂时没有待处理任务。";
}
return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`;
};
export const buildReasoningProgressDetail = (
chunks: string[],
ended?: string | number | Date | null,
) => {
const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800);
if (ended) {
return reasoningText
? `推理过程:${reasoningText}`
: "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。";
}
return reasoningText
? `正在推理:${reasoningText}`
: "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。";
};
export const buildToolProgressDetail = (
tool: string,
status: string,
params: Record<string, unknown>,
reason: string,
error?: string,
) => {
const toolName = toolLabels[tool] ?? tool;
const reasonText = reason ? `;调用原因:${reason}` : "";
const paramsText = `;关键参数:${summarizeToolParams(params)}`;
if (status === "error") {
const errorText = error ? `;错误:${error}` : "";
return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`;
}
if (status === "completed") {
return `${toolName} 已执行完成${reasonText}${paramsText}`;
}
if (status === "pending") {
return `${toolName} 已进入待执行状态${reasonText}${paramsText}`;
}
return `${toolName} 正在执行${reasonText}${paramsText}`;
};
export const getToolProgressTitle = (tool: string, status: string) => {
const toolName = toolLabels[tool] ?? tool;
if (status === "completed") return `${toolName} 已完成`;
if (status === "error") return `${toolName} 执行失败`;
if (status === "pending") return `准备调用 ${toolName}`;
return `正在调用 ${toolName}`;
};
+308
View File
@@ -0,0 +1,308 @@
import { type PermissionReply } from "../runtime/opencode.js";
import {
type PermissionRequestPayload,
type QuestionRequestPayload,
type TodoUpdatePayload,
} from "./chatStream.js";
export type RunStatus = "running" | "completed" | "error" | "aborted";
export type StreamSubscriber = {
write: (event: string, data: Record<string, unknown>) => void;
close: () => void;
};
export type ActiveRun = {
clientSessionId: string;
controller: AbortController;
messages: unknown[];
pendingPermissions: Map<string, PermissionRequestPayload>;
pendingQuestions: Map<string, QuestionRequestPayload>;
status: RunStatus;
subscribers: Set<StreamSubscriber>;
};
export const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const createFrontendMessageId = () =>
`msg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
export const createInitialStreamingMessages = (
existingMessages: unknown[],
userContent: string,
) => {
const userMessage = {
id: createFrontendMessageId(),
role: "user",
content: userContent,
};
return [
...existingMessages,
{
...userMessage,
branchRootId: userMessage.id,
},
{
id: createFrontendMessageId(),
role: "assistant",
content: "",
progress: [
{
id: "request-received",
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
startedAt: Date.now(),
elapsedMs: 0,
elapsedSnapshotAt: Date.now(),
},
],
},
];
};
export const countFrontendUserMessages = (messages: unknown[]) =>
messages.filter(
(message) => isObjectRecord(message) && message.role === "user",
).length;
export const pruneBranchGroupsForMessageIndex = (
branchGroups: unknown[],
messageIndex: number | undefined,
) => {
if (messageIndex === undefined) {
return branchGroups;
}
return branchGroups.filter(
(group) =>
!isObjectRecord(group) ||
typeof group.parentCount !== "number" ||
group.parentCount < messageIndex,
);
};
export const upsertBackendProgress = (
progress: unknown,
payload: Record<string, unknown>,
) => {
const next = Array.isArray(progress) ? [...progress] : [];
const id = typeof payload.id === "string" ? payload.id : `progress-${Date.now()}`;
const index = next.findIndex((item) => isObjectRecord(item) && item.id === id);
const nextItem = {
id,
phase: typeof payload.phase === "string" ? payload.phase : "progress",
status:
payload.status === "completed" || payload.status === "error"
? payload.status
: "running",
title: typeof payload.title === "string" ? payload.title : "正在处理",
detail: typeof payload.detail === "string" ? payload.detail : undefined,
startedAt: typeof payload.started_at === "number" ? payload.started_at : undefined,
endedAt: typeof payload.ended_at === "number" ? payload.ended_at : undefined,
elapsedMs: typeof payload.elapsed_ms === "number" ? payload.elapsed_ms : undefined,
elapsedSnapshotAt:
typeof payload.elapsed_ms === "number" ? Date.now() : undefined,
durationMs: typeof payload.duration_ms === "number" ? payload.duration_ms : undefined,
};
if (index >= 0) {
next[index] = nextItem;
} else {
next.push(nextItem);
}
return next;
};
export const completeBackendProgress = (progress: unknown) =>
Array.isArray(progress)
? progress.map((item) => {
if (!isObjectRecord(item) || item.status !== "running") {
return item;
}
const endedAt = Date.now();
const startedAt = typeof item.startedAt === "number" ? item.startedAt : undefined;
return {
...item,
status: "completed",
endedAt,
elapsedMs: undefined,
elapsedSnapshotAt: undefined,
durationMs:
typeof item.durationMs === "number"
? item.durationMs
: startedAt !== undefined
? Math.max(0, endedAt - startedAt)
: item.elapsedMs,
};
})
: progress;
export const cancelBackendTodos = (todos: unknown) =>
Array.isArray(todos)
? todos.map((todoUpdate) => {
if (!isObjectRecord(todoUpdate) || !Array.isArray(todoUpdate.todos)) {
return todoUpdate;
}
return {
...todoUpdate,
todos: todoUpdate.todos.map((todo) => {
if (!isObjectRecord(todo)) {
return todo;
}
if (todo.status !== "pending" && todo.status !== "in_progress") {
return todo;
}
return {
...todo,
status: "cancelled",
updatedAt: Date.now(),
};
}),
};
})
: todos;
export const updateLastAssistantMessage = (
messages: unknown[],
updater: (message: Record<string, unknown>) => Record<string, unknown>,
) => {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
if (isObjectRecord(message) && message.role === "assistant") {
const next = [...messages];
next[index] = updater(message);
return next;
}
}
return messages;
};
export const updateLastAssistantPermission = (
messages: unknown[],
requestId: string,
updater: (permission: Record<string, unknown>) => Record<string, unknown>,
) =>
updateLastAssistantMessage(messages, (message) => {
const permissions = Array.isArray(message.permissions)
? message.permissions
: [];
return {
...message,
permissions: permissions.map((permission) =>
isObjectRecord(permission) && permission.requestId === requestId
? updater(permission)
: permission,
),
};
});
export const updateLastAssistantQuestion = (
messages: unknown[],
requestId: string,
updater: (question: Record<string, unknown>) => Record<string, unknown>,
) =>
updateLastAssistantMessage(messages, (message) => {
const questions = Array.isArray(message.questions)
? message.questions
: [];
return {
...message,
questions: questions.map((question) =>
isObjectRecord(question) && question.requestId === requestId
? updater(question)
: question,
),
};
});
export const toFrontendPermission = (
payload: PermissionRequestPayload,
status: "pending" | "approved_once" | "approved_always" | "rejected" | "error" = "pending",
) => ({
requestId: payload.request_id,
sessionId: payload.session_id,
permission: payload.permission,
patterns: payload.patterns,
metadata: payload.metadata,
always: payload.always,
tool: payload.tool,
createdAt: payload.created_at,
status,
});
const toFrontendQuestion = (
payload: QuestionRequestPayload,
status: "pending" | "submitting" | "answered" | "rejected" | "error" = "pending",
) => ({
requestId: payload.request_id,
sessionId: payload.session_id,
questions: payload.questions,
tool: payload.tool,
createdAt: payload.created_at,
status,
});
export const toPermissionStatus = (reply: PermissionReply) => {
if (reply === "always") return "approved_always";
if (reply === "once") return "approved_once";
return "rejected";
};
export const upsertBackendQuestion = (
questions: unknown,
payload: QuestionRequestPayload,
) => {
const next = Array.isArray(questions) ? [...questions] : [];
const index = next.findIndex((item) => {
if (!isObjectRecord(item)) return false;
if (item.requestId === payload.request_id) return true;
const tool = isObjectRecord(item.tool) ? item.tool : undefined;
return Boolean(
payload.tool?.callID &&
tool?.callID === payload.tool.callID,
);
});
const nextItem = toFrontendQuestion(payload);
if (index >= 0) {
const current = next[index];
const currentRequestId = isObjectRecord(current) ? current.requestId : undefined;
const currentTool = isObjectRecord(current) && isObjectRecord(current.tool)
? current.tool
: undefined;
const currentIsActionable =
typeof currentRequestId === "string" &&
currentRequestId !== currentTool?.callID;
const payloadIsToolPlaceholder =
Boolean(payload.tool?.callID) && payload.request_id === payload.tool?.callID;
next[index] = {
...(isObjectRecord(current) ? current : {}),
...(currentIsActionable && payloadIsToolPlaceholder
? {
questions: nextItem.questions,
tool: nextItem.tool,
createdAt: nextItem.createdAt,
}
: nextItem),
status:
isObjectRecord(current) && current.status === "submitting"
? "submitting"
: nextItem.status,
};
} else {
next.push(nextItem);
}
return next;
};
export const upsertBackendTodoUpdate = (
_todos: unknown,
payload: TodoUpdatePayload,
) => [
{
sessionId: payload.session_id,
messageId: payload.message_id,
todos: payload.todos,
createdAt: payload.created_at,
},
];
+98
View File
@@ -32,6 +32,7 @@ type RuntimeModelOverride = {
};
export type PermissionReply = "once" | "always" | "reject";
export type QuestionAnswers = string[][];
type RuntimeMessage = {
info: {
@@ -135,6 +136,13 @@ export class OpencodeRuntimeAdapter {
const targetUserMessage = userMessages[options.userOrdinal - 1];
if (!targetUserMessage) {
if (messages.length === 0 && options.userOrdinal === 1) {
logger.warn(
{ sessionId, userOrdinal: options.userOrdinal },
"skipping opencode revert because runtime session has no messages",
);
return;
}
throw new Error("target user message not found to revert");
}
@@ -221,6 +229,96 @@ export class OpencodeRuntimeAdapter {
throw new Error("opencode permission reply API is unavailable");
}
async replyQuestion(options: {
requestId: string;
sessionId?: string;
answers: QuestionAnswers;
}) {
const client = await this.ensureClient();
if ("question" in client && client.question?.reply) {
try {
const response = await client.question.reply({
requestID: options.requestId,
answers: options.answers,
});
return response.data;
} catch (error) {
if (!options.sessionId) {
throw error;
}
}
}
const v2Question = (client as unknown as {
v2?: {
session?: {
question?: {
reply?: (parameters: {
sessionID: string;
requestID: string;
questionV2Reply: { answers: QuestionAnswers };
}) => Promise<{ data: unknown }>;
};
};
};
}).v2?.session?.question;
if (v2Question?.reply && options.sessionId) {
const response = await v2Question.reply({
sessionID: options.sessionId,
requestID: options.requestId,
questionV2Reply: {
answers: options.answers,
},
});
return response.data;
}
throw new Error("opencode question reply API is unavailable");
}
async rejectQuestion(options: {
requestId: string;
sessionId?: string;
}) {
const client = await this.ensureClient();
if ("question" in client && client.question?.reject) {
try {
const response = await client.question.reject({
requestID: options.requestId,
});
return response.data;
} catch (error) {
if (!options.sessionId) {
throw error;
}
}
}
const v2Question = (client as unknown as {
v2?: {
session?: {
question?: {
reject?: (parameters: {
sessionID: string;
requestID: string;
}) => Promise<{ data: unknown }>;
};
};
};
}).v2?.session?.question;
if (v2Question?.reject && options.sessionId) {
const response = await v2Question.reject({
sessionID: options.sessionId,
requestID: options.requestId,
});
return response.data;
}
throw new Error("opencode question reject API is unavailable");
}
async dispose(): Promise<void> {
this.closeServer?.();
this.closeServer = null;
+213
View File
@@ -161,4 +161,217 @@ describe("streamPromptResponse", () => {
always: ["/tmp"],
} satisfies Partial<PermissionRequestPayload>);
});
it("forwards opencode question requests and replies as SSE payloads", async () => {
const runtime = {
subscribeEvents: async () =>
createEventStream([
{
type: "question.asked",
properties: {
id: "question-1",
sessionID: "runtime-session-1",
questions: [
{
header: "范围",
question: "选择分析范围",
options: [{ label: "城区", description: "中心城区" }],
multiple: false,
custom: true,
},
],
},
},
{
type: "question.replied",
properties: {
sessionID: "runtime-session-1",
requestID: "question-1",
answers: [["城区", "补充说明"]],
},
},
{
type: "session.idle",
properties: {
sessionID: "runtime-session-1",
},
},
]),
prompt: async () => undefined,
messages: async () => [],
} as unknown as OpencodeRuntimeAdapter;
const events: Array<{ event: string; data: Record<string, unknown> }> = [];
await streamPromptResponse({
runtime,
sessionId: "runtime-session-1",
clientSessionId: "client-session-1",
message: "ask",
write: (event, data) => events.push({ event, data }),
});
expect(events.find((item) => item.event === "question_request")?.data).toMatchObject({
session_id: "client-session-1",
request_id: "question-1",
questions: [
{
header: "范围",
question: "选择分析范围",
options: [{ label: "城区", description: "中心城区" }],
multiple: false,
custom: true,
},
],
});
expect(events.find((item) => item.event === "question_response")?.data).toEqual({
session_id: "client-session-1",
request_id: "question-1",
answers: [["城区", "补充说明"]],
});
});
it("converts question tool parts into question request SSE payloads", async () => {
const runtime = {
subscribeEvents: async () =>
createEventStream([
{
type: "message.part.updated",
properties: {
sessionID: "runtime-session-1",
part: {
id: "tool-part-1",
sessionID: "runtime-session-1",
messageID: "message-1",
type: "tool",
callID: "call-1",
tool: "question",
state: {
status: "running",
input: {
questions: [
{
question: "你觉得这个 question 工具好用吗?",
header: "测试问题",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
},
time: { start: Date.now() },
},
},
time: Date.now(),
},
},
{
type: "session.idle",
properties: {
sessionID: "runtime-session-1",
},
},
]),
prompt: async () => undefined,
messages: async () => [],
} as unknown as OpencodeRuntimeAdapter;
const events: Array<{ event: string; data: Record<string, unknown> }> = [];
await streamPromptResponse({
runtime,
sessionId: "runtime-session-1",
clientSessionId: "client-session-1",
message: "ask",
write: (event, data) => events.push({ event, data }),
});
expect(events.find((item) => item.event === "question_request")?.data).toMatchObject({
session_id: "client-session-1",
request_id: "call-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
});
expect(
events.some(
(item) => item.event === "tool_call" && item.data.tool === "question",
),
).toBe(false);
});
it("forwards todo updates as structured SSE payloads and progress", async () => {
const runtime = {
subscribeEvents: async () =>
createEventStream([
{
type: "todo.updated",
properties: {
sessionID: "runtime-session-1",
todos: [
{ content: "分析水位", status: "completed", priority: "high" },
{ content: "生成建议", status: "in_progress", priority: "medium" },
],
},
},
{
type: "session.idle",
properties: {
sessionID: "runtime-session-1",
},
},
]),
prompt: async () => undefined,
messages: async () => [],
} as unknown as OpencodeRuntimeAdapter;
const events: Array<{ event: string; data: Record<string, unknown> }> = [];
await streamPromptResponse({
runtime,
sessionId: "runtime-session-1",
clientSessionId: "client-session-1",
message: "plan",
write: (event, data) => events.push({ event, data }),
});
expect(
events.find(
(item) => item.event === "progress" && item.data.id === "todo-progress",
)?.data,
).toMatchObject({
id: "todo-progress",
phase: "planning",
title: "计划进度 1/2",
});
expect(events.find((item) => item.event === "todo_update")?.data).toMatchObject({
session_id: "client-session-1",
todos: [
expect.objectContaining({
content: "分析水位",
status: "completed",
priority: "high",
}),
expect.objectContaining({
content: "生成建议",
status: "in_progress",
priority: "medium",
}),
],
});
});
});
+127
View File
@@ -0,0 +1,127 @@
import { describe, expect, it } from "bun:test";
import {
cancelBackendTodos,
upsertBackendQuestion,
} from "../../src/routes/chatUiState.js";
describe("upsertBackendQuestion", () => {
it("replaces a tool-call placeholder with the actionable question request", () => {
const questions = upsertBackendQuestion(
[
{
requestId: "call-1",
sessionId: "session-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [{ label: "非常好用", description: "交互清晰,选项方便" }],
},
],
tool: { messageID: "message-1", callID: "call-1" },
createdAt: 123,
status: "pending",
},
],
{
session_id: "session-1",
request_id: "question-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [{ label: "非常好用", description: "交互清晰,选项方便" }],
},
],
tool: { messageID: "message-1", callID: "call-1" },
created_at: 456,
},
);
expect(questions).toHaveLength(1);
expect(questions[0]).toMatchObject({
requestId: "question-1",
tool: { callID: "call-1" },
status: "pending",
});
});
it("does not replace an actionable question request with a later tool-call placeholder", () => {
const questions = upsertBackendQuestion(
[
{
requestId: "question-1",
sessionId: "session-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [{ label: "非常好用", description: "交互清晰,选项方便" }],
},
],
tool: { messageID: "message-1", callID: "call-1" },
createdAt: 123,
status: "pending",
},
],
{
session_id: "session-1",
request_id: "call-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [{ label: "非常好用", description: "交互清晰,选项方便" }],
},
],
tool: { messageID: "message-1", callID: "call-1" },
created_at: 456,
},
);
expect(questions).toHaveLength(1);
expect(questions[0]).toMatchObject({
requestId: "question-1",
tool: { callID: "call-1" },
status: "pending",
});
});
});
describe("cancelBackendTodos", () => {
it("marks pending and in-progress todos as cancelled", () => {
const cancelled = cancelBackendTodos([
{
sessionId: "session-1",
todos: [
{ id: "todo-1", content: "分析水位", status: "in_progress" },
{ id: "todo-2", content: "生成建议", status: "pending" },
{ id: "todo-3", content: "完成报告", status: "completed" },
],
createdAt: 123,
},
]);
expect(cancelled).toEqual([
expect.objectContaining({
todos: [
expect.objectContaining({
id: "todo-1",
status: "cancelled",
updatedAt: expect.any(Number),
}),
expect.objectContaining({
id: "todo-2",
status: "cancelled",
updatedAt: expect.any(Number),
}),
expect.objectContaining({
id: "todo-3",
status: "completed",
}),
],
}),
]);
});
});
+62
View File
@@ -0,0 +1,62 @@
import { describe, expect, it } from "bun:test";
import { OpencodeRuntimeAdapter } from "../../src/runtime/opencode.js";
const createRuntimeAdapter = (
messages: unknown[],
calls: {
reverted: string[];
removed: string[];
} = { reverted: [], removed: [] },
) =>
Object.assign(Object.create(OpencodeRuntimeAdapter.prototype), {
messages: async () => messages,
revertMessage: async (_sessionId: string, messageId: string) => {
calls.reverted.push(messageId);
},
removeMessage: async (_sessionId: string, messageId: string) => {
calls.removed.push(messageId);
},
}) as OpencodeRuntimeAdapter;
describe("OpencodeRuntimeAdapter.revertToUserMessage", () => {
it("skips reverting the first user message when the runtime session is empty", async () => {
const calls = { reverted: [] as string[], removed: [] as string[] };
const runtime = createRuntimeAdapter([], calls);
await runtime.revertToUserMessage("session-1", { userOrdinal: 1 });
expect(calls).toEqual({ reverted: [], removed: [] });
});
it("keeps ordinal mismatches visible when runtime messages exist", async () => {
const runtime = createRuntimeAdapter([
{ info: { id: "user-1", role: "user" } },
{ info: { id: "assistant-1", role: "assistant" } },
]);
await expect(
runtime.revertToUserMessage("session-1", { userOrdinal: 2 }),
).rejects.toThrow("target user message not found to revert");
});
it("reverts and removes messages from the target user message onward", async () => {
const calls = { reverted: [] as string[], removed: [] as string[] };
const runtime = createRuntimeAdapter(
[
{ info: { id: "user-1", role: "user" } },
{ info: { id: "assistant-1", role: "assistant" } },
{ info: { id: "user-2", role: "user" } },
{ info: { id: "assistant-2", role: "assistant" } },
],
calls,
);
await runtime.revertToUserMessage("session-1", { userOrdinal: 2 });
expect(calls).toEqual({
reverted: ["user-2"],
removed: ["assistant-2", "user-2"],
});
});
});