3 Commits

Author SHA1 Message Date
jiang 57369772c7 fix(chat): 更新工具进度名称
Build Push and Deploy / docker-image (push) Successful in 1m17s
Build Push and Deploy / deploy-fallback-log (push) Has been skipped
2026-06-04 18:02:38 +08:00
jiang 7764e25398 增加流式信息中断处理机制 2026-06-04 16:27:15 +08:00
jiang e60e1f6453 refactor: use backend chat sessions 2026-06-04 15:02:27 +08:00
11 changed files with 649 additions and 374 deletions
@@ -165,9 +165,6 @@ export const AgentHistoryPanel = ({
<Typography variant="subtitle2" fontWeight={800} color="text.primary"> <Typography variant="subtitle2" fontWeight={800} color="text.primary">
</Typography> </Typography>
<Typography variant="caption" color="text.secondary">
</Typography>
</Box> </Box>
<Tooltip title="新建对话"> <Tooltip title="新建对话">
<motion.div whileHover={{ scale: 1.08 }} whileTap={{ scale: 0.92 }} style={{ display: "flex" }}> <motion.div whileHover={{ scale: 1.08 }} whileTap={{ scale: 0.92 }} style={{ display: "flex" }}>
@@ -30,8 +30,8 @@ describe("AgentProgressTimeline", () => {
id: "tool", id: "tool",
phase: "tool", phase: "tool",
status: "running", status: "running",
title: "正在调用 dynamic_http_call", title: "正在调用 tjwater_cli",
detail: "GET /api/v1/network/bottlenecks", detail: "analysis bottlenecks",
startedAt: now - 1200, startedAt: now - 1200,
elapsedMs: 1200, elapsedMs: 1200,
elapsedSnapshotAt: now, elapsedSnapshotAt: now,
@@ -43,7 +43,7 @@ describe("AgentProgressTimeline", () => {
expect(screen.getByText(/Agent 过程:/)).toBeInTheDocument(); expect(screen.getByText(/Agent 过程:/)).toBeInTheDocument();
expect(screen.getByText(/耗时 5.0s/)).toBeInTheDocument(); expect(screen.getByText(/耗时 5.0s/)).toBeInTheDocument();
expect(screen.getByText("查询后端数据")).toBeInTheDocument(); expect(screen.getByText("查询后端数据")).toBeInTheDocument();
expect(screen.getByText("GET /api/v1/network/bottlenecks")).toBeInTheDocument(); expect(screen.getByText("analysis bottlenecks")).toBeInTheDocument();
expect(screen.getByText("1.2s")).toBeInTheDocument(); expect(screen.getByText("1.2s")).toBeInTheDocument();
}); });
@@ -86,7 +86,7 @@ describe("AgentProgressTimeline", () => {
id: "tool", id: "tool",
phase: "tool", phase: "tool",
status: "completed", status: "completed",
title: "正在调用 dynamic_http_call", title: "正在调用 tjwater_cli",
startedAt: Date.now() - 4000, startedAt: Date.now() - 4000,
endedAt: Date.now(), endedAt: Date.now(),
}, },
@@ -76,7 +76,7 @@ const phaseIcon = (phase: string, status: ChatProgress["status"]) => {
const formatToolTitle = (item: ChatProgress) => { const formatToolTitle = (item: ChatProgress) => {
const text = `${item.title} ${item.detail ?? ""}`; const text = `${item.title} ${item.detail ?? ""}`;
if (text.includes("dynamic_http_call")) return "查询后端数据"; if (text.includes("tjwater_cli")) return "查询后端数据";
if (text.includes("show_chart")) return "生成图表"; if (text.includes("show_chart")) return "生成图表";
if (text.includes("locate_features")) return "地图定位"; if (text.includes("locate_features")) return "地图定位";
if (text.includes("view_history")) return "打开历史曲线"; if (text.includes("view_history")) return "打开历史曲线";
+12 -12
View File
@@ -60,7 +60,7 @@ export const GlobalChatbox: React.FC<Props> = ({ open, onClose }) => {
const { const {
messages, messages,
chatSessions, chatSessions,
activeStorageSessionId, activeSessionId,
branchGroups, branchGroups,
branchTransition, branchTransition,
isHydrating, isHydrating,
@@ -129,33 +129,33 @@ export const GlobalChatbox: React.FC<Props> = ({ open, onClose }) => {
}, []); }, []);
const handleSelectSession = useCallback( const handleSelectSession = useCallback(
(storageSessionId: string) => { (sessionId: string) => {
composerRef.current?.clear(); composerRef.current?.clear();
void switchSession(storageSessionId); void switchSession(sessionId);
}, },
[switchSession], [switchSession],
); );
const handleDeleteSession = useCallback( const handleDeleteSession = useCallback(
(storageSessionId: string) => { (sessionId: string) => {
void removeSession(storageSessionId); void removeSession(sessionId);
}, },
[removeSession], [removeSession],
); );
const handleRenameSession = useCallback( const handleRenameSession = useCallback(
(storageSessionId: string, title: string) => { (sessionId: string, title: string) => {
void renameSession(storageSessionId, title); void renameSession(sessionId, title);
}, },
[renameSession], [renameSession],
); );
const handleRenameActiveSession = useCallback( const handleRenameActiveSession = useCallback(
(title: string) => { (title: string) => {
if (!activeStorageSessionId) return; if (!activeSessionId) return;
void renameSession(activeStorageSessionId, title); void renameSession(activeSessionId, title);
}, },
[activeStorageSessionId, renameSession], [activeSessionId, renameSession],
); );
const handleMouseDown = useCallback((event: React.MouseEvent) => { const handleMouseDown = useCallback((event: React.MouseEvent) => {
@@ -255,7 +255,7 @@ export const GlobalChatbox: React.FC<Props> = ({ open, onClose }) => {
<AgentHeader <AgentHeader
sessionTitle={sessionTitle} sessionTitle={sessionTitle}
canRenameSessionTitle={Boolean(activeStorageSessionId)} canRenameSessionTitle={Boolean(activeSessionId)}
isHydrating={isHydrating} isHydrating={isHydrating}
isStreaming={isStreaming} isStreaming={isStreaming}
isHistoryOpen={isHistoryOpen} isHistoryOpen={isHistoryOpen}
@@ -294,7 +294,7 @@ export const GlobalChatbox: React.FC<Props> = ({ open, onClose }) => {
> >
<AgentHistoryPanel <AgentHistoryPanel
sessions={chatSessions} sessions={chatSessions}
activeSessionId={activeStorageSessionId} activeSessionId={activeSessionId}
isHydrating={isHydrating} isHydrating={isHydrating}
onNewSession={() => { onNewSession={() => {
handleNewConversation(); handleNewConversation();
+5 -25
View File
@@ -66,41 +66,21 @@ export type Props = {
export type SpeechState = "idle" | "playing" | "paused"; export type SpeechState = "idle" | "playing" | "paused";
export type LegacyPersistedChatState = {
messages: Message[];
sessionId?: string;
branchGroups?: BranchGroup[];
};
export type ChatSessionRecord = {
id: string;
title: string;
isTitleManuallyEdited?: boolean;
createdAt: number;
updatedAt: number;
sessionId?: string;
messages: Message[];
branchGroups: BranchGroup[];
};
export type ChatSessionSummary = { export type ChatSessionSummary = {
id: string; id: string;
title: string; title: string;
createdAt: number; createdAt: number;
updatedAt: number; updatedAt: number;
}; isStreaming?: boolean;
runStatus?: string;
export type ChatStorageMeta = {
key: "chat-meta";
activeSessionId?: string;
migratedFromLocalStorage?: boolean;
}; };
export type LoadedChatState = { export type LoadedChatState = {
storageSessionId?: string; sessionId?: string;
title?: string; title?: string;
isTitleManuallyEdited?: boolean; isTitleManuallyEdited?: boolean;
messages: Message[]; messages: Message[];
sessionId?: string;
branchGroups: BranchGroup[]; branchGroups: BranchGroup[];
isStreaming?: boolean;
runStatus?: string;
}; };
+3 -31
View File
@@ -1,5 +1,5 @@
import { import {
loadActiveChatState, createEmptyChatState,
saveActiveChatState, saveActiveChatState,
} from "./chatStorage"; } from "./chatStorage";
@@ -11,17 +11,13 @@ jest.mock("@/lib/apiFetch", () => ({
describe("chatStorage backend-only persistence", () => { describe("chatStorage backend-only persistence", () => {
beforeEach(() => { beforeEach(() => {
window.localStorage.clear();
apiFetch.mockReset(); apiFetch.mockReset();
}); });
it("starts from an empty conversation instead of restoring a stored active id", async () => { it("creates an empty initial conversation state without backend calls", () => {
window.localStorage.setItem("tjwater_agent_active_session_id_v2", "chat-active-1"); const loaded = createEmptyChatState();
const loaded = await loadActiveChatState();
expect(loaded).toMatchObject({ expect(loaded).toMatchObject({
storageSessionId: undefined,
title: undefined, title: undefined,
messages: [], messages: [],
sessionId: undefined, sessionId: undefined,
@@ -30,24 +26,6 @@ describe("chatStorage backend-only persistence", () => {
expect(apiFetch).not.toHaveBeenCalled(); expect(apiFetch).not.toHaveBeenCalled();
}); });
it("starts from an empty conversation when a project has a stored active id", async () => {
window.localStorage.setItem(
"tjwater_agent_active_session_id_v2:project-a",
"chat-project-a",
);
window.localStorage.setItem(
"tjwater_agent_active_session_id_v2:project-b",
"chat-project-b",
);
const loaded = await loadActiveChatState("project-b");
expect(loaded.storageSessionId).toBeUndefined();
expect(loaded.title).toBeUndefined();
expect(loaded.messages).toEqual([]);
expect(apiFetch).not.toHaveBeenCalled();
});
it("creates a backend conversation when saving the first non-empty state", async () => { it("creates a backend conversation when saving the first non-empty state", async () => {
apiFetch.mockImplementation(async (url: string, init?: RequestInit) => { apiFetch.mockImplementation(async (url: string, init?: RequestInit) => {
if (url.endsWith("/api/v1/agent/chat/session")) { if (url.endsWith("/api/v1/agent/chat/session")) {
@@ -75,7 +53,6 @@ describe("chatStorage backend-only persistence", () => {
const savedSessionId = await saveActiveChatState( const savedSessionId = await saveActiveChatState(
{ {
storageSessionId: undefined,
title: "新对话", title: "新对话",
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [ messages: [
@@ -89,13 +66,8 @@ describe("chatStorage backend-only persistence", () => {
sessionId: undefined, sessionId: undefined,
branchGroups: [], branchGroups: [],
}, },
"project-a",
); );
expect(savedSessionId).toBe("chat-new-1"); expect(savedSessionId).toBe("chat-new-1");
expect(
window.localStorage.getItem("tjwater_agent_active_session_id_v2:project-a"),
).toBeNull();
}); });
}); });
+32 -36
View File
@@ -9,15 +9,16 @@ import type {
} from "./GlobalChatbox.types"; } from "./GlobalChatbox.types";
import { cloneBranchGroups, cloneMessages } from "./GlobalChatbox.utils"; import { cloneBranchGroups, cloneMessages } from "./GlobalChatbox.utils";
type RemoteSessionPayload = { type BackendSessionPayload = {
id?: string; id?: string;
title?: string; title?: string;
created_at?: string | number; created_at?: string | number;
updated_at?: string | number; updated_at?: string | number;
is_streaming?: boolean;
run_status?: string;
}; };
const emptyLoadedChatState = (): LoadedChatState => ({ export const createEmptyChatState = (): LoadedChatState => ({
storageSessionId: undefined,
title: undefined, title: undefined,
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
@@ -58,7 +59,7 @@ const toMillis = (value: string | number | undefined) =>
const normalizeTitle = (value?: string) => value?.trim() || "新对话"; const normalizeTitle = (value?: string) => value?.trim() || "新对话";
const fetchRemoteChatSessions = async (): Promise<ChatSessionSummary[]> => { const fetchBackendChatSessions = async (): Promise<ChatSessionSummary[]> => {
const response = await apiFetch(`${config.AGENT_URL}/api/v1/agent/chat/sessions`, { const response = await apiFetch(`${config.AGENT_URL}/api/v1/agent/chat/sessions`, {
method: "GET", method: "GET",
projectHeaderMode: "include", projectHeaderMode: "include",
@@ -69,7 +70,7 @@ const fetchRemoteChatSessions = async (): Promise<ChatSessionSummary[]> => {
throw new Error(await response.text()); throw new Error(await response.text());
} }
const payload = (await response.json()) as { const payload = (await response.json()) as {
sessions?: RemoteSessionPayload[]; sessions?: BackendSessionPayload[];
}; };
return (payload.sessions ?? []) return (payload.sessions ?? [])
.map((session) => ({ .map((session) => ({
@@ -77,12 +78,14 @@ const fetchRemoteChatSessions = async (): Promise<ChatSessionSummary[]> => {
title: normalizeTitle(session.title), title: normalizeTitle(session.title),
createdAt: toMillis(session.created_at), createdAt: toMillis(session.created_at),
updatedAt: toMillis(session.updated_at), updatedAt: toMillis(session.updated_at),
isStreaming: session.is_streaming,
runStatus: session.run_status,
})) }))
.filter((session) => Boolean(session.id)) .filter((session) => Boolean(session.id))
.sort(compareSessionsByAnchorTime); .sort(compareSessionsByAnchorTime);
}; };
const fetchRemoteChatSession = async (sessionId: string): Promise<LoadedChatState> => { const fetchBackendChatSession = async (sessionId: string): Promise<LoadedChatState> => {
const response = await apiFetch( const response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}`, `${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}`,
{ {
@@ -94,7 +97,7 @@ const fetchRemoteChatSession = async (sessionId: string): Promise<LoadedChatStat
); );
if (!response.ok) { if (!response.ok) {
if (response.status === 404) { if (response.status === 404) {
return emptyLoadedChatState(); return createEmptyChatState();
} }
throw new Error(await response.text()); throw new Error(await response.text());
} }
@@ -105,18 +108,21 @@ const fetchRemoteChatSession = async (sessionId: string): Promise<LoadedChatStat
session_id?: string; session_id?: string;
messages?: Message[]; messages?: Message[];
branch_groups?: BranchGroup[]; branch_groups?: BranchGroup[];
is_streaming?: boolean;
run_status?: string;
}; };
return { return {
storageSessionId: payload.id,
title: normalizeTitle(payload.title), title: normalizeTitle(payload.title),
isTitleManuallyEdited: payload.is_title_manually_edited ?? false, isTitleManuallyEdited: payload.is_title_manually_edited ?? false,
messages: sanitizeMessages(payload.messages), messages: sanitizeMessages(payload.messages),
sessionId: payload.session_id, sessionId: payload.session_id ?? payload.id,
branchGroups: sanitizeBranchGroups(payload.branch_groups), branchGroups: sanitizeBranchGroups(payload.branch_groups),
isStreaming: payload.is_streaming ?? false,
runStatus: payload.run_status,
}; };
}; };
const createRemoteChatSession = async (payload?: { const createBackendChatSession = async (payload?: {
sessionId?: string; sessionId?: string;
parentSessionId?: string; parentSessionId?: string;
}) => { }) => {
@@ -146,7 +152,7 @@ const createRemoteChatSession = async (payload?: {
return sessionId; return sessionId;
}; };
const saveRemoteChatState = async ( const saveBackendChatState = async (
sessionId: string, sessionId: string,
state: LoadedChatState, state: LoadedChatState,
): Promise<string> => { ): Promise<string> => {
@@ -175,7 +181,7 @@ const saveRemoteChatState = async (
return payload.id ?? payload.session_id ?? sessionId; return payload.id ?? payload.session_id ?? sessionId;
}; };
const updateRemoteChatSessionTitle = async ( const updateBackendChatSessionTitle = async (
sessionId: string, sessionId: string,
title: string, title: string,
isTitleManuallyEdited?: boolean, isTitleManuallyEdited?: boolean,
@@ -201,7 +207,7 @@ const updateRemoteChatSessionTitle = async (
} }
}; };
const deleteRemoteChatSession = async (sessionId: string) => { const deleteBackendChatSession = async (sessionId: string) => {
const response = await apiFetch( const response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}`, `${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}`,
{ {
@@ -216,42 +222,34 @@ const deleteRemoteChatSession = async (sessionId: string) => {
} }
}; };
export const loadActiveChatState = async (
_projectId?: string | null,
): Promise<LoadedChatState> => {
return emptyLoadedChatState();
};
export const saveActiveChatState = async ( export const saveActiveChatState = async (
state: LoadedChatState, state: LoadedChatState,
_projectId?: string | null,
): Promise<string | undefined> => { ): Promise<string | undefined> => {
if (typeof window === "undefined") return state.storageSessionId; if (typeof window === "undefined") return state.sessionId;
if (!hasChatContent(state)) { if (!hasChatContent(state)) {
return undefined; return undefined;
} }
let remoteSessionId = state.sessionId ?? state.storageSessionId; let backendSessionId = state.sessionId;
if (!remoteSessionId) { if (!backendSessionId) {
remoteSessionId = await createRemoteChatSession(); backendSessionId = await createBackendChatSession();
} }
const savedSessionId = await saveRemoteChatState(remoteSessionId, { const savedSessionId = await saveBackendChatState(backendSessionId, {
...state, ...state,
storageSessionId: remoteSessionId, sessionId: backendSessionId,
sessionId: remoteSessionId,
}); });
return savedSessionId; return savedSessionId;
}; };
export const listChatSessions = async (): Promise<ChatSessionSummary[]> => { export const listChatSessions = async (): Promise<ChatSessionSummary[]> => {
if (typeof window === "undefined") return []; if (typeof window === "undefined") return [];
return await fetchRemoteChatSessions(); return await fetchBackendChatSessions();
}; };
export const updateChatSessionTitle = async ( export const updateChatSessionTitle = async (
storageSessionId: string, sessionId: string,
title: string, title: string,
options?: { options?: {
isTitleManuallyEdited?: boolean; isTitleManuallyEdited?: boolean;
@@ -261,8 +259,8 @@ export const updateChatSessionTitle = async (
const normalizedTitle = title.trim(); const normalizedTitle = title.trim();
if (!normalizedTitle) return; if (!normalizedTitle) return;
await updateRemoteChatSessionTitle( await updateBackendChatSessionTitle(
storageSessionId, sessionId,
normalizedTitle, normalizedTitle,
options?.isTitleManuallyEdited, options?.isTitleManuallyEdited,
); );
@@ -270,20 +268,18 @@ export const updateChatSessionTitle = async (
export const loadChatSessionById = async ( export const loadChatSessionById = async (
sessionId: string, sessionId: string,
_projectId?: string | null,
): Promise<LoadedChatState> => { ): Promise<LoadedChatState> => {
if (typeof window === "undefined") return emptyLoadedChatState(); if (typeof window === "undefined") return createEmptyChatState();
return await fetchRemoteChatSession(sessionId); return await fetchBackendChatSession(sessionId);
}; };
export const deleteChatSession = async ( export const deleteChatSession = async (
sessionId: string, sessionId: string,
_projectId?: string | null,
): Promise<string | undefined> => { ): Promise<string | undefined> => {
if (typeof window === "undefined") return undefined; if (typeof window === "undefined") return undefined;
await deleteRemoteChatSession(sessionId); await deleteBackendChatSession(sessionId);
const nextActiveSession = (await listChatSessions())[0]; const nextActiveSession = (await listChatSessions())[0];
return nextActiveSession?.id; return nextActiveSession?.id;
}; };
@@ -3,53 +3,53 @@
import { act, renderHook, waitFor } from "@testing-library/react"; import { act, renderHook, waitFor } from "@testing-library/react";
import { useAgentChatSession } from "./useAgentChatSession"; import { useAgentChatSession } from "./useAgentChatSession";
import { streamAgentChat } from "@/lib/chatStream"; import { abortAgentChat, resumeAgentChatStream, streamAgentChat } from "@/lib/chatStream";
import type { StreamEvent } from "@/lib/chatStream"; import type { StreamEvent } from "@/lib/chatStream";
jest.mock("@/lib/chatStream", () => ({ jest.mock("@/lib/chatStream", () => ({
abortAgentChat: jest.fn(async () => undefined), abortAgentChat: jest.fn(async () => undefined),
forkAgentChat: jest.fn(async () => "forked-session"), forkAgentChat: jest.fn(async () => "forked-session"),
resumeAgentChatStream: jest.fn(async () => undefined),
streamAgentChat: jest.fn(async () => undefined), streamAgentChat: jest.fn(async () => undefined),
})); }));
const loadActiveChatState = jest.fn();
const listChatSessions = jest.fn(); const listChatSessions = jest.fn();
const saveActiveChatState = jest.fn(); const saveActiveChatState = jest.fn();
const updateChatSessionTitle = jest.fn(); const updateChatSessionTitle = jest.fn();
jest.mock("../chatStorage", () => ({ jest.mock("../chatStorage", () => ({
deleteChatSession: jest.fn(async () => undefined), createEmptyChatState: jest.fn(() => ({
listChatSessions: (...args: unknown[]) => listChatSessions(...args), title: undefined,
loadActiveChatState: (...args: unknown[]) => loadActiveChatState(...args),
loadChatSessionById: jest.fn(async () => ({
storageSessionId: "session-loaded",
title: "已存在会话",
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
sessionId: undefined, sessionId: undefined,
branchGroups: [], branchGroups: [],
})), })),
deleteChatSession: jest.fn(async () => undefined),
listChatSessions: (...args: unknown[]) => listChatSessions(...args),
loadChatSessionById: jest.fn(async () => ({
title: "已存在会话",
isTitleManuallyEdited: false,
messages: [],
sessionId: "session-loaded",
branchGroups: [],
})),
saveActiveChatState: (...args: unknown[]) => saveActiveChatState(...args), saveActiveChatState: (...args: unknown[]) => saveActiveChatState(...args),
updateChatSessionTitle: (...args: unknown[]) => updateChatSessionTitle(...args), updateChatSessionTitle: (...args: unknown[]) => updateChatSessionTitle(...args),
})); }));
describe("useAgentChatSession", () => { describe("useAgentChatSession", () => {
beforeEach(() => { beforeEach(() => {
loadActiveChatState.mockReset();
listChatSessions.mockReset(); listChatSessions.mockReset();
saveActiveChatState.mockReset(); saveActiveChatState.mockReset();
updateChatSessionTitle.mockReset(); updateChatSessionTitle.mockReset();
jest.mocked(abortAgentChat).mockReset();
jest.mocked(resumeAgentChatStream).mockReset();
jest.mocked(streamAgentChat).mockReset(); jest.mocked(streamAgentChat).mockReset();
saveActiveChatState.mockImplementation(async (state) => state.storageSessionId); jest.mocked(abortAgentChat).mockImplementation(async () => undefined);
jest.mocked(resumeAgentChatStream).mockImplementation(async () => undefined);
loadActiveChatState.mockResolvedValue({ jest.mocked(streamAgentChat).mockImplementation(async () => undefined);
storageSessionId: undefined, saveActiveChatState.mockImplementation(async (state) => state.sessionId);
title: undefined,
isTitleManuallyEdited: false,
messages: [],
sessionId: undefined,
branchGroups: [],
});
}); });
it("does not add a new empty session to history until there is actual chat content", async () => { it("does not add a new empty session to history until there is actual chat content", async () => {
@@ -70,7 +70,7 @@ describe("useAgentChatSession", () => {
await waitFor(() => expect(result.current.sessionTitle).toBe("新对话")); await waitFor(() => expect(result.current.sessionTitle).toBe("新对话"));
expect(result.current.chatSessions).toEqual([]); expect(result.current.chatSessions).toEqual([]);
expect(result.current.activeStorageSessionId).toBeUndefined(); expect(result.current.activeSessionId).toBeUndefined();
expect(result.current.messages).toEqual([]); expect(result.current.messages).toEqual([]);
expect(result.current.isStreaming).toBe(false); expect(result.current.isStreaming).toBe(false);
expect(listChatSessions).toHaveBeenCalledTimes(1); expect(listChatSessions).toHaveBeenCalledTimes(1);
@@ -109,7 +109,7 @@ describe("useAgentChatSession", () => {
]); ]);
}); });
it("waits for the stream session id before persisting a new streaming conversation", async () => { it("persists a new conversation only after the stream is done", async () => {
listChatSessions.mockResolvedValue([]); listChatSessions.mockResolvedValue([]);
let emitStreamEvent: ((event: StreamEvent) => void) | undefined; let emitStreamEvent: ((event: StreamEvent) => void) | undefined;
jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => {
@@ -153,25 +153,142 @@ describe("useAgentChatSession", () => {
jest.advanceTimersByTime(200); jest.advanceTimersByTime(200);
}); });
expect(saveActiveChatState).toHaveBeenCalledTimes(1); expect(saveActiveChatState).not.toHaveBeenCalled();
act(() => {
emitStreamEvent?.({
type: "done",
sessionId: "chat-stream-1",
});
});
await act(async () => {
jest.advanceTimersByTime(200);
});
await waitFor(() => expect(saveActiveChatState).toHaveBeenCalledTimes(1));
expect(saveActiveChatState.mock.calls[0][0]).toMatchObject({ expect(saveActiveChatState.mock.calls[0][0]).toMatchObject({
sessionId: "chat-stream-1", sessionId: "chat-stream-1",
messages: [
expect.objectContaining({ role: "user", content: "第一条消息" }),
expect.objectContaining({ role: "assistant", content: "收到" }),
],
}); });
} finally { } finally {
jest.useRealTimers(); jest.useRealTimers();
} }
}); });
it("hydrates a backend streaming session and resumes its stream", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
runStatus: "running",
},
]);
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
expect(result.current.isStreaming).toBe(true);
expect(result.current.activeSessionId).toBe("session-loaded");
expect(resumeAgentChatStream).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "session-loaded",
}),
);
});
it("updates resumed messages from state, token, and done events", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{ id: "a1", role: "assistant", content: "已有" },
],
isStreaming: true,
runStatus: "running",
});
onEvent({
type: "token",
sessionId: "session-loaded",
content: "输出",
});
onEvent({
type: "done",
sessionId: "session-loaded",
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
await waitFor(() => expect(result.current.isStreaming).toBe(false));
expect(result.current.messages).toEqual([
expect.objectContaining({ id: "u1", role: "user", content: "继续分析" }),
expect.objectContaining({ id: "a1", role: "assistant", content: "已有输出" }),
]);
});
it("aborts a resumed streaming session through the backend abort endpoint", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async () => {
await new Promise<void>(() => undefined);
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isStreaming).toBe(true));
act(() => {
result.current.abort();
});
expect(abortAgentChat).toHaveBeenCalledWith("session-loaded");
});
it("ignores generated session titles after the title was edited manually", async () => { it("ignores generated session titles after the title was edited manually", async () => {
listChatSessions.mockResolvedValue([]); listChatSessions.mockResolvedValue([]);
loadActiveChatState.mockResolvedValue({
storageSessionId: "session-1",
title: "手动标题",
isTitleManuallyEdited: true,
messages: [],
sessionId: "session-1",
branchGroups: [],
});
jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => {
onEvent({ onEvent({
type: "session_title", type: "session_title",
@@ -193,13 +310,23 @@ describe("useAgentChatSession", () => {
await waitFor(() => expect(result.current.isHydrating).toBe(false)); await waitFor(() => expect(result.current.isHydrating).toBe(false));
await act(async () => {
await result.current.switchSession("session-loaded");
});
await act(async () => {
await result.current.renameSession("session-loaded", "手动标题");
});
await waitFor(() => expect(updateChatSessionTitle).toHaveBeenCalled());
await act(async () => { await act(async () => {
await result.current.sendPrompt("帮我分析一下"); await result.current.sendPrompt("帮我分析一下");
}); });
expect(result.current.sessionTitle).toBe("手动标题"); expect(result.current.sessionTitle).toBe("手动标题");
expect(updateChatSessionTitle).not.toHaveBeenCalledWith( expect(updateChatSessionTitle).not.toHaveBeenCalledWith(
"session-1", "session-loaded",
"自动标题", "自动标题",
expect.anything(), expect.anything(),
); );
+202 -135
View File
@@ -2,7 +2,12 @@
import { useCallback, useEffect, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { abortAgentChat, forkAgentChat, streamAgentChat } from "@/lib/chatStream"; import {
abortAgentChat,
forkAgentChat,
resumeAgentChatStream,
streamAgentChat,
} from "@/lib/chatStream";
import type { AgentModel, StreamEvent } from "@/lib/chatStream"; import type { AgentModel, StreamEvent } from "@/lib/chatStream";
import type { import type {
AgentArtifact, AgentArtifact,
@@ -19,9 +24,9 @@ import {
createId, createId,
} from "../GlobalChatbox.utils"; } from "../GlobalChatbox.utils";
import { import {
createEmptyChatState,
deleteChatSession, deleteChatSession,
listChatSessions, listChatSessions,
loadActiveChatState,
loadChatSessionById, loadChatSessionById,
saveActiveChatState, saveActiveChatState,
updateChatSessionTitle, updateChatSessionTitle,
@@ -50,7 +55,6 @@ type PromptRunOptions = {
const createPersistedStateKey = (state: LoadedChatState) => const createPersistedStateKey = (state: LoadedChatState) =>
JSON.stringify({ JSON.stringify({
storageSessionId: state.storageSessionId ?? null,
title: state.title ?? null, title: state.title ?? null,
isTitleManuallyEdited: state.isTitleManuallyEdited ?? false, isTitleManuallyEdited: state.isTitleManuallyEdited ?? false,
sessionId: state.sessionId ?? null, sessionId: state.sessionId ?? null,
@@ -151,7 +155,6 @@ export const useAgentChatSession = ({
onBeforeSend, onBeforeSend,
getModel, getModel,
}: UseAgentChatSessionOptions) => { }: UseAgentChatSessionOptions) => {
const storageSessionIdRef = useRef<string | undefined>(undefined);
const hydrationCompletedRef = useRef(false); const hydrationCompletedRef = useRef(false);
const hydrationNonceRef = useRef(0); const hydrationNonceRef = useRef(0);
@@ -166,16 +169,17 @@ export const useAgentChatSession = ({
const [isHydrating, setIsHydrating] = useState(true); const [isHydrating, setIsHydrating] = useState(true);
const abortRef = useRef<AbortController | null>(null); const abortRef = useRef<AbortController | null>(null);
const sessionIdRef = useRef<string | undefined>(undefined); const sessionIdRef = useRef<string | undefined>(undefined);
const messagesRef = useRef<Message[]>([]);
const resumeStreamingSessionRef = useRef<((sessionId: string) => void) | null>(null);
const isSessionTitleManuallyEditedRef = useRef(false); const isSessionTitleManuallyEditedRef = useRef(false);
const cancelPromiseRef = useRef<Promise<void> | null>(null); const cancelPromiseRef = useRef<Promise<void> | null>(null);
const titleUpdateNonceRef = useRef(0); const titleUpdateNonceRef = useRef(0);
const lastPersistedStateKeyRef = useRef( const lastPersistedStateKeyRef = useRef(
createPersistedStateKey({ createPersistedStateKey({
storageSessionId: undefined, sessionId: undefined,
title: undefined, title: undefined,
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
sessionId: undefined,
branchGroups: [], branchGroups: [],
}), }),
); );
@@ -184,6 +188,10 @@ export const useAgentChatSession = ({
sessionIdRef.current = sessionId; sessionIdRef.current = sessionId;
}, [sessionId]); }, [sessionId]);
useEffect(() => {
messagesRef.current = messages;
}, [messages]);
useEffect(() => { useEffect(() => {
isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited; isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited;
}, [isSessionTitleManuallyEdited]); }, [isSessionTitleManuallyEdited]);
@@ -196,10 +204,8 @@ export const useAgentChatSession = ({
hydrationCompletedRef.current = false; hydrationCompletedRef.current = false;
if (!projectId) { if (!projectId) {
storageSessionIdRef.current = undefined;
sessionIdRef.current = undefined; sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({ lastPersistedStateKeyRef.current = createPersistedStateKey({
storageSessionId: undefined,
title: undefined, title: undefined,
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
@@ -221,13 +227,13 @@ export const useAgentChatSession = ({
} }
try { try {
const [loadedState, sessions] = await Promise.all([ const sessions = await listChatSessions();
loadActiveChatState(projectId), const streamingSession = sessions.find((session) => session.isStreaming);
listChatSessions(), const loadedState = streamingSession
]); ? await loadChatSessionById(streamingSession.id)
: createEmptyChatState();
if (cancelled) return; if (cancelled) return;
storageSessionIdRef.current = loadedState.storageSessionId;
sessionIdRef.current = loadedState.sessionId; sessionIdRef.current = loadedState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(loadedState); lastPersistedStateKeyRef.current = createPersistedStateKey(loadedState);
hydrationCompletedRef.current = true; hydrationCompletedRef.current = true;
@@ -240,6 +246,12 @@ export const useAgentChatSession = ({
setSessionId(loadedState.sessionId); setSessionId(loadedState.sessionId);
setBranchGroups(loadedState.branchGroups); setBranchGroups(loadedState.branchGroups);
setChatSessions(sessions); setChatSessions(sessions);
if (
loadedState.sessionId &&
(loadedState.isStreaming || streamingSession?.isStreaming)
) {
resumeStreamingSessionRef.current?.(loadedState.sessionId);
}
} catch (error) { } catch (error) {
console.error("[GlobalChatbox] Failed to hydrate chat state:", error); console.error("[GlobalChatbox] Failed to hydrate chat state:", error);
} finally { } finally {
@@ -261,35 +273,30 @@ export const useAgentChatSession = ({
const currentHydrationNonce = hydrationNonceRef.current; const currentHydrationNonce = hydrationNonceRef.current;
const persistTimer = window.setTimeout(() => { const persistTimer = window.setTimeout(() => {
if (isStreaming) {
return;
}
const state: LoadedChatState = { const state: LoadedChatState = {
storageSessionId: storageSessionIdRef.current,
title: sessionTitle, title: sessionTitle,
isTitleManuallyEdited: isSessionTitleManuallyEdited, isTitleManuallyEdited: isSessionTitleManuallyEdited,
messages, messages,
sessionId, sessionId,
branchGroups, branchGroups,
}; };
if (
isStreaming &&
!state.storageSessionId &&
!state.sessionId &&
state.messages.length > 0
) {
return;
}
const currentStateKey = createPersistedStateKey(state); const currentStateKey = createPersistedStateKey(state);
if (currentStateKey === lastPersistedStateKeyRef.current) { if (currentStateKey === lastPersistedStateKeyRef.current) {
return; return;
} }
void saveActiveChatState(state, projectId) void saveActiveChatState(state)
.then((storageSessionId) => { .then((sessionId) => {
if (hydrationNonceRef.current !== currentHydrationNonce) return; if (hydrationNonceRef.current !== currentHydrationNonce) return;
storageSessionIdRef.current = storageSessionId; sessionIdRef.current = sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey({ lastPersistedStateKeyRef.current = createPersistedStateKey({
...state, ...state,
storageSessionId, sessionId,
}); });
return listChatSessions(); return listChatSessions();
}) })
@@ -359,6 +366,150 @@ export const useAgentChatSession = ({
); );
}, []); }, []);
const getLastAssistantMessageId = useCallback((fallback?: string) => {
const assistant = [...messagesRef.current]
.reverse()
.find((message) => message.role === "assistant");
return assistant?.id ?? fallback;
}, []);
const applyStreamEvent = useCallback(
(
event: StreamEvent,
options?: {
assistantMessageId?: string;
},
) => {
if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) {
sessionIdRef.current = event.sessionId;
setSessionId(event.sessionId);
}
if (event.type === "state") {
const nextMessages = cloneMessages(event.messages as Message[]);
messagesRef.current = nextMessages;
setMessages(nextMessages);
setIsStreaming(event.isStreaming);
return;
}
const assistantMessageId = getLastAssistantMessageId(options?.assistantMessageId);
if (!assistantMessageId) {
return;
}
if (event.type === "token") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
content: message.content + event.content,
isError: false,
}
: message,
),
);
} else if (event.type === "progress") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? { ...message, progress: upsertProgress(message.progress, event) }
: message,
),
);
} else if (event.type === "tool_call") {
onToolCall(event, {
assistantMessageId,
appendArtifact,
});
} else if (event.type === "session_title") {
const nextTitle = event.title.trim();
if (nextTitle && !isSessionTitleManuallyEditedRef.current) {
setSessionTitle(nextTitle);
const currentSessionId = sessionIdRef.current;
if (currentSessionId) {
const currentNonce = ++titleUpdateNonceRef.current;
void updateChatSessionTitle(currentSessionId, nextTitle, {
isTitleManuallyEdited: false,
})
.then(() => listChatSessions())
.then((sessions) => {
if (titleUpdateNonceRef.current !== currentNonce) return;
setChatSessions(sessions);
})
.catch((error) => {
console.error("[GlobalChatbox] Failed to persist session title:", error);
});
}
}
} else if (event.type === "done") {
setMessages((prev) =>
prev.map((message) => {
if (message.id !== assistantMessageId) return message;
const completedProgress = completeRunningProgress(message.progress);
if (
message.content.trim().length === 0 &&
!(message.artifacts?.length)
) {
return {
...message,
content:
"Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。",
progress: completedProgress,
};
}
return { ...message, progress: completedProgress };
}),
);
setIsStreaming(false);
} else if (event.type === "error") {
setMessages((prev) =>
prev.map((message) =>
message.id === assistantMessageId
? {
...message,
content: message.content || `⚠️ **错误:** ${event.message}`,
isError: true,
progress: completeRunningProgress(message.progress),
}
: message,
),
);
setIsStreaming(false);
}
},
[appendArtifact, getLastAssistantMessageId, onToolCall],
);
const resumeStreamingSession = useCallback(
(nextSessionId: string) => {
const controller = new AbortController();
abortRef.current?.abort();
abortRef.current = controller;
setIsStreaming(true);
void resumeAgentChatStream({
sessionId: nextSessionId,
signal: controller.signal,
onEvent: (event) => applyStreamEvent(event),
})
.catch((error) => {
if (!controller.signal.aborted) {
console.error("[GlobalChatbox] Failed to resume chat stream:", error);
setIsStreaming(false);
}
})
.finally(() => {
if (abortRef.current === controller) {
abortRef.current = null;
}
});
},
[applyStreamEvent],
);
resumeStreamingSessionRef.current = resumeStreamingSession;
const runPrompt = useCallback( const runPrompt = useCallback(
async ({ async ({
prompt: rawPrompt, prompt: rawPrompt,
@@ -380,8 +531,10 @@ export const useAgentChatSession = ({
preparedMessages ?? preparedMessages ??
[...messages, nextUserMessage, nextAssistantMessage]; [...messages, nextUserMessage, nextAssistantMessage];
const clonedNextMessages = cloneMessages(nextMessages);
setIsStreaming(true); setIsStreaming(true);
setMessages(cloneMessages(nextMessages)); messagesRef.current = clonedNextMessages;
setMessages(clonedNextMessages);
if (sessionIdOverride !== undefined) { if (sessionIdOverride !== undefined) {
sessionIdRef.current = sessionIdOverride; sessionIdRef.current = sessionIdOverride;
setSessionId(sessionIdOverride); setSessionId(sessionIdOverride);
@@ -396,93 +549,10 @@ export const useAgentChatSession = ({
sessionId: sessionIdOverride ?? sessionIdRef.current, sessionId: sessionIdOverride ?? sessionIdRef.current,
model: getModel?.(), model: getModel?.(),
signal: controller.signal, signal: controller.signal,
onEvent: (event) => { onEvent: (event) =>
if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) { applyStreamEvent(event, {
sessionIdRef.current = event.sessionId;
setSessionId(event.sessionId);
}
if (event.type === "token") {
setMessages((prev) =>
prev.map((message) =>
message.id === nextAssistantMessage.id
? {
...message,
content: message.content + event.content,
isError: false,
}
: message,
),
);
} else if (event.type === "progress") {
setMessages((prev) =>
prev.map((message) =>
message.id === nextAssistantMessage.id
? { ...message, progress: upsertProgress(message.progress, event) }
: message,
),
);
} else if (event.type === "tool_call") {
onToolCall(event, {
assistantMessageId: nextAssistantMessage.id, assistantMessageId: nextAssistantMessage.id,
appendArtifact,
});
} else if (event.type === "session_title") {
const nextTitle = event.title.trim();
if (nextTitle && !isSessionTitleManuallyEditedRef.current) {
setSessionTitle(nextTitle);
const currentStorageSessionId = storageSessionIdRef.current;
if (currentStorageSessionId) {
const currentNonce = ++titleUpdateNonceRef.current;
void updateChatSessionTitle(currentStorageSessionId, nextTitle, {
isTitleManuallyEdited: false,
})
.then(() => listChatSessions())
.then((sessions) => {
if (titleUpdateNonceRef.current !== currentNonce) return;
setChatSessions(sessions);
})
.catch((error) => {
console.error("[GlobalChatbox] Failed to persist session title:", error);
});
}
}
} else if (event.type === "done") {
setMessages((prev) =>
prev.map((message) => {
if (message.id !== nextAssistantMessage.id) return message;
const completedProgress = completeRunningProgress(message.progress);
if (
message.content.trim().length === 0 &&
!(message.artifacts?.length)
) {
return {
...message,
content:
"Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。",
progress: completedProgress,
};
}
return { ...message, progress: completedProgress };
}), }),
);
setIsStreaming(false);
} else if (event.type === "error") {
setMessages((prev) =>
prev.map((message) =>
message.id === nextAssistantMessage.id
? {
...message,
content: message.content || `⚠️ **错误:** ${event.message}`,
isError: true,
progress: completeRunningProgress(message.progress),
}
: message,
),
);
setIsStreaming(false);
}
},
}); });
} catch (error) { } catch (error) {
if (controller.signal.aborted) { if (controller.signal.aborted) {
@@ -528,7 +598,7 @@ export const useAgentChatSession = ({
setIsStreaming(false); setIsStreaming(false);
} }
}, },
[appendArtifact, getModel, isHydrating, isStreaming, messages, onBeforeSend, onToolCall], [applyStreamEvent, getModel, isHydrating, isStreaming, messages, onBeforeSend],
); );
const abort = useCallback(() => { const abort = useCallback(() => {
@@ -555,10 +625,8 @@ export const useAgentChatSession = ({
setBranchTransition(null); setBranchTransition(null);
hydrationNonceRef.current += 1; hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1; titleUpdateNonceRef.current += 1;
storageSessionIdRef.current = undefined;
sessionIdRef.current = undefined; sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({ lastPersistedStateKeyRef.current = createPersistedStateKey({
storageSessionId: undefined,
title: "新对话", title: "新对话",
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
@@ -574,21 +642,20 @@ export const useAgentChatSession = ({
}, [isHydrating, isStreaming]); }, [isHydrating, isStreaming]);
const switchSession = useCallback( const switchSession = useCallback(
async (nextStorageSessionId: string) => { async (nextSessionId: string) => {
if (isHydrating || isStreaming || storageSessionIdRef.current === nextStorageSessionId) { if (isHydrating || isStreaming || sessionIdRef.current === nextSessionId) {
return; return;
} }
setIsHydrating(true); setIsHydrating(true);
try { try {
const [nextState, sessions] = await Promise.all([ const [nextState, sessions] = await Promise.all([
loadChatSessionById(nextStorageSessionId, projectId), loadChatSessionById(nextSessionId),
listChatSessions(), listChatSessions(),
]); ]);
hydrationNonceRef.current += 1; hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1; titleUpdateNonceRef.current += 1;
storageSessionIdRef.current = nextState.storageSessionId;
sessionIdRef.current = nextState.sessionId; sessionIdRef.current = nextState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(nextState); lastPersistedStateKeyRef.current = createPersistedStateKey(nextState);
setBranchTransition(null); setBranchTransition(null);
@@ -598,38 +665,40 @@ export const useAgentChatSession = ({
setSessionId(nextState.sessionId); setSessionId(nextState.sessionId);
setBranchGroups(nextState.branchGroups); setBranchGroups(nextState.branchGroups);
setChatSessions(sessions); setChatSessions(sessions);
if (nextState.sessionId && nextState.isStreaming) {
resumeStreamingSession(nextState.sessionId);
} else {
setIsStreaming(false);
}
} catch (error) { } catch (error) {
console.error("[GlobalChatbox] Failed to switch chat session:", error); console.error("[GlobalChatbox] Failed to switch chat session:", error);
} finally { } finally {
setIsHydrating(false); setIsHydrating(false);
} }
}, },
[isHydrating, isStreaming, projectId], [isHydrating, isStreaming, resumeStreamingSession],
); );
const removeSession = useCallback( const removeSession = useCallback(
async (targetStorageSessionId: string) => { async (targetSessionId: string) => {
if (isHydrating || isStreaming) return; if (isHydrating || isStreaming) return;
try { try {
const nextActiveSessionId = await deleteChatSession( const nextActiveSessionId = await deleteChatSession(
targetStorageSessionId, targetSessionId,
projectId,
); );
const sessions = await listChatSessions(); const sessions = await listChatSessions();
setChatSessions(sessions); setChatSessions(sessions);
if (storageSessionIdRef.current !== targetStorageSessionId) { if (sessionIdRef.current !== targetSessionId) {
return; return;
} }
if (!nextActiveSessionId) { if (!nextActiveSessionId) {
hydrationNonceRef.current += 1; hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1; titleUpdateNonceRef.current += 1;
storageSessionIdRef.current = undefined;
sessionIdRef.current = undefined; sessionIdRef.current = undefined;
lastPersistedStateKeyRef.current = createPersistedStateKey({ lastPersistedStateKeyRef.current = createPersistedStateKey({
storageSessionId: undefined,
title: undefined, title: undefined,
isTitleManuallyEdited: false, isTitleManuallyEdited: false,
messages: [], messages: [],
@@ -647,12 +716,11 @@ export const useAgentChatSession = ({
setIsHydrating(true); setIsHydrating(true);
const [nextState, sessionsAfterDelete] = await Promise.all([ const [nextState, sessionsAfterDelete] = await Promise.all([
loadChatSessionById(nextActiveSessionId, projectId), loadChatSessionById(nextActiveSessionId),
listChatSessions(), listChatSessions(),
]); ]);
hydrationNonceRef.current += 1; hydrationNonceRef.current += 1;
titleUpdateNonceRef.current += 1; titleUpdateNonceRef.current += 1;
storageSessionIdRef.current = nextState.storageSessionId;
sessionIdRef.current = nextState.sessionId; sessionIdRef.current = nextState.sessionId;
lastPersistedStateKeyRef.current = createPersistedStateKey(nextState); lastPersistedStateKeyRef.current = createPersistedStateKey(nextState);
setBranchTransition(null); setBranchTransition(null);
@@ -668,7 +736,7 @@ export const useAgentChatSession = ({
setIsHydrating(false); setIsHydrating(false);
} }
}, },
[isHydrating, isStreaming, projectId], [isHydrating, isStreaming],
); );
const sendPrompt = useCallback( const sendPrompt = useCallback(
@@ -679,26 +747,25 @@ export const useAgentChatSession = ({
); );
const renameSession = useCallback( const renameSession = useCallback(
async (targetStorageSessionId: string, nextTitle: string) => { async (targetSessionId: string, nextTitle: string) => {
const normalizedTitle = nextTitle.trim(); const normalizedTitle = nextTitle.trim();
if (!normalizedTitle || isHydrating) return; if (!normalizedTitle || isHydrating) return;
try { try {
await updateChatSessionTitle(targetStorageSessionId, normalizedTitle, { await updateChatSessionTitle(targetSessionId, normalizedTitle, {
isTitleManuallyEdited: true, isTitleManuallyEdited: true,
}); });
const sessions = await listChatSessions(); const sessions = await listChatSessions();
setChatSessions(sessions); setChatSessions(sessions);
if (storageSessionIdRef.current === targetStorageSessionId) { if (sessionIdRef.current === targetSessionId) {
setSessionTitle(normalizedTitle); setSessionTitle(normalizedTitle);
setIsSessionTitleManuallyEdited(true); setIsSessionTitleManuallyEdited(true);
lastPersistedStateKeyRef.current = createPersistedStateKey({ lastPersistedStateKeyRef.current = createPersistedStateKey({
storageSessionId: targetStorageSessionId, sessionId: targetSessionId,
title: normalizedTitle, title: normalizedTitle,
isTitleManuallyEdited: true, isTitleManuallyEdited: true,
messages, messages,
sessionId: sessionIdRef.current,
branchGroups, branchGroups,
}); });
} }
@@ -864,7 +931,7 @@ export const useAgentChatSession = ({
return { return {
messages, messages,
chatSessions, chatSessions,
activeStorageSessionId: storageSessionIdRef.current, activeSessionId: sessionIdRef.current,
branchGroups, branchGroups,
branchTransition, branchTransition,
isHydrating, isHydrating,
+53 -3
View File
@@ -1,4 +1,9 @@
import { abortAgentChat, forkAgentChat, streamAgentChat } from "./chatStream"; import {
abortAgentChat,
forkAgentChat,
resumeAgentChatStream,
streamAgentChat,
} from "./chatStream";
import { ReadableStream } from "stream/web"; import { ReadableStream } from "stream/web";
import { TextEncoder, TextDecoder } from "util"; import { TextEncoder, TextDecoder } from "util";
@@ -76,6 +81,51 @@ describe("streamAgentChat", () => {
]); ]);
}); });
it("parses state events from a resumed stream", async () => {
apiFetch.mockResolvedValue({
ok: true,
body: makeStream([
'event: state\ndata: {"session_id":"s1","messages":[{"id":"a1","role":"assistant","content":"已输出"}],"is_streaming":true,"run_status":"running"}\n\n',
'event: token\ndata: {"session_id":"s1","content":"继续"}\n\n',
'event: done\ndata: {"session_id":"s1"}\n\n',
]),
});
const events: Array<{
type: string;
sessionId?: string;
messages?: unknown[];
isStreaming?: boolean;
runStatus?: string;
content?: string;
}> = [];
await resumeAgentChatStream({
sessionId: "s1",
onEvent: (event) => events.push(event),
});
expect(apiFetch).toHaveBeenCalledWith(
expect.stringContaining("/api/v1/agent/chat/session/s1/stream"),
expect.objectContaining({
method: "GET",
projectHeaderMode: "include",
skipAuthRedirect: true,
}),
);
expect(events).toEqual([
{
type: "state",
sessionId: "s1",
messages: [{ id: "a1", role: "assistant", content: "已输出" }],
isStreaming: true,
runStatus: "running",
},
{ type: "token", sessionId: "s1", content: "继续" },
{ type: "done", sessionId: "s1" },
]);
});
it("parses progress events", async () => { it("parses progress events", async () => {
apiFetch.mockResolvedValue({ apiFetch.mockResolvedValue({
ok: true, ok: true,
@@ -103,11 +153,11 @@ describe("streamAgentChat", () => {
}); });
}); });
it("parses legacy tool_call arguments when params is empty", async () => { it("parses tool_call arguments when params is empty", async () => {
apiFetch.mockResolvedValue({ apiFetch.mockResolvedValue({
ok: true, ok: true,
body: makeStream([ body: makeStream([
'event: tool_call\ndata: {"conversationId":"agent-1e75dd01-29e","tool":"locate_features","params":{},"arguments":"{\\"ids\\":[\\"142902\\"],\\"feature_type\\":\\"junction\\"}"}\n\n', 'event: tool_call\ndata: {"session_id":"agent-1e75dd01-29e","tool":"locate_features","params":{},"arguments":"{\\"ids\\":[\\"142902\\"],\\"feature_type\\":\\"junction\\"}"}\n\n',
'event: done\ndata: {"session_id":"agent-1e75dd01-29e"}\n\n', 'event: done\ndata: {"session_id":"agent-1e75dd01-29e"}\n\n',
]), ]),
}); });
+170 -84
View File
@@ -6,6 +6,13 @@ export type AgentModel =
| "deepseek/deepseek-v4-pro"; | "deepseek/deepseek-v4-pro";
export type StreamEvent = export type StreamEvent =
| {
type: "state";
sessionId: string;
messages: unknown[];
isStreaming: boolean;
runStatus?: string;
}
| { type: "token"; sessionId: string; content: string } | { type: "token"; sessionId: string; content: string }
| { type: "done"; sessionId: string; totalDurationMs?: number } | { type: "done"; sessionId: string; totalDurationMs?: number }
| { type: "session_title"; sessionId: string; title: string } | { type: "session_title"; sessionId: string; title: string }
@@ -44,6 +51,12 @@ type StreamOptions = {
onEvent: (event: StreamEvent) => void; onEvent: (event: StreamEvent) => void;
}; };
type ResumeStreamOptions = {
sessionId: string;
signal?: AbortSignal;
onEvent: (event: StreamEvent) => void;
};
const parseEventBlock = (block: string): { event?: string; data?: string } => { const parseEventBlock = (block: string): { event?: string; data?: string } => {
const lines = block.split("\n"); const lines = block.split("\n");
let event: string | undefined; let event: string | undefined;
@@ -87,6 +100,126 @@ const resolveToolParams = (
return isObjectRecord(params) ? params : {}; return isObjectRecord(params) ? params : {};
}; };
const emitParsedStreamEvent = (
event: string,
data: string,
onEvent: (event: StreamEvent) => void,
) => {
try {
const parsed = JSON.parse(data) as {
session_id?: string;
content?: string;
message?: string;
detail?: string;
tool?: string;
params?: Record<string, unknown>;
arguments?: unknown;
id?: string;
phase?: string;
status?: "running" | "completed" | "error";
title?: string;
messages?: unknown[];
is_streaming?: boolean;
run_status?: string;
started_at?: number;
ended_at?: number;
elapsed_ms?: number;
duration_ms?: number;
total_duration_ms?: number;
};
if (event === "state") {
onEvent({
type: "state",
sessionId: parsed.session_id ?? "",
messages: Array.isArray(parsed.messages) ? parsed.messages : [],
isStreaming: parsed.is_streaming ?? false,
runStatus: parsed.run_status,
});
} else if (event === "token") {
onEvent({
type: "token",
sessionId: parsed.session_id ?? "",
content: parsed.content ?? "",
});
} else if (event === "progress") {
onEvent({
type: "progress",
sessionId: parsed.session_id ?? "",
id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`,
phase: parsed.phase ?? "progress",
status: parsed.status ?? "running",
title: parsed.title ?? "正在处理",
detail: parsed.detail,
startedAt: parsed.started_at,
endedAt: parsed.ended_at,
elapsedMs: parsed.elapsed_ms,
durationMs: parsed.duration_ms,
});
} else if (event === "done") {
onEvent({
type: "done",
sessionId: parsed.session_id ?? "",
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "session_title") {
onEvent({
type: "session_title",
sessionId: parsed.session_id ?? "",
title: typeof parsed.title === "string" ? parsed.title : "",
});
} else if (event === "error") {
onEvent({
type: "error",
sessionId: parsed.session_id,
message: parsed.message ?? "unknown error",
detail: parsed.detail,
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "tool_call") {
onEvent({
type: "tool_call",
sessionId: parsed.session_id ?? "",
tool: parsed.tool ?? "",
params: resolveToolParams(parsed.params, parsed.arguments),
});
}
} catch {
onEvent({
type: "error",
message: "invalid SSE data payload",
detail: data,
});
}
};
const readStreamEvents = async (
response: Response,
onEvent: (event: StreamEvent) => void,
) => {
if (!response.body) {
return;
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";
for (const block of blocks) {
const { event, data } = parseEventBlock(block);
if (!event || !data) continue;
emitParsedStreamEvent(event, data, onEvent);
}
}
};
export const streamAgentChat = async ({ export const streamAgentChat = async ({
message, message,
sessionId, sessionId,
@@ -144,99 +277,52 @@ export const streamAgentChat = async ({
return; return;
} }
const reader = response.body.getReader(); await readStreamEvents(response, onEvent);
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";
for (const block of blocks) {
const { event, data } = parseEventBlock(block);
if (!event || !data) continue;
try {
const parsed = JSON.parse(data) as {
session_id?: string;
conversationId?: string;
content?: string;
message?: string;
detail?: string;
tool?: string;
params?: Record<string, unknown>;
arguments?: unknown;
id?: string;
phase?: string;
status?: "running" | "completed" | "error";
title?: string;
started_at?: number;
ended_at?: number;
elapsed_ms?: number;
duration_ms?: number;
total_duration_ms?: number;
}; };
if (event === "token") {
onEvent({ export const resumeAgentChatStream = async ({
type: "token", sessionId,
sessionId: parsed.session_id ?? "", signal,
content: parsed.content ?? "", onEvent,
}); }: ResumeStreamOptions) => {
} else if (event === "progress") { let response: Response;
onEvent({ try {
type: "progress", response = await apiFetch(
sessionId: parsed.session_id ?? "", `${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}/stream`,
id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`, {
phase: parsed.phase ?? "progress", method: "GET",
status: parsed.status ?? "running", signal,
title: parsed.title ?? "正在处理", headers: {
detail: parsed.detail, Accept: "text/event-stream",
startedAt: parsed.started_at, },
endedAt: parsed.ended_at, projectHeaderMode: "include",
elapsedMs: parsed.elapsed_ms, userHeaderMode: "include",
durationMs: parsed.duration_ms, skipAuthRedirect: true,
}); },
} else if (event === "done") { );
onEvent({ } catch (error) {
type: "done", const detail = error instanceof Error ? error.message : String(error);
sessionId: parsed.session_id ?? "",
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "session_title") {
onEvent({
type: "session_title",
sessionId: parsed.session_id ?? "",
title: typeof parsed.title === "string" ? parsed.title : "",
});
} else if (event === "error") {
onEvent({ onEvent({
type: "error", type: "error",
sessionId: parsed.session_id, sessionId,
message: parsed.message ?? "unknown error", message: "network request failed",
detail: parsed.detail, detail,
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "tool_call") {
onEvent({
type: "tool_call",
sessionId: parsed.session_id ?? parsed.conversationId ?? "",
tool: parsed.tool ?? "",
params: resolveToolParams(parsed.params, parsed.arguments),
}); });
return;
} }
} catch {
if (!response.ok || !response.body) {
const detail = await response.text();
onEvent({ onEvent({
type: "error", type: "error",
message: "invalid SSE data payload", sessionId,
detail: data, message: "stream request failed",
detail,
}); });
return;
} }
}
} await readStreamEvents(response, onEvent);
}; };
export const abortAgentChat = async (sessionId?: string) => { export const abortAgentChat = async (sessionId?: string) => {