From 7764e253983cb43b047f93538a642a09af7849ba Mon Sep 17 00:00:00 2001 From: Huarch Date: Thu, 4 Jun 2026 16:27:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=81=E5=BC=8F=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E4=B8=AD=E6=96=AD=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/components/chat/GlobalChatbox.types.ts | 4 + src/components/chat/chatStorage.ts | 8 + .../chat/hooks/useAgentChatSession.test.tsx | 137 ++++++++- .../chat/hooks/useAgentChatSession.ts | 288 +++++++++++------- src/lib/chatStream.test.ts | 52 +++- src/lib/chatStream.ts | 267 ++++++++++------ 6 files changed, 559 insertions(+), 197 deletions(-) diff --git a/src/components/chat/GlobalChatbox.types.ts b/src/components/chat/GlobalChatbox.types.ts index 4ac085b..8a580b5 100644 --- a/src/components/chat/GlobalChatbox.types.ts +++ b/src/components/chat/GlobalChatbox.types.ts @@ -71,6 +71,8 @@ export type ChatSessionSummary = { title: string; createdAt: number; updatedAt: number; + isStreaming?: boolean; + runStatus?: string; }; export type LoadedChatState = { @@ -79,4 +81,6 @@ export type LoadedChatState = { isTitleManuallyEdited?: boolean; messages: Message[]; branchGroups: BranchGroup[]; + isStreaming?: boolean; + runStatus?: string; }; diff --git a/src/components/chat/chatStorage.ts b/src/components/chat/chatStorage.ts index 5cfed97..81d5506 100644 --- a/src/components/chat/chatStorage.ts +++ b/src/components/chat/chatStorage.ts @@ -14,6 +14,8 @@ type BackendSessionPayload = { title?: string; created_at?: string | number; updated_at?: string | number; + is_streaming?: boolean; + run_status?: string; }; export const createEmptyChatState = (): LoadedChatState => ({ @@ -76,6 +78,8 @@ const fetchBackendChatSessions = async (): Promise => { title: normalizeTitle(session.title), createdAt: toMillis(session.created_at), updatedAt: toMillis(session.updated_at), + isStreaming: session.is_streaming, + runStatus: session.run_status, })) .filter((session) => Boolean(session.id)) .sort(compareSessionsByAnchorTime); @@ -104,6 +108,8 @@ const fetchBackendChatSession = async (sessionId: string): Promise ({ abortAgentChat: jest.fn(async () => undefined), forkAgentChat: jest.fn(async () => "forked-session"), + resumeAgentChatStream: jest.fn(async () => undefined), streamAgentChat: jest.fn(async () => undefined), })); @@ -42,7 +43,12 @@ describe("useAgentChatSession", () => { listChatSessions.mockReset(); saveActiveChatState.mockReset(); updateChatSessionTitle.mockReset(); + jest.mocked(abortAgentChat).mockReset(); + jest.mocked(resumeAgentChatStream).mockReset(); jest.mocked(streamAgentChat).mockReset(); + jest.mocked(abortAgentChat).mockImplementation(async () => undefined); + jest.mocked(resumeAgentChatStream).mockImplementation(async () => undefined); + jest.mocked(streamAgentChat).mockImplementation(async () => undefined); saveActiveChatState.mockImplementation(async (state) => state.sessionId); }); @@ -103,7 +109,7 @@ describe("useAgentChatSession", () => { ]); }); - it("waits for the stream session id before persisting a new streaming conversation", async () => { + it("persists a new conversation only after the stream is done", async () => { listChatSessions.mockResolvedValue([]); let emitStreamEvent: ((event: StreamEvent) => void) | undefined; jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { @@ -147,15 +153,140 @@ describe("useAgentChatSession", () => { jest.advanceTimersByTime(200); }); - expect(saveActiveChatState).toHaveBeenCalledTimes(1); + expect(saveActiveChatState).not.toHaveBeenCalled(); + + act(() => { + emitStreamEvent?.({ + type: "done", + sessionId: "chat-stream-1", + }); + }); + + await act(async () => { + jest.advanceTimersByTime(200); + }); + + await waitFor(() => expect(saveActiveChatState).toHaveBeenCalledTimes(1)); expect(saveActiveChatState.mock.calls[0][0]).toMatchObject({ sessionId: "chat-stream-1", + messages: [ + expect.objectContaining({ role: "user", content: "第一条消息" }), + expect.objectContaining({ role: "assistant", content: "收到" }), + ], }); } finally { jest.useRealTimers(); } }); + it("hydrates a backend streaming session and resumes its stream", async () => { + listChatSessions.mockResolvedValue([ + { + id: "session-streaming", + title: "运行中", + createdAt: 1, + updatedAt: 2, + isStreaming: true, + runStatus: "running", + }, + ]); + + const { result } = renderHook(() => + useAgentChatSession({ + projectId: "project-1", + onToolCall: jest.fn(), + }), + ); + + await waitFor(() => expect(result.current.isHydrating).toBe(false)); + + expect(result.current.isStreaming).toBe(true); + expect(result.current.activeSessionId).toBe("session-loaded"); + expect(resumeAgentChatStream).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "session-loaded", + }), + ); + }); + + it("updates resumed messages from state, token, and done events", async () => { + listChatSessions.mockResolvedValue([ + { + id: "session-streaming", + title: "运行中", + createdAt: 1, + updatedAt: 2, + isStreaming: true, + }, + ]); + jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => { + onEvent({ + type: "state", + sessionId: "session-loaded", + messages: [ + { id: "u1", role: "user", content: "继续分析" }, + { id: "a1", role: "assistant", content: "已有" }, + ], + isStreaming: true, + runStatus: "running", + }); + onEvent({ + type: "token", + sessionId: "session-loaded", + content: "输出", + }); + onEvent({ + type: "done", + sessionId: "session-loaded", + }); + }); + + const { result } = renderHook(() => + useAgentChatSession({ + projectId: "project-1", + onToolCall: jest.fn(), + }), + ); + + await waitFor(() => expect(result.current.isHydrating).toBe(false)); + await waitFor(() => expect(result.current.isStreaming).toBe(false)); + + expect(result.current.messages).toEqual([ + expect.objectContaining({ id: "u1", role: "user", content: "继续分析" }), + expect.objectContaining({ id: "a1", role: "assistant", content: "已有输出" }), + ]); + }); + + it("aborts a resumed streaming session through the backend abort endpoint", async () => { + listChatSessions.mockResolvedValue([ + { + id: "session-streaming", + title: "运行中", + createdAt: 1, + updatedAt: 2, + isStreaming: true, + }, + ]); + jest.mocked(resumeAgentChatStream).mockImplementationOnce(async () => { + await new Promise(() => undefined); + }); + + const { result } = renderHook(() => + useAgentChatSession({ + projectId: "project-1", + onToolCall: jest.fn(), + }), + ); + + await waitFor(() => expect(result.current.isStreaming).toBe(true)); + + act(() => { + result.current.abort(); + }); + + expect(abortAgentChat).toHaveBeenCalledWith("session-loaded"); + }); + it("ignores generated session titles after the title was edited manually", async () => { listChatSessions.mockResolvedValue([]); jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { diff --git a/src/components/chat/hooks/useAgentChatSession.ts b/src/components/chat/hooks/useAgentChatSession.ts index 8252bdf..2cd64c5 100644 --- a/src/components/chat/hooks/useAgentChatSession.ts +++ b/src/components/chat/hooks/useAgentChatSession.ts @@ -2,7 +2,12 @@ import { useCallback, useEffect, useRef, useState } from "react"; -import { abortAgentChat, forkAgentChat, streamAgentChat } from "@/lib/chatStream"; +import { + abortAgentChat, + forkAgentChat, + resumeAgentChatStream, + streamAgentChat, +} from "@/lib/chatStream"; import type { AgentModel, StreamEvent } from "@/lib/chatStream"; import type { AgentArtifact, @@ -164,6 +169,8 @@ export const useAgentChatSession = ({ const [isHydrating, setIsHydrating] = useState(true); const abortRef = useRef(null); const sessionIdRef = useRef(undefined); + const messagesRef = useRef([]); + const resumeStreamingSessionRef = useRef<((sessionId: string) => void) | null>(null); const isSessionTitleManuallyEditedRef = useRef(false); const cancelPromiseRef = useRef | null>(null); const titleUpdateNonceRef = useRef(0); @@ -181,6 +188,10 @@ export const useAgentChatSession = ({ sessionIdRef.current = sessionId; }, [sessionId]); + useEffect(() => { + messagesRef.current = messages; + }, [messages]); + useEffect(() => { isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited; }, [isSessionTitleManuallyEdited]); @@ -216,10 +227,11 @@ export const useAgentChatSession = ({ } try { - const [loadedState, sessions] = await Promise.all([ - Promise.resolve(createEmptyChatState()), - listChatSessions(), - ]); + const sessions = await listChatSessions(); + const streamingSession = sessions.find((session) => session.isStreaming); + const loadedState = streamingSession + ? await loadChatSessionById(streamingSession.id) + : createEmptyChatState(); if (cancelled) return; sessionIdRef.current = loadedState.sessionId; @@ -234,6 +246,12 @@ export const useAgentChatSession = ({ setSessionId(loadedState.sessionId); setBranchGroups(loadedState.branchGroups); setChatSessions(sessions); + if ( + loadedState.sessionId && + (loadedState.isStreaming || streamingSession?.isStreaming) + ) { + resumeStreamingSessionRef.current?.(loadedState.sessionId); + } } catch (error) { console.error("[GlobalChatbox] Failed to hydrate chat state:", error); } finally { @@ -255,6 +273,10 @@ export const useAgentChatSession = ({ const currentHydrationNonce = hydrationNonceRef.current; const persistTimer = window.setTimeout(() => { + if (isStreaming) { + return; + } + const state: LoadedChatState = { title: sessionTitle, isTitleManuallyEdited: isSessionTitleManuallyEdited, @@ -262,13 +284,6 @@ export const useAgentChatSession = ({ sessionId, branchGroups, }; - if ( - isStreaming && - !state.sessionId && - state.messages.length > 0 - ) { - return; - } const currentStateKey = createPersistedStateKey(state); if (currentStateKey === lastPersistedStateKeyRef.current) { @@ -351,6 +366,150 @@ export const useAgentChatSession = ({ ); }, []); + const getLastAssistantMessageId = useCallback((fallback?: string) => { + const assistant = [...messagesRef.current] + .reverse() + .find((message) => message.role === "assistant"); + return assistant?.id ?? fallback; + }, []); + + const applyStreamEvent = useCallback( + ( + event: StreamEvent, + options?: { + assistantMessageId?: string; + }, + ) => { + if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) { + sessionIdRef.current = event.sessionId; + setSessionId(event.sessionId); + } + + if (event.type === "state") { + const nextMessages = cloneMessages(event.messages as Message[]); + messagesRef.current = nextMessages; + setMessages(nextMessages); + setIsStreaming(event.isStreaming); + return; + } + + const assistantMessageId = getLastAssistantMessageId(options?.assistantMessageId); + if (!assistantMessageId) { + return; + } + + if (event.type === "token") { + setMessages((prev) => + prev.map((message) => + message.id === assistantMessageId + ? { + ...message, + content: message.content + event.content, + isError: false, + } + : message, + ), + ); + } else if (event.type === "progress") { + setMessages((prev) => + prev.map((message) => + message.id === assistantMessageId + ? { ...message, progress: upsertProgress(message.progress, event) } + : message, + ), + ); + } else if (event.type === "tool_call") { + onToolCall(event, { + assistantMessageId, + appendArtifact, + }); + } else if (event.type === "session_title") { + const nextTitle = event.title.trim(); + if (nextTitle && !isSessionTitleManuallyEditedRef.current) { + setSessionTitle(nextTitle); + const currentSessionId = sessionIdRef.current; + if (currentSessionId) { + const currentNonce = ++titleUpdateNonceRef.current; + void updateChatSessionTitle(currentSessionId, nextTitle, { + isTitleManuallyEdited: false, + }) + .then(() => listChatSessions()) + .then((sessions) => { + if (titleUpdateNonceRef.current !== currentNonce) return; + setChatSessions(sessions); + }) + .catch((error) => { + console.error("[GlobalChatbox] Failed to persist session title:", error); + }); + } + } + } else if (event.type === "done") { + setMessages((prev) => + prev.map((message) => { + if (message.id !== assistantMessageId) return message; + const completedProgress = completeRunningProgress(message.progress); + if ( + message.content.trim().length === 0 && + !(message.artifacts?.length) + ) { + return { + ...message, + content: + "Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。", + progress: completedProgress, + }; + } + return { ...message, progress: completedProgress }; + }), + ); + setIsStreaming(false); + } else if (event.type === "error") { + setMessages((prev) => + prev.map((message) => + message.id === assistantMessageId + ? { + ...message, + content: message.content || `⚠️ **错误:** ${event.message}`, + isError: true, + progress: completeRunningProgress(message.progress), + } + : message, + ), + ); + setIsStreaming(false); + } + }, + [appendArtifact, getLastAssistantMessageId, onToolCall], + ); + + const resumeStreamingSession = useCallback( + (nextSessionId: string) => { + const controller = new AbortController(); + abortRef.current?.abort(); + abortRef.current = controller; + setIsStreaming(true); + + void resumeAgentChatStream({ + sessionId: nextSessionId, + signal: controller.signal, + onEvent: (event) => applyStreamEvent(event), + }) + .catch((error) => { + if (!controller.signal.aborted) { + console.error("[GlobalChatbox] Failed to resume chat stream:", error); + setIsStreaming(false); + } + }) + .finally(() => { + if (abortRef.current === controller) { + abortRef.current = null; + } + }); + }, + [applyStreamEvent], + ); + resumeStreamingSessionRef.current = resumeStreamingSession; + const runPrompt = useCallback( async ({ prompt: rawPrompt, @@ -372,8 +531,10 @@ export const useAgentChatSession = ({ preparedMessages ?? [...messages, nextUserMessage, nextAssistantMessage]; + const clonedNextMessages = cloneMessages(nextMessages); setIsStreaming(true); - setMessages(cloneMessages(nextMessages)); + messagesRef.current = clonedNextMessages; + setMessages(clonedNextMessages); if (sessionIdOverride !== undefined) { sessionIdRef.current = sessionIdOverride; setSessionId(sessionIdOverride); @@ -388,93 +549,10 @@ export const useAgentChatSession = ({ sessionId: sessionIdOverride ?? sessionIdRef.current, model: getModel?.(), signal: controller.signal, - onEvent: (event) => { - if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) { - sessionIdRef.current = event.sessionId; - setSessionId(event.sessionId); - } - - if (event.type === "token") { - setMessages((prev) => - prev.map((message) => - message.id === nextAssistantMessage.id - ? { - ...message, - content: message.content + event.content, - isError: false, - } - : message, - ), - ); - } else if (event.type === "progress") { - setMessages((prev) => - prev.map((message) => - message.id === nextAssistantMessage.id - ? { ...message, progress: upsertProgress(message.progress, event) } - : message, - ), - ); - } else if (event.type === "tool_call") { - onToolCall(event, { - assistantMessageId: nextAssistantMessage.id, - appendArtifact, - }); - } else if (event.type === "session_title") { - const nextTitle = event.title.trim(); - if (nextTitle && !isSessionTitleManuallyEditedRef.current) { - setSessionTitle(nextTitle); - const currentSessionId = sessionIdRef.current; - if (currentSessionId) { - const currentNonce = ++titleUpdateNonceRef.current; - void updateChatSessionTitle(currentSessionId, nextTitle, { - isTitleManuallyEdited: false, - }) - .then(() => listChatSessions()) - .then((sessions) => { - if (titleUpdateNonceRef.current !== currentNonce) return; - setChatSessions(sessions); - }) - .catch((error) => { - console.error("[GlobalChatbox] Failed to persist session title:", error); - }); - } - } - } else if (event.type === "done") { - setMessages((prev) => - prev.map((message) => { - if (message.id !== nextAssistantMessage.id) return message; - const completedProgress = completeRunningProgress(message.progress); - if ( - message.content.trim().length === 0 && - !(message.artifacts?.length) - ) { - return { - ...message, - content: - "Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。", - progress: completedProgress, - }; - } - return { ...message, progress: completedProgress }; - }), - ); - setIsStreaming(false); - } else if (event.type === "error") { - setMessages((prev) => - prev.map((message) => - message.id === nextAssistantMessage.id - ? { - ...message, - content: message.content || `⚠️ **错误:** ${event.message}`, - isError: true, - progress: completeRunningProgress(message.progress), - } - : message, - ), - ); - setIsStreaming(false); - } - }, + onEvent: (event) => + applyStreamEvent(event, { + assistantMessageId: nextAssistantMessage.id, + }), }); } catch (error) { if (controller.signal.aborted) { @@ -520,7 +598,7 @@ export const useAgentChatSession = ({ setIsStreaming(false); } }, - [appendArtifact, getModel, isHydrating, isStreaming, messages, onBeforeSend, onToolCall], + [applyStreamEvent, getModel, isHydrating, isStreaming, messages, onBeforeSend], ); const abort = useCallback(() => { @@ -587,13 +665,18 @@ export const useAgentChatSession = ({ setSessionId(nextState.sessionId); setBranchGroups(nextState.branchGroups); setChatSessions(sessions); + if (nextState.sessionId && nextState.isStreaming) { + resumeStreamingSession(nextState.sessionId); + } else { + setIsStreaming(false); + } } catch (error) { console.error("[GlobalChatbox] Failed to switch chat session:", error); } finally { setIsHydrating(false); } }, - [isHydrating, isStreaming], + [isHydrating, isStreaming, resumeStreamingSession], ); const removeSession = useCallback( @@ -683,7 +766,6 @@ export const useAgentChatSession = ({ title: normalizedTitle, isTitleManuallyEdited: true, messages, - sessionId: sessionIdRef.current, branchGroups, }); } diff --git a/src/lib/chatStream.test.ts b/src/lib/chatStream.test.ts index 79e400f..64a6c62 100644 --- a/src/lib/chatStream.test.ts +++ b/src/lib/chatStream.test.ts @@ -1,4 +1,9 @@ -import { abortAgentChat, forkAgentChat, streamAgentChat } from "./chatStream"; +import { + abortAgentChat, + forkAgentChat, + resumeAgentChatStream, + streamAgentChat, +} from "./chatStream"; import { ReadableStream } from "stream/web"; import { TextEncoder, TextDecoder } from "util"; @@ -76,6 +81,51 @@ describe("streamAgentChat", () => { ]); }); + it("parses state events from a resumed stream", async () => { + apiFetch.mockResolvedValue({ + ok: true, + body: makeStream([ + 'event: state\ndata: {"session_id":"s1","messages":[{"id":"a1","role":"assistant","content":"已输出"}],"is_streaming":true,"run_status":"running"}\n\n', + 'event: token\ndata: {"session_id":"s1","content":"继续"}\n\n', + 'event: done\ndata: {"session_id":"s1"}\n\n', + ]), + }); + + const events: Array<{ + type: string; + sessionId?: string; + messages?: unknown[]; + isStreaming?: boolean; + runStatus?: string; + content?: string; + }> = []; + + await resumeAgentChatStream({ + sessionId: "s1", + onEvent: (event) => events.push(event), + }); + + expect(apiFetch).toHaveBeenCalledWith( + expect.stringContaining("/api/v1/agent/chat/session/s1/stream"), + expect.objectContaining({ + method: "GET", + projectHeaderMode: "include", + skipAuthRedirect: true, + }), + ); + expect(events).toEqual([ + { + type: "state", + sessionId: "s1", + messages: [{ id: "a1", role: "assistant", content: "已输出" }], + isStreaming: true, + runStatus: "running", + }, + { type: "token", sessionId: "s1", content: "继续" }, + { type: "done", sessionId: "s1" }, + ]); + }); + it("parses progress events", async () => { apiFetch.mockResolvedValue({ ok: true, diff --git a/src/lib/chatStream.ts b/src/lib/chatStream.ts index 4fd94d5..a6581e5 100644 --- a/src/lib/chatStream.ts +++ b/src/lib/chatStream.ts @@ -6,6 +6,13 @@ export type AgentModel = | "deepseek/deepseek-v4-pro"; export type StreamEvent = + | { + type: "state"; + sessionId: string; + messages: unknown[]; + isStreaming: boolean; + runStatus?: string; + } | { type: "token"; sessionId: string; content: string } | { type: "done"; sessionId: string; totalDurationMs?: number } | { type: "session_title"; sessionId: string; title: string } @@ -44,6 +51,12 @@ type StreamOptions = { onEvent: (event: StreamEvent) => void; }; +type ResumeStreamOptions = { + sessionId: string; + signal?: AbortSignal; + onEvent: (event: StreamEvent) => void; +}; + const parseEventBlock = (block: string): { event?: string; data?: string } => { const lines = block.split("\n"); let event: string | undefined; @@ -87,6 +100,126 @@ const resolveToolParams = ( return isObjectRecord(params) ? params : {}; }; +const emitParsedStreamEvent = ( + event: string, + data: string, + onEvent: (event: StreamEvent) => void, +) => { + try { + const parsed = JSON.parse(data) as { + session_id?: string; + content?: string; + message?: string; + detail?: string; + tool?: string; + params?: Record; + arguments?: unknown; + id?: string; + phase?: string; + status?: "running" | "completed" | "error"; + title?: string; + messages?: unknown[]; + is_streaming?: boolean; + run_status?: string; + started_at?: number; + ended_at?: number; + elapsed_ms?: number; + duration_ms?: number; + total_duration_ms?: number; + }; + if (event === "state") { + onEvent({ + type: "state", + sessionId: parsed.session_id ?? "", + messages: Array.isArray(parsed.messages) ? parsed.messages : [], + isStreaming: parsed.is_streaming ?? false, + runStatus: parsed.run_status, + }); + } else if (event === "token") { + onEvent({ + type: "token", + sessionId: parsed.session_id ?? "", + content: parsed.content ?? "", + }); + } else if (event === "progress") { + onEvent({ + type: "progress", + sessionId: parsed.session_id ?? "", + id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`, + phase: parsed.phase ?? "progress", + status: parsed.status ?? "running", + title: parsed.title ?? "正在处理", + detail: parsed.detail, + startedAt: parsed.started_at, + endedAt: parsed.ended_at, + elapsedMs: parsed.elapsed_ms, + durationMs: parsed.duration_ms, + }); + } else if (event === "done") { + onEvent({ + type: "done", + sessionId: parsed.session_id ?? "", + totalDurationMs: parsed.total_duration_ms, + }); + } else if (event === "session_title") { + onEvent({ + type: "session_title", + sessionId: parsed.session_id ?? "", + title: typeof parsed.title === "string" ? parsed.title : "", + }); + } else if (event === "error") { + onEvent({ + type: "error", + sessionId: parsed.session_id, + message: parsed.message ?? "unknown error", + detail: parsed.detail, + totalDurationMs: parsed.total_duration_ms, + }); + } else if (event === "tool_call") { + onEvent({ + type: "tool_call", + sessionId: parsed.session_id ?? "", + tool: parsed.tool ?? "", + params: resolveToolParams(parsed.params, parsed.arguments), + }); + } + } catch { + onEvent({ + type: "error", + message: "invalid SSE data payload", + detail: data, + }); + } +}; + +const readStreamEvents = async ( + response: Response, + onEvent: (event: StreamEvent) => void, +) => { + if (!response.body) { + return; + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const blocks = buffer.split("\n\n"); + buffer = blocks.pop() ?? ""; + + for (const block of blocks) { + const { event, data } = parseEventBlock(block); + if (!event || !data) continue; + emitParsedStreamEvent(event, data, onEvent); + } + } +}; + export const streamAgentChat = async ({ message, sessionId, @@ -144,98 +277,52 @@ export const streamAgentChat = async ({ return; } - const reader = response.body.getReader(); - const decoder = new TextDecoder("utf-8"); - let buffer = ""; + await readStreamEvents(response, onEvent); +}; - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const blocks = buffer.split("\n\n"); - buffer = blocks.pop() ?? ""; - - for (const block of blocks) { - const { event, data } = parseEventBlock(block); - if (!event || !data) continue; - - try { - const parsed = JSON.parse(data) as { - session_id?: string; - content?: string; - message?: string; - detail?: string; - tool?: string; - params?: Record; - arguments?: unknown; - id?: string; - phase?: string; - status?: "running" | "completed" | "error"; - title?: string; - started_at?: number; - ended_at?: number; - elapsed_ms?: number; - duration_ms?: number; - total_duration_ms?: number; - }; - if (event === "token") { - onEvent({ - type: "token", - sessionId: parsed.session_id ?? "", - content: parsed.content ?? "", - }); - } else if (event === "progress") { - onEvent({ - type: "progress", - sessionId: parsed.session_id ?? "", - id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`, - phase: parsed.phase ?? "progress", - status: parsed.status ?? "running", - title: parsed.title ?? "正在处理", - detail: parsed.detail, - startedAt: parsed.started_at, - endedAt: parsed.ended_at, - elapsedMs: parsed.elapsed_ms, - durationMs: parsed.duration_ms, - }); - } else if (event === "done") { - onEvent({ - type: "done", - sessionId: parsed.session_id ?? "", - totalDurationMs: parsed.total_duration_ms, - }); - } else if (event === "session_title") { - onEvent({ - type: "session_title", - sessionId: parsed.session_id ?? "", - title: typeof parsed.title === "string" ? parsed.title : "", - }); - } else if (event === "error") { - onEvent({ - type: "error", - sessionId: parsed.session_id, - message: parsed.message ?? "unknown error", - detail: parsed.detail, - totalDurationMs: parsed.total_duration_ms, - }); - } else if (event === "tool_call") { - onEvent({ - type: "tool_call", - sessionId: parsed.session_id ?? "", - tool: parsed.tool ?? "", - params: resolveToolParams(parsed.params, parsed.arguments), - }); - } - } catch { - onEvent({ - type: "error", - message: "invalid SSE data payload", - detail: data, - }); - } - } +export const resumeAgentChatStream = async ({ + sessionId, + signal, + onEvent, +}: ResumeStreamOptions) => { + let response: Response; + try { + response = await apiFetch( + `${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}/stream`, + { + method: "GET", + signal, + headers: { + Accept: "text/event-stream", + }, + projectHeaderMode: "include", + userHeaderMode: "include", + skipAuthRedirect: true, + }, + ); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + onEvent({ + type: "error", + sessionId, + message: "network request failed", + detail, + }); + return; } + + if (!response.ok || !response.body) { + const detail = await response.text(); + onEvent({ + type: "error", + sessionId, + message: "stream request failed", + detail, + }); + return; + } + + await readStreamEvents(response, onEvent); }; export const abortAgentChat = async (sessionId?: string) => {