fix(chat): wire question and todo cards

This commit is contained in:
2026-06-08 18:10:28 +08:00
parent 2691f42581
commit b23cb6acdd
9 changed files with 1713 additions and 10 deletions
@@ -7,6 +7,7 @@ import {
abortAgentChat,
forkAgentChat,
replyAgentPermission,
replyAgentQuestion,
resumeAgentChatStream,
streamAgentChat,
} from "@/lib/chatStream";
@@ -16,6 +17,7 @@ jest.mock("@/lib/chatStream", () => ({
abortAgentChat: jest.fn(async () => undefined),
forkAgentChat: jest.fn(async () => "forked-session"),
replyAgentPermission: jest.fn(async () => undefined),
replyAgentQuestion: jest.fn(async () => undefined),
resumeAgentChatStream: jest.fn(async () => undefined),
streamAgentChat: jest.fn(async () => undefined),
}));
@@ -53,11 +55,13 @@ describe("useAgentChatSession", () => {
jest.mocked(abortAgentChat).mockReset();
jest.mocked(forkAgentChat).mockReset();
jest.mocked(replyAgentPermission).mockReset();
jest.mocked(replyAgentQuestion).mockReset();
jest.mocked(resumeAgentChatStream).mockReset();
jest.mocked(streamAgentChat).mockReset();
jest.mocked(abortAgentChat).mockImplementation(async () => undefined);
jest.mocked(forkAgentChat).mockImplementation(async () => "forked-session");
jest.mocked(replyAgentPermission).mockImplementation(async () => undefined);
jest.mocked(replyAgentQuestion).mockImplementation(async () => undefined);
jest.mocked(resumeAgentChatStream).mockImplementation(async () => undefined);
jest.mocked(streamAgentChat).mockImplementation(async () => undefined);
deleteChatSession.mockImplementation(async () => undefined);
@@ -333,6 +337,337 @@ describe("useAgentChatSession", () => {
]);
});
it("applies question responses to the message that owns the request", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{
id: "a1",
role: "assistant",
content: "需要确认",
questions: [
{
requestId: "q-1",
sessionId: "session-loaded",
questions: [
{
header: "范围",
question: "选择范围",
options: [],
custom: true,
},
],
createdAt: 123,
status: "pending",
},
],
},
{ id: "a2", role: "assistant", content: "后续消息" },
],
isStreaming: true,
runStatus: "running",
});
onEvent({
type: "question_response",
sessionId: "session-loaded",
requestId: "q-1",
answers: [["城区"]],
rejected: false,
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
expect(result.current.messages[1].questions?.[0]).toEqual(
expect.objectContaining({
requestId: "q-1",
status: "answered",
answers: [["城区"]],
}),
);
expect(result.current.messages[2].questions).toBeUndefined();
});
it("deduplicates question requests across assistant messages", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{
id: "a1",
role: "assistant",
content: "需要确认",
questions: [
{
requestId: "question-1",
sessionId: "session-loaded",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
createdAt: 123,
status: "pending",
},
],
},
{ id: "a2", role: "assistant", content: "后续消息" },
],
isStreaming: true,
runStatus: "running",
});
onEvent({
type: "question_request",
sessionId: "session-loaded",
requestId: "call-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
createdAt: 456,
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
const allQuestions = result.current.messages.flatMap(
(message) => message.questions ?? [],
);
expect(allQuestions).toHaveLength(1);
expect(result.current.messages[1].questions?.[0]).toEqual(
expect.objectContaining({
requestId: "question-1",
tool: expect.objectContaining({ callID: "call-1" }),
}),
);
expect(result.current.messages[2].questions).toBeUndefined();
});
it("keeps the actionable question request id when a tool-part duplicate arrives later", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{
id: "a1",
role: "assistant",
content: "需要确认",
questions: [
{
requestId: "question-1",
sessionId: "session-loaded",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
createdAt: 123,
status: "pending",
},
],
},
],
isStreaming: true,
runStatus: "running",
});
onEvent({
type: "question_request",
sessionId: "session-loaded",
requestId: "call-1",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
createdAt: 456,
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
const allQuestions = result.current.messages.flatMap(
(message) => message.questions ?? [],
);
expect(allQuestions).toHaveLength(1);
expect(allQuestions[0]).toEqual(
expect.objectContaining({
requestId: "question-1",
tool: expect.objectContaining({ callID: "call-1" }),
}),
);
});
it("deduplicates persisted duplicate questions from state events", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
const duplicateQuestion = {
sessionId: "session-loaded",
questions: [
{
header: "测试问题",
question: "你觉得这个 question 工具好用吗?",
options: [
{
label: "非常好用",
description: "交互清晰,选项方便",
},
],
},
],
tool: {
messageID: "message-1",
callID: "call-1",
},
createdAt: 123,
status: "pending" as const,
};
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{
id: "a1",
role: "assistant",
content: "需要确认",
questions: [{ ...duplicateQuestion, requestId: "question-1" }],
},
{
id: "a2",
role: "assistant",
content: "后续消息",
questions: [{ ...duplicateQuestion, requestId: "call-1" }],
},
],
isStreaming: true,
runStatus: "running",
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
expect(
result.current.messages.flatMap((message) => message.questions ?? []),
).toHaveLength(1);
expect(result.current.messages[1].questions).toHaveLength(1);
expect(result.current.messages[2].questions).toBeUndefined();
});
it("aborts a resumed streaming session through the backend abort endpoint", async () => {
listChatSessions.mockResolvedValue([
{
@@ -433,6 +768,23 @@ describe("useAgentChatSession", () => {
title: "开始分析",
startedAt: 1000,
} satisfies StreamEvent);
onEvent({
type: "todo_update",
sessionId: "session-1",
todos: [
{
id: "todo-1",
content: "分析水位",
status: "in_progress",
},
{
id: "todo-2",
content: "生成建议",
status: "pending",
},
],
createdAt: 1001,
} satisfies StreamEvent);
signal?.addEventListener("abort", () => {
reject(new Error("aborted"));
@@ -474,6 +826,22 @@ describe("useAgentChatSession", () => {
endedAt: expect.any(Number),
}),
],
todos: [
expect.objectContaining({
todos: [
expect.objectContaining({
id: "todo-1",
status: "cancelled",
updatedAt: expect.any(Number),
}),
expect.objectContaining({
id: "todo-2",
status: "cancelled",
updatedAt: expect.any(Number),
}),
],
}),
],
}),
);
expect(abortAgentChat).toHaveBeenCalledWith("session-1");
@@ -5,13 +5,17 @@ import { useCallback, useEffect, useRef, useState } from "react";
import {
abortAgentChat,
forkAgentChat,
rejectAgentQuestion,
replyAgentPermission,
replyAgentQuestion,
resumeAgentChatStream,
streamAgentChat,
} from "@/lib/chatStream";
import type {
AgentApprovalMode,
AgentModel,
AgentQuestionRequest,
AgentTodoUpdate,
PermissionReply,
StreamEvent,
} from "@/lib/chatStream";
@@ -135,6 +139,20 @@ const completeRunningProgress = (progress: ChatProgress[] | undefined) =>
};
});
const cancelRunningTodos = (todos: AgentTodoUpdate[] | undefined) =>
todos?.map((todoUpdate) => ({
...todoUpdate,
todos: todoUpdate.todos.map((todo) =>
todo.status === "pending" || todo.status === "in_progress"
? {
...todo,
status: "cancelled" as const,
updatedAt: Date.now(),
}
: todo,
),
}));
const upsertPermission = (
permissions: AgentPermissionRequest[] | undefined,
event: StreamEvent & { type: "permission_request" },
@@ -170,12 +188,192 @@ const toPermissionStatus = (reply: PermissionReply): AgentPermissionRequest["sta
return "rejected";
};
const isActionableQuestionRequest = (question: {
requestId: string;
tool?: AgentQuestionRequest["tool"];
}) => Boolean(question.requestId && question.requestId !== question.tool?.callID);
const toQuestionRequest = (
event: StreamEvent & { type: "question_request" },
status: AgentQuestionRequest["status"] = "pending",
): AgentQuestionRequest => ({
requestId: event.requestId,
sessionId: event.sessionId,
questions: event.questions,
tool: event.tool,
createdAt: event.createdAt,
status,
});
const getQuestionContentSignature = (
questions: AgentQuestionRequest["questions"],
) =>
JSON.stringify(
questions.map((question) => ({
header: question.header,
question: question.question,
options: question.options.map((option) => ({
label: option.label,
description: option.description,
})),
multiple: question.multiple ?? false,
custom: question.custom ?? false,
})),
);
const isSameQuestionRequest = (
question: AgentQuestionRequest,
event: StreamEvent & { type: "question_request" },
) => {
if (question.requestId === event.requestId) return true;
if (question.tool?.callID && event.tool?.callID) {
return question.tool.callID === event.tool.callID;
}
return (
question.status === "pending" &&
question.sessionId === event.sessionId &&
getQuestionContentSignature(question.questions) ===
getQuestionContentSignature(event.questions)
);
};
const isSameQuestionPair = (
left: AgentQuestionRequest,
right: AgentQuestionRequest,
) => {
if (left.requestId === right.requestId) return true;
if (left.tool?.callID && right.tool?.callID) {
return left.tool.callID === right.tool.callID;
}
return (
left.status === "pending" &&
right.status === "pending" &&
left.sessionId === right.sessionId &&
getQuestionContentSignature(left.questions) ===
getQuestionContentSignature(right.questions)
);
};
const dedupeQuestionsAcrossMessages = (messages: Message[]) => {
const seen: AgentQuestionRequest[] = [];
let changed = false;
const nextMessages = messages.map((message) => {
if (!message.questions?.length) {
return message;
}
const nextQuestions = message.questions.filter((question) => {
if (seen.some((existing) => isSameQuestionPair(existing, question))) {
changed = true;
return false;
}
seen.push(question);
return true;
});
if (nextQuestions.length === message.questions.length) {
return message;
}
return {
...message,
questions: nextQuestions.length ? nextQuestions : undefined,
};
});
return changed ? nextMessages : messages;
};
const upsertQuestionAcrossMessages = (
messages: Message[],
event: StreamEvent & { type: "question_request" },
assistantMessageId: string,
) => {
let existing: AgentQuestionRequest | undefined;
for (const message of messages) {
const match = message.questions?.find((question) =>
isSameQuestionRequest(question, event),
);
if (match) {
existing = match;
break;
}
}
const existingStatus: AgentQuestionRequest["status"] | undefined =
existing?.status === "submitting" ? "submitting" : undefined;
const nextQuestion =
existing &&
isActionableQuestionRequest(existing) &&
!isActionableQuestionRequest(event)
? {
...existing,
sessionId: event.sessionId,
questions: event.questions,
tool: event.tool ?? existing.tool,
createdAt: event.createdAt,
status: existingStatus ?? existing.status,
}
: toQuestionRequest(event, existingStatus ?? "pending");
const targetMessageId = existing
? messages.find((message) =>
message.questions?.some((question) => isSameQuestionRequest(question, event)),
)?.id ?? assistantMessageId
: assistantMessageId;
return messages.map((message) => {
const filteredQuestions = message.questions?.filter(
(question) => !isSameQuestionRequest(question, event),
);
if (message.id !== targetMessageId) {
return filteredQuestions?.length === message.questions?.length
? message
: {
...message,
questions: filteredQuestions?.length ? filteredQuestions : undefined,
};
}
const nextQuestions = [...(filteredQuestions ?? []), nextQuestion];
return {
...message,
questions: nextQuestions,
};
});
};
const applyQuestionResponse = (
questions: AgentQuestionRequest[] | undefined,
event: StreamEvent & { type: "question_response" },
) =>
(questions ?? []).map((question) =>
question.requestId === event.requestId
? {
...question,
status: event.rejected ? "rejected" as const : "answered" as const,
answers: event.answers ?? question.answers,
repliedAt: Date.now(),
error: undefined,
}
: question,
);
const upsertTodoUpdate = (
todos: AgentTodoUpdate[] | undefined,
event: StreamEvent & { type: "todo_update" },
) => [
{
sessionId: event.sessionId,
messageId: event.messageId,
todos: event.todos,
createdAt: event.createdAt,
},
];
const finalizeAssistantMessageAfterAbort = (message: Message): Message => {
const completedProgress = completeRunningProgress(message.progress);
const cancelledTodos = cancelRunningTodos(message.todos);
const hasVisibleOutput =
message.content.trim().length > 0 ||
Boolean(message.artifacts?.length) ||
Boolean(completedProgress?.length);
Boolean(completedProgress?.length) ||
Boolean(cancelledTodos?.length);
if (!hasVisibleOutput) {
return message;
@@ -186,6 +384,7 @@ const finalizeAssistantMessageAfterAbort = (message: Message): Message => {
content: message.content || "⚠️ **请求已中断**",
isError: true,
progress: completedProgress,
todos: cancelledTodos,
};
};
@@ -291,7 +490,7 @@ export const useAgentChatSession = ({
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
setMessages(loadedState.messages);
setMessages(dedupeQuestionsAcrossMessages(loadedState.messages));
setSessionTitle(loadedState.title);
setIsSessionTitleManuallyEdited(loadedState.isTitleManuallyEdited ?? false);
setSessionId(loadedState.sessionId);
@@ -401,7 +600,9 @@ export const useAgentChatSession = ({
}
if (event.type === "state") {
const nextMessages = cloneMessages(event.messages as Message[]);
const nextMessages = dedupeQuestionsAcrossMessages(
cloneMessages(event.messages as Message[]),
);
messagesRef.current = nextMessages;
setMessages(nextMessages);
setIsStreaming(event.isStreaming);
@@ -502,6 +703,32 @@ export const useAgentChatSession = ({
};
}),
);
} else if (event.type === "question_request") {
setMessages((prev) =>
upsertQuestionAcrossMessages(prev, event, assistantMessageId),
);
} else if (event.type === "question_response") {
setMessages((prev) =>
prev.map((message) =>
message.questions?.some((question) => question.requestId === event.requestId)
? {
...message,
questions: applyQuestionResponse(message.questions, event),
}
: message,
),
);
} else if (event.type === "todo_update") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
todos: upsertTodoUpdate(message.todos, event),
}
: message,
),
);
} else if (event.type === "done") {
setMessages((prev) =>
prev.map((message) => {
@@ -531,6 +758,7 @@ export const useAgentChatSession = ({
content: message.content || `⚠️ **错误:** ${event.message}`,
isError: true,
progress: completeRunningProgress(message.progress),
todos: cancelRunningTodos(message.todos),
}
: message,
),
@@ -621,11 +849,7 @@ export const useAgentChatSession = ({
prev
.map((message) =>
message.id === nextAssistantMessage.id
? {
...message,
content: message.content || "⚠️ **请求已中断**",
isError: true,
}
? finalizeAssistantMessageAfterAbort(message)
: message,
)
.filter(
@@ -635,7 +859,8 @@ export const useAgentChatSession = ({
message.role === "assistant" &&
message.content.trim().length === 0 &&
!(message.artifacts?.length) &&
!(message.progress?.length)
!(message.progress?.length) &&
!(message.todos?.length)
),
),
);
@@ -766,6 +991,145 @@ export const useAgentChatSession = ({
[],
);
const replyQuestion = useCallback(
async (requestId: string, answers: string[][]) => {
const target = messagesRef.current
.flatMap((message) => message.questions ?? [])
.find((question) => question.requestId === requestId);
if (!target || target.status === "submitting") {
return;
}
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? { ...question, status: "submitting", error: undefined }
: question,
),
},
),
);
try {
await replyAgentQuestion(target.sessionId, requestId, answers);
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "answered",
answers,
repliedAt: Date.now(),
error: undefined,
}
: question,
),
},
),
);
} catch (error) {
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "error",
error: error instanceof Error ? error.message : String(error),
}
: question,
),
},
),
);
}
},
[],
);
const rejectQuestion = useCallback(
async (requestId: string) => {
const target = messagesRef.current
.flatMap((message) => message.questions ?? [])
.find((question) => question.requestId === requestId);
if (!target || target.status === "submitting") {
return;
}
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? { ...question, status: "submitting", error: undefined }
: question,
),
},
),
);
try {
await rejectAgentQuestion(target.sessionId, requestId);
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "rejected",
repliedAt: Date.now(),
error: undefined,
}
: question,
),
},
),
);
} catch (error) {
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "error",
error: error instanceof Error ? error.message : String(error),
}
: question,
),
},
),
);
}
},
[],
);
const createSession = useCallback(() => {
if (isHydrating || isStreaming) return;
@@ -1009,6 +1373,8 @@ export const useAgentChatSession = ({
createBranch,
abort,
replyPermission,
replyQuestion,
rejectQuestion,
createSession,
renameSession,
removeSession,