Files
TJWaterFrontend_Refine/src/components/chat/hooks/useAgentChatSession.ts
T
jiang e5f13c3d46
Build Push and Deploy / docker-image (push) Successful in 1m7s
Build Push and Deploy / deploy-fallback-log (push) Has been skipped
fix(chat): remove regenerate action
2026-06-08 19:33:06 +08:00

950 lines
31 KiB
TypeScript

"use client";
import { useCallback, useEffect, useRef, useState } from "react";
import { abortAgentChat, forkAgentChat, rejectAgentQuestion, replyAgentPermission, replyAgentQuestion, resumeAgentChatStream, streamAgentChat } from "@/lib/chatStream";
import type { PermissionReply, StreamEvent } from "@/lib/chatStream";
import type { AgentArtifact, ChatSessionSummary, LoadedChatState, Message } from "../GlobalChatbox.types";
import { cloneMessages } from "../GlobalChatbox.utils";
import { createEmptyChatState, deleteChatSession, listChatSessions, loadChatSessionById, saveActiveChatState, updateChatSessionTitle } from "../chatStorage";
import { applyQuestionResponse, cancelRunningTodos, completeRunningProgress, createAssistantMessage, createPersistedStateKey, createTodoUpdateFromEvent, createUserMessage, dedupeQuestionsAcrossMessages, finalizeAssistantMessageAfterAbort, normalizeSessionTodos, toPermissionStatus, upsertPermission, upsertProgress, upsertQuestionAcrossMessages } from "./agentChatSessionState";
import type { PromptRunOptions, UseAgentChatSessionOptions } from "./useAgentChatSession.types";
export const useAgentChatSession = ({
projectId,
onToolCall,
onBeforeSend,
getModel,
getApprovalMode,
}: UseAgentChatSessionOptions) => {
const hydrationCompletedRef = useRef(false);
const hydrationNonceRef = useRef(0);
const [messages, setMessages] = useState<Message[]>([]);
const [sessionTitle, setSessionTitle] = useState<string | undefined>(undefined);
const [isSessionTitleManuallyEdited, setIsSessionTitleManuallyEdited] = useState(false);
const [sessionId, setSessionId] = useState<string | undefined>(undefined);
const [chatSessions, setChatSessions] = useState<ChatSessionSummary[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const [isHydrating, setIsHydrating] = useState(true);
const abortRef = useRef<AbortController | null>(null);
const sessionIdRef = useRef<string | undefined>(undefined);
const messagesRef = useRef<Message[]>([]);
const resumeStreamingSessionRef = useRef<((sessionId: string) => void) | null>(null);
const isSessionTitleManuallyEditedRef = useRef(false);
const cancelPromiseRef = useRef<Promise<void> | null>(null);
const titleUpdateNonceRef = useRef(0);
const lastPersistedStateKeyRef = useRef(
createPersistedStateKey({
sessionId: undefined,
title: undefined,
isTitleManuallyEdited: false,
messages: [],
}),
);
useEffect(() => {
sessionIdRef.current = sessionId;
}, [sessionId]);
useEffect(() => {
messagesRef.current = messages;
}, [messages]);
useEffect(() => {
isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited;
}, [isSessionTitleManuallyEdited]);
useEffect(() => {
let cancelled = false;
const hydrate = async () => {
setIsHydrating(true);
hydrationCompletedRef.current = false;
if (!projectId) {
sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({
title: undefined,
isTitleManuallyEdited: false,
messages: [],
sessionId: undefined,
});
hydrationCompletedRef.current = true;
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
setMessages([]);
setSessionTitle(undefined);
setIsSessionTitleManuallyEdited(false);
setSessionId(undefined);
setChatSessions([]);
setIsHydrating(false);
return;
}
try {
const sessions = await listChatSessions();
const streamingSession = sessions.find((session) => session.isStreaming);
const loadedState = streamingSession
? await loadChatSessionById(streamingSession.id)
: createEmptyChatState();
if (cancelled) return;
sessionIdRef.current = loadedState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(loadedState);
hydrationCompletedRef.current = true;
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
setMessages(
normalizeSessionTodos(dedupeQuestionsAcrossMessages(loadedState.messages)),
);
setSessionTitle(loadedState.title);
setIsSessionTitleManuallyEdited(loadedState.isTitleManuallyEdited ?? false);
setSessionId(loadedState.sessionId);
setChatSessions(sessions);
if (
loadedState.sessionId &&
(loadedState.isStreaming || streamingSession?.isStreaming)
) {
resumeStreamingSessionRef.current?.(loadedState.sessionId);
}
} catch (error) {
console.error("[GlobalChatbox] Failed to hydrate chat state:", error);
} finally {
if (!cancelled) {
setIsHydrating(false);
}
}
};
void hydrate();
return () => {
cancelled = true;
};
}, [projectId]);
useEffect(() => {
if (!projectId || isHydrating || !hydrationCompletedRef.current) return;
const currentHydrationNonce = hydrationNonceRef.current;
const persistTimer = window.setTimeout(() => {
if (isStreaming) {
return;
}
const state: LoadedChatState = {
title: sessionTitle,
isTitleManuallyEdited: isSessionTitleManuallyEdited,
messages,
sessionId,
};
const currentStateKey = createPersistedStateKey(state);
if (currentStateKey === lastPersistedStateKeyRef.current) {
return;
}
void saveActiveChatState(state)
.then((sessionId) => {
if (hydrationNonceRef.current !== currentHydrationNonce) return;
sessionIdRef.current = sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey({
...state,
sessionId,
});
return listChatSessions();
})
.then((sessions) => {
if (!sessions || hydrationNonceRef.current !== currentHydrationNonce) return;
setChatSessions(sessions);
})
.catch((error) => {
console.error("[GlobalChatbox] Failed to persist chat state:", error);
});
}, 150);
return () => {
window.clearTimeout(persistTimer);
};
}, [isHydrating, isSessionTitleManuallyEdited, isStreaming, messages, projectId, sessionId, sessionTitle]);
const appendArtifact = useCallback((messageId: string, artifact: AgentArtifact) => {
setMessages((prev) =>
prev.map((message) =>
message.id === messageId
? {
...message,
artifacts: [...(message.artifacts ?? []), artifact],
}
: message,
),
);
}, []);
const getLastAssistantMessageId = useCallback((fallback?: string) => {
const assistant = [...messagesRef.current]
.reverse()
.find((message) => message.role === "assistant");
return assistant?.id ?? fallback;
}, []);
const applyStreamEvent = useCallback(
(
event: StreamEvent,
options?: {
assistantMessageId?: string;
},
) => {
if (
event.type !== "session_title" &&
"sessionId" in event &&
event.sessionId &&
event.sessionId !== sessionIdRef.current
) {
sessionIdRef.current = event.sessionId;
setSessionId(event.sessionId);
}
if (event.type === "state") {
const nextMessages = normalizeSessionTodos(
dedupeQuestionsAcrossMessages(cloneMessages(event.messages as Message[])),
);
messagesRef.current = nextMessages;
setMessages(nextMessages);
setIsStreaming(event.isStreaming);
return;
}
if (event.type === "session_title") {
const nextTitle = event.title.trim();
if (nextTitle && !isSessionTitleManuallyEditedRef.current) {
const currentSessionId = sessionIdRef.current;
const targetSessionId = event.sessionId || currentSessionId;
if (targetSessionId === currentSessionId) {
setSessionTitle(nextTitle);
lastPersistedStateKeyRef.current = createPersistedStateKey({
sessionId: targetSessionId,
title: nextTitle,
isTitleManuallyEdited: false,
messages: messagesRef.current,
});
}
if (targetSessionId) {
const currentNonce = ++titleUpdateNonceRef.current;
void updateChatSessionTitle(targetSessionId, nextTitle, {
isTitleManuallyEdited: false,
})
.then(() => listChatSessions())
.then((sessions) => {
if (titleUpdateNonceRef.current !== currentNonce) return;
setChatSessions(sessions);
})
.catch((error) => {
console.error("[GlobalChatbox] Failed to persist session title:", error);
});
}
}
return;
}
const assistantMessageId = getLastAssistantMessageId(options?.assistantMessageId);
if (!assistantMessageId) {
return;
}
if (event.type === "token") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
content: message.content + event.content,
isError: false,
}
: message,
),
);
} else if (event.type === "progress") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? { ...message, progress: upsertProgress(message.progress, event) }
: message,
),
);
} else if (event.type === "tool_call") {
onToolCall(event, {
assistantMessageId,
appendArtifact,
});
} else if (event.type === "permission_request") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
permissions: upsertPermission(message.permissions, event),
}
: message,
),
);
} else if (event.type === "permission_response") {
setMessages((prev) =>
prev.map((message) => {
if (message.id !== assistantMessageId || !message.permissions?.length) {
return message;
}
return {
...message,
permissions: message.permissions.map((permission) =>
permission.requestId === event.requestId
? {
...permission,
status: toPermissionStatus(event.reply),
repliedAt: Date.now(),
error: undefined,
}
: permission,
),
};
}),
);
} else if (event.type === "question_request") {
setMessages((prev) =>
upsertQuestionAcrossMessages(prev, event, assistantMessageId),
);
} else if (event.type === "question_response") {
setMessages((prev) =>
prev.map((message) =>
message.questions?.some((question) => question.requestId === event.requestId)
? {
...message,
questions: applyQuestionResponse(message.questions, event),
}
: message,
),
);
} else if (event.type === "todo_update") {
setMessages((prev) =>
normalizeSessionTodos(
prev,
createTodoUpdateFromEvent(event),
assistantMessageId,
),
);
} else if (event.type === "done") {
setMessages((prev) =>
prev.map((message) => {
if (message.id !== assistantMessageId) return message;
const completedProgress = completeRunningProgress(message.progress);
if (
message.content.trim().length === 0 &&
!(message.artifacts?.length)
) {
return {
...message,
content:
"Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。",
progress: completedProgress,
};
}
return { ...message, progress: completedProgress };
}),
);
setIsStreaming(false);
} else if (event.type === "error") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
content: message.content || `⚠️ **错误:** ${event.message}`,
isError: true,
progress: completeRunningProgress(message.progress),
todos: cancelRunningTodos(message.todos),
}
: message,
),
);
setIsStreaming(false);
}
},
[appendArtifact, getLastAssistantMessageId, onToolCall],
);
const resumeStreamingSession = useCallback(
(nextSessionId: string) => {
const controller = new AbortController();
abortRef.current?.abort();
abortRef.current = controller;
setIsStreaming(true);
void resumeAgentChatStream({
sessionId: nextSessionId,
signal: controller.signal,
onEvent: (event) => applyStreamEvent(event),
})
.catch((error) => {
if (!controller.signal.aborted) {
console.error("[GlobalChatbox] Failed to resume chat stream:", error);
setIsStreaming(false);
}
})
.finally(() => {
if (abortRef.current === controller) {
abortRef.current = null;
}
});
},
[applyStreamEvent],
);
resumeStreamingSessionRef.current = resumeStreamingSession;
const runPrompt = useCallback(
async ({
prompt: rawPrompt,
sessionIdOverride,
preparedMessages,
userMessage,
assistantMessage,
}: PromptRunOptions) => {
const prompt = rawPrompt.trim();
if (!prompt || isStreaming || isHydrating) return;
await cancelPromiseRef.current?.catch(() => undefined);
onBeforeSend?.();
const nextUserMessage = userMessage ?? createUserMessage(prompt);
const nextAssistantMessage = assistantMessage ?? createAssistantMessage();
const nextMessages =
preparedMessages ??
[...messages, nextUserMessage, nextAssistantMessage];
const clonedNextMessages = cloneMessages(nextMessages);
setIsStreaming(true);
messagesRef.current = clonedNextMessages;
setMessages(clonedNextMessages);
if (sessionIdOverride !== undefined) {
sessionIdRef.current = sessionIdOverride;
setSessionId(sessionIdOverride);
}
const controller = new AbortController();
abortRef.current = controller;
try {
await streamAgentChat({
message: prompt,
sessionId: sessionIdOverride ?? sessionIdRef.current,
model: getModel?.(),
approvalMode: getApprovalMode?.(),
signal: controller.signal,
onEvent: (event) =>
applyStreamEvent(event, {
assistantMessageId: nextAssistantMessage.id,
}),
});
} catch (error) {
if (controller.signal.aborted) {
setMessages((prev) =>
prev
.map((message) =>
message.id === nextAssistantMessage.id
? finalizeAssistantMessageAfterAbort(message)
: message,
)
.filter(
(message) =>
!(
message.id === nextAssistantMessage.id &&
message.role === "assistant" &&
message.content.trim().length === 0 &&
!(message.artifacts?.length) &&
!(message.progress?.length) &&
!message.todos
),
),
);
return;
}
setMessages((prev) =>
prev.map((message) =>
message.id === nextAssistantMessage.id
? {
...message,
content: `⚠️ **错误:** ${String(error)}`,
isError: true,
progress: completeRunningProgress(message.progress),
}
: message,
),
);
setIsStreaming(false);
} finally {
abortRef.current = null;
setIsStreaming(false);
}
},
[
applyStreamEvent,
getApprovalMode,
getModel,
isHydrating,
isStreaming,
messages,
onBeforeSend,
],
);
const abort = useCallback(() => {
const controller = abortRef.current;
controller?.abort();
setIsStreaming(false);
const assistantMessageId = getLastAssistantMessageId();
if (assistantMessageId) {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? finalizeAssistantMessageAfterAbort(message)
: message,
),
);
}
const cancelPromise = abortAgentChat(sessionIdRef.current).catch((error) => {
console.error("[GlobalChatbox] Failed to abort agent session:", error);
});
const trackedCancelPromise = cancelPromise.finally(() => {
if (cancelPromiseRef.current === trackedCancelPromise) {
cancelPromiseRef.current = null;
}
});
cancelPromiseRef.current = trackedCancelPromise;
}, [getLastAssistantMessageId]);
const replyPermission = useCallback(
async (requestId: string, reply: PermissionReply) => {
const target = messagesRef.current
.flatMap((message) => message.permissions ?? [])
.find((permission) => permission.requestId === requestId);
if (!target || target.status === "submitting") {
return;
}
setMessages((prev) =>
prev.map((message) =>
!message.permissions?.some((permission) => permission.requestId === requestId)
? message
: {
...message,
permissions: message.permissions.map((permission) =>
permission.requestId === requestId
? { ...permission, status: "submitting", error: undefined }
: permission,
),
},
),
);
try {
await replyAgentPermission(target.sessionId, requestId, reply);
setMessages((prev) =>
prev.map((message) =>
!message.permissions?.some((permission) => permission.requestId === requestId)
? message
: {
...message,
permissions: message.permissions.map((permission) =>
permission.requestId === requestId
? {
...permission,
status: toPermissionStatus(reply),
repliedAt: Date.now(),
error: undefined,
}
: permission,
),
},
),
);
} catch (error) {
setMessages((prev) =>
prev.map((message) =>
!message.permissions?.some((permission) => permission.requestId === requestId)
? message
: {
...message,
permissions: message.permissions.map((permission) =>
permission.requestId === requestId
? {
...permission,
status: "error",
error: error instanceof Error ? error.message : String(error),
}
: permission,
),
},
),
);
}
},
[],
);
const replyQuestion = useCallback(
async (requestId: string, answers: string[][]) => {
const target = messagesRef.current
.flatMap((message) => message.questions ?? [])
.find((question) => question.requestId === requestId);
if (!target || target.status === "submitting") {
return;
}
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? { ...question, status: "submitting", error: undefined }
: question,
),
},
),
);
try {
await replyAgentQuestion(target.sessionId, requestId, answers);
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "answered",
answers,
repliedAt: Date.now(),
error: undefined,
}
: question,
),
},
),
);
} catch (error) {
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "error",
error: error instanceof Error ? error.message : String(error),
}
: question,
),
},
),
);
}
},
[],
);
const rejectQuestion = useCallback(
async (requestId: string) => {
const target = messagesRef.current
.flatMap((message) => message.questions ?? [])
.find((question) => question.requestId === requestId);
if (!target || target.status === "submitting") {
return;
}
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? { ...question, status: "submitting", error: undefined }
: question,
),
},
),
);
try {
await rejectAgentQuestion(target.sessionId, requestId);
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "rejected",
repliedAt: Date.now(),
error: undefined,
}
: question,
),
},
),
);
} catch (error) {
setMessages((prev) =>
prev.map((message) =>
!message.questions?.some((question) => question.requestId === requestId)
? message
: {
...message,
questions: message.questions.map((question) =>
question.requestId === requestId
? {
...question,
status: "error",
error: error instanceof Error ? error.message : String(error),
}
: question,
),
},
),
);
}
},
[],
);
const createSession = useCallback(() => {
if (isHydrating || isStreaming) return;
const controller = abortRef.current;
controller?.abort();
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({
title: "新对话",
isTitleManuallyEdited: false,
messages: [],
sessionId: undefined,
});
setMessages([]);
setSessionTitle("新对话");
setIsSessionTitleManuallyEdited(false);
setSessionId(undefined);
setIsStreaming(false);
}, [isHydrating, isStreaming]);
const switchSession = useCallback(
async (nextSessionId: string) => {
if (isHydrating || isStreaming || sessionIdRef.current === nextSessionId) {
return;
}
setIsHydrating(true);
try {
const [nextState, sessions] = await Promise.all([
loadChatSessionById(nextSessionId),
listChatSessions(),
]);
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
sessionIdRef.current = nextState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(nextState);
setMessages(nextState.messages);
setSessionTitle(nextState.title);
setIsSessionTitleManuallyEdited(nextState.isTitleManuallyEdited ?? false);
setSessionId(nextState.sessionId);
setChatSessions(sessions);
if (nextState.sessionId && nextState.isStreaming) {
resumeStreamingSession(nextState.sessionId);
} else {
setIsStreaming(false);
}
} catch (error) {
console.error("[GlobalChatbox] Failed to switch chat session:", error);
} finally {
setIsHydrating(false);
}
},
[isHydrating, isStreaming, resumeStreamingSession],
);
const removeSession = useCallback(
async (targetSessionId: string) => {
if (isHydrating || isStreaming) return;
setChatSessions((prev) =>
prev.filter((session) => session.id !== targetSessionId),
);
try {
const nextActiveSessionId = await deleteChatSession(
targetSessionId,
);
const sessions = await listChatSessions();
setChatSessions(sessions);
if (sessionIdRef.current !== targetSessionId) {
return;
}
if (!nextActiveSessionId) {
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({
title: undefined,
isTitleManuallyEdited: false,
messages: [],
sessionId: undefined,
});
setMessages([]);
setSessionTitle(undefined);
setIsSessionTitleManuallyEdited(false);
setSessionId(undefined);
return;
}
setIsHydrating(true);
const [nextState, sessionsAfterDelete] = await Promise.all([
loadChatSessionById(nextActiveSessionId),
listChatSessions(),
]);
hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1;
sessionIdRef.current = nextState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(nextState);
setMessages(nextState.messages);
setSessionTitle(nextState.title);
setIsSessionTitleManuallyEdited(nextState.isTitleManuallyEdited ?? false);
setSessionId(nextState.sessionId);
setChatSessions(sessionsAfterDelete);
} catch (error) {
console.error("[GlobalChatbox] Failed to delete chat session:", error);
try {
setChatSessions(await listChatSessions());
} catch (refreshError) {
console.error("[GlobalChatbox] Failed to refresh chat sessions:", refreshError);
}
} finally {
setIsHydrating(false);
}
},
[isHydrating, isStreaming],
);
const sendPrompt = useCallback(
async (rawPrompt: string) => {
await runPrompt({ prompt: rawPrompt });
},
[runPrompt],
);
const renameSession = useCallback(
async (targetSessionId: string, nextTitle: string) => {
const normalizedTitle = nextTitle.trim();
if (!normalizedTitle || isHydrating) return;
try {
await updateChatSessionTitle(targetSessionId, normalizedTitle, {
isTitleManuallyEdited: true,
});
const sessions = await listChatSessions();
setChatSessions(sessions);
if (sessionIdRef.current === targetSessionId) {
setSessionTitle(normalizedTitle);
setIsSessionTitleManuallyEdited(true);
lastPersistedStateKeyRef.current = createPersistedStateKey({
sessionId: targetSessionId,
title: normalizedTitle,
isTitleManuallyEdited: true,
messages,
});
}
} catch (error) {
console.error("[GlobalChatbox] Failed to rename chat session:", error);
}
},
[isHydrating, messages],
);
const createBranch = useCallback(
async (messageId: string) => {
if (isHydrating || isStreaming) return;
const assistantIndex = messages.findIndex(
(message) => message.id === messageId && message.role === "assistant",
);
if (assistantIndex < 0) return;
const currentSessionId = sessionIdRef.current;
const keepMessageCount = assistantIndex + 1;
const copiedMessages = cloneMessages(messages.slice(0, keepMessageCount));
const forkedSessionId = await forkAgentChat(currentSessionId, keepMessageCount);
sessionIdRef.current = forkedSessionId;
setSessionId(forkedSessionId);
messagesRef.current = copiedMessages;
setMessages(copiedMessages);
setIsSessionTitleManuallyEdited(false);
const forkTitle = sessionTitle ? `${sessionTitle} 副本` : "新对话副本";
setSessionTitle(forkTitle);
try {
await saveActiveChatState({
title: forkTitle,
isTitleManuallyEdited: false,
messages: copiedMessages,
sessionId: forkedSessionId,
});
setChatSessions(await listChatSessions());
} catch (error) {
console.error("[GlobalChatbox] Failed to refresh chat sessions after fork:", error);
}
},
[isHydrating, isStreaming, messages, sessionTitle],
);
return {
messages,
chatSessions,
activeSessionId: sessionIdRef.current,
isHydrating,
isStreaming,
sessionTitle,
sessionId,
sendPrompt,
createBranch,
abort,
replyPermission,
replyQuestion,
rejectQuestion,
createSession,
renameSession,
removeSession,
switchSession,
};
};