增加流式信息中断处理机制

This commit is contained in:
2026-06-04 16:27:15 +08:00
parent e60e1f6453
commit 7764e25398
6 changed files with 559 additions and 197 deletions
@@ -71,6 +71,8 @@ export type ChatSessionSummary = {
title: string; title: string;
createdAt: number; createdAt: number;
updatedAt: number; updatedAt: number;
isStreaming?: boolean;
runStatus?: string;
}; };
export type LoadedChatState = { export type LoadedChatState = {
@@ -79,4 +81,6 @@ export type LoadedChatState = {
isTitleManuallyEdited?: boolean; isTitleManuallyEdited?: boolean;
messages: Message[]; messages: Message[];
branchGroups: BranchGroup[]; branchGroups: BranchGroup[];
isStreaming?: boolean;
runStatus?: string;
}; };
+8
View File
@@ -14,6 +14,8 @@ type BackendSessionPayload = {
title?: string; title?: string;
created_at?: string | number; created_at?: string | number;
updated_at?: string | number; updated_at?: string | number;
is_streaming?: boolean;
run_status?: string;
}; };
export const createEmptyChatState = (): LoadedChatState => ({ export const createEmptyChatState = (): LoadedChatState => ({
@@ -76,6 +78,8 @@ const fetchBackendChatSessions = async (): Promise<ChatSessionSummary[]> => {
title: normalizeTitle(session.title), title: normalizeTitle(session.title),
createdAt: toMillis(session.created_at), createdAt: toMillis(session.created_at),
updatedAt: toMillis(session.updated_at), updatedAt: toMillis(session.updated_at),
isStreaming: session.is_streaming,
runStatus: session.run_status,
})) }))
.filter((session) => Boolean(session.id)) .filter((session) => Boolean(session.id))
.sort(compareSessionsByAnchorTime); .sort(compareSessionsByAnchorTime);
@@ -104,6 +108,8 @@ const fetchBackendChatSession = async (sessionId: string): Promise<LoadedChatSta
session_id?: string; session_id?: string;
messages?: Message[]; messages?: Message[];
branch_groups?: BranchGroup[]; branch_groups?: BranchGroup[];
is_streaming?: boolean;
run_status?: string;
}; };
return { return {
title: normalizeTitle(payload.title), title: normalizeTitle(payload.title),
@@ -111,6 +117,8 @@ const fetchBackendChatSession = async (sessionId: string): Promise<LoadedChatSta
messages: sanitizeMessages(payload.messages), messages: sanitizeMessages(payload.messages),
sessionId: payload.session_id ?? payload.id, sessionId: payload.session_id ?? payload.id,
branchGroups: sanitizeBranchGroups(payload.branch_groups), branchGroups: sanitizeBranchGroups(payload.branch_groups),
isStreaming: payload.is_streaming ?? false,
runStatus: payload.run_status,
}; };
}; };
@@ -3,12 +3,13 @@
import { act, renderHook, waitFor } from "@testing-library/react"; import { act, renderHook, waitFor } from "@testing-library/react";
import { useAgentChatSession } from "./useAgentChatSession"; import { useAgentChatSession } from "./useAgentChatSession";
import { streamAgentChat } from "@/lib/chatStream"; import { abortAgentChat, resumeAgentChatStream, streamAgentChat } from "@/lib/chatStream";
import type { StreamEvent } from "@/lib/chatStream"; import type { StreamEvent } from "@/lib/chatStream";
jest.mock("@/lib/chatStream", () => ({ jest.mock("@/lib/chatStream", () => ({
abortAgentChat: jest.fn(async () => undefined), abortAgentChat: jest.fn(async () => undefined),
forkAgentChat: jest.fn(async () => "forked-session"), forkAgentChat: jest.fn(async () => "forked-session"),
resumeAgentChatStream: jest.fn(async () => undefined),
streamAgentChat: jest.fn(async () => undefined), streamAgentChat: jest.fn(async () => undefined),
})); }));
@@ -42,7 +43,12 @@ describe("useAgentChatSession", () => {
listChatSessions.mockReset(); listChatSessions.mockReset();
saveActiveChatState.mockReset(); saveActiveChatState.mockReset();
updateChatSessionTitle.mockReset(); updateChatSessionTitle.mockReset();
jest.mocked(abortAgentChat).mockReset();
jest.mocked(resumeAgentChatStream).mockReset();
jest.mocked(streamAgentChat).mockReset(); jest.mocked(streamAgentChat).mockReset();
jest.mocked(abortAgentChat).mockImplementation(async () => undefined);
jest.mocked(resumeAgentChatStream).mockImplementation(async () => undefined);
jest.mocked(streamAgentChat).mockImplementation(async () => undefined);
saveActiveChatState.mockImplementation(async (state) => state.sessionId); saveActiveChatState.mockImplementation(async (state) => state.sessionId);
}); });
@@ -103,7 +109,7 @@ describe("useAgentChatSession", () => {
]); ]);
}); });
it("waits for the stream session id before persisting a new streaming conversation", async () => { it("persists a new conversation only after the stream is done", async () => {
listChatSessions.mockResolvedValue([]); listChatSessions.mockResolvedValue([]);
let emitStreamEvent: ((event: StreamEvent) => void) | undefined; let emitStreamEvent: ((event: StreamEvent) => void) | undefined;
jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => {
@@ -147,15 +153,140 @@ describe("useAgentChatSession", () => {
jest.advanceTimersByTime(200); jest.advanceTimersByTime(200);
}); });
expect(saveActiveChatState).toHaveBeenCalledTimes(1); expect(saveActiveChatState).not.toHaveBeenCalled();
act(() => {
emitStreamEvent?.({
type: "done",
sessionId: "chat-stream-1",
});
});
await act(async () => {
jest.advanceTimersByTime(200);
});
await waitFor(() => expect(saveActiveChatState).toHaveBeenCalledTimes(1));
expect(saveActiveChatState.mock.calls[0][0]).toMatchObject({ expect(saveActiveChatState.mock.calls[0][0]).toMatchObject({
sessionId: "chat-stream-1", sessionId: "chat-stream-1",
messages: [
expect.objectContaining({ role: "user", content: "第一条消息" }),
expect.objectContaining({ role: "assistant", content: "收到" }),
],
}); });
} finally { } finally {
jest.useRealTimers(); jest.useRealTimers();
} }
}); });
it("hydrates a backend streaming session and resumes its stream", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
runStatus: "running",
},
]);
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
expect(result.current.isStreaming).toBe(true);
expect(result.current.activeSessionId).toBe("session-loaded");
expect(resumeAgentChatStream).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "session-loaded",
}),
);
});
it("updates resumed messages from state, token, and done events", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async ({ onEvent }) => {
onEvent({
type: "state",
sessionId: "session-loaded",
messages: [
{ id: "u1", role: "user", content: "继续分析" },
{ id: "a1", role: "assistant", content: "已有" },
],
isStreaming: true,
runStatus: "running",
});
onEvent({
type: "token",
sessionId: "session-loaded",
content: "输出",
});
onEvent({
type: "done",
sessionId: "session-loaded",
});
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isHydrating).toBe(false));
await waitFor(() => expect(result.current.isStreaming).toBe(false));
expect(result.current.messages).toEqual([
expect.objectContaining({ id: "u1", role: "user", content: "继续分析" }),
expect.objectContaining({ id: "a1", role: "assistant", content: "已有输出" }),
]);
});
it("aborts a resumed streaming session through the backend abort endpoint", async () => {
listChatSessions.mockResolvedValue([
{
id: "session-streaming",
title: "运行中",
createdAt: 1,
updatedAt: 2,
isStreaming: true,
},
]);
jest.mocked(resumeAgentChatStream).mockImplementationOnce(async () => {
await new Promise<void>(() => undefined);
});
const { result } = renderHook(() =>
useAgentChatSession({
projectId: "project-1",
onToolCall: jest.fn(),
}),
);
await waitFor(() => expect(result.current.isStreaming).toBe(true));
act(() => {
result.current.abort();
});
expect(abortAgentChat).toHaveBeenCalledWith("session-loaded");
});
it("ignores generated session titles after the title was edited manually", async () => { it("ignores generated session titles after the title was edited manually", async () => {
listChatSessions.mockResolvedValue([]); listChatSessions.mockResolvedValue([]);
jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => { jest.mocked(streamAgentChat).mockImplementationOnce(async ({ onEvent }) => {
+139 -57
View File
@@ -2,7 +2,12 @@
import { useCallback, useEffect, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { abortAgentChat, forkAgentChat, streamAgentChat } from "@/lib/chatStream"; import {
abortAgentChat,
forkAgentChat,
resumeAgentChatStream,
streamAgentChat,
} from "@/lib/chatStream";
import type { AgentModel, StreamEvent } from "@/lib/chatStream"; import type { AgentModel, StreamEvent } from "@/lib/chatStream";
import type { import type {
AgentArtifact, AgentArtifact,
@@ -164,6 +169,8 @@ export const useAgentChatSession = ({
const [isHydrating, setIsHydrating] = useState(true); const [isHydrating, setIsHydrating] = useState(true);
const abortRef = useRef<AbortController | null>(null); const abortRef = useRef<AbortController | null>(null);
const sessionIdRef = useRef<string | undefined>(undefined); const sessionIdRef = useRef<string | undefined>(undefined);
const messagesRef = useRef<Message[]>([]);
const resumeStreamingSessionRef = useRef<((sessionId: string) => void) | null>(null);
const isSessionTitleManuallyEditedRef = useRef(false); const isSessionTitleManuallyEditedRef = useRef(false);
const cancelPromiseRef = useRef<Promise<void> | null>(null); const cancelPromiseRef = useRef<Promise<void> | null>(null);
const titleUpdateNonceRef = useRef(0); const titleUpdateNonceRef = useRef(0);
@@ -181,6 +188,10 @@ export const useAgentChatSession = ({
sessionIdRef.current = sessionId; sessionIdRef.current = sessionId;
}, [sessionId]); }, [sessionId]);
useEffect(() => {
messagesRef.current = messages;
}, [messages]);
useEffect(() => { useEffect(() => {
isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited; isSessionTitleManuallyEditedRef.current = isSessionTitleManuallyEdited;
}, [isSessionTitleManuallyEdited]); }, [isSessionTitleManuallyEdited]);
@@ -216,10 +227,11 @@ export const useAgentChatSession = ({
} }
try { try {
const [loadedState, sessions] = await Promise.all([ const sessions = await listChatSessions();
Promise.resolve(createEmptyChatState()), const streamingSession = sessions.find((session) => session.isStreaming);
listChatSessions(), const loadedState = streamingSession
]); ? await loadChatSessionById(streamingSession.id)
: createEmptyChatState();
if (cancelled) return; if (cancelled) return;
sessionIdRef.current = loadedState.sessionId; sessionIdRef.current = loadedState.sessionId;
@@ -234,6 +246,12 @@ export const useAgentChatSession = ({
setSessionId(loadedState.sessionId); setSessionId(loadedState.sessionId);
setBranchGroups(loadedState.branchGroups); setBranchGroups(loadedState.branchGroups);
setChatSessions(sessions); setChatSessions(sessions);
if (
loadedState.sessionId &&
(loadedState.isStreaming || streamingSession?.isStreaming)
) {
resumeStreamingSessionRef.current?.(loadedState.sessionId);
}
} catch (error) { } catch (error) {
console.error("[GlobalChatbox] Failed to hydrate chat state:", error); console.error("[GlobalChatbox] Failed to hydrate chat state:", error);
} finally { } finally {
@@ -255,6 +273,10 @@ export const useAgentChatSession = ({
const currentHydrationNonce = hydrationNonceRef.current; const currentHydrationNonce = hydrationNonceRef.current;
const persistTimer = window.setTimeout(() => { const persistTimer = window.setTimeout(() => {
if (isStreaming) {
return;
}
const state: LoadedChatState = { const state: LoadedChatState = {
title: sessionTitle, title: sessionTitle,
isTitleManuallyEdited: isSessionTitleManuallyEdited, isTitleManuallyEdited: isSessionTitleManuallyEdited,
@@ -262,13 +284,6 @@ export const useAgentChatSession = ({
sessionId, sessionId,
branchGroups, branchGroups,
}; };
if (
isStreaming &&
!state.sessionId &&
state.messages.length > 0
) {
return;
}
const currentStateKey = createPersistedStateKey(state); const currentStateKey = createPersistedStateKey(state);
if (currentStateKey === lastPersistedStateKeyRef.current) { if (currentStateKey === lastPersistedStateKeyRef.current) {
@@ -351,53 +366,42 @@ export const useAgentChatSession = ({
); );
}, []); }, []);
const runPrompt = useCallback( const getLastAssistantMessageId = useCallback((fallback?: string) => {
async ({ const assistant = [...messagesRef.current]
prompt: rawPrompt, .reverse()
sessionIdOverride, .find((message) => message.role === "assistant");
preparedMessages, return assistant?.id ?? fallback;
userMessage, }, []);
assistantMessage,
}: PromptRunOptions) => {
const prompt = rawPrompt.trim();
if (!prompt || isStreaming || isHydrating) return;
await cancelPromiseRef.current?.catch(() => undefined); const applyStreamEvent = useCallback(
onBeforeSend?.(); (
setBranchTransition(null); event: StreamEvent,
options?: {
const nextUserMessage = userMessage ?? createUserMessage(prompt); assistantMessageId?: string;
const nextAssistantMessage = assistantMessage ?? createAssistantMessage(); },
const nextMessages = ) => {
preparedMessages ??
[...messages, nextUserMessage, nextAssistantMessage];
setIsStreaming(true);
setMessages(cloneMessages(nextMessages));
if (sessionIdOverride !== undefined) {
sessionIdRef.current = sessionIdOverride;
setSessionId(sessionIdOverride);
}
const controller = new AbortController();
abortRef.current = controller;
try {
await streamAgentChat({
message: prompt,
sessionId: sessionIdOverride ?? sessionIdRef.current,
model: getModel?.(),
signal: controller.signal,
onEvent: (event) => {
if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) { if ("sessionId" in event && event.sessionId && event.sessionId !== sessionIdRef.current) {
sessionIdRef.current = event.sessionId; sessionIdRef.current = event.sessionId;
setSessionId(event.sessionId); setSessionId(event.sessionId);
} }
if (event.type === "state") {
const nextMessages = cloneMessages(event.messages as Message[]);
messagesRef.current = nextMessages;
setMessages(nextMessages);
setIsStreaming(event.isStreaming);
return;
}
const assistantMessageId = getLastAssistantMessageId(options?.assistantMessageId);
if (!assistantMessageId) {
return;
}
if (event.type === "token") { if (event.type === "token") {
setMessages((prev) => setMessages((prev) =>
prev.map((message) => prev.map((message) =>
message.id === nextAssistantMessage.id message.id === assistantMessageId
? { ? {
...message, ...message,
content: message.content + event.content, content: message.content + event.content,
@@ -409,14 +413,14 @@ export const useAgentChatSession = ({
} else if (event.type === "progress") { } else if (event.type === "progress") {
setMessages((prev) => setMessages((prev) =>
prev.map((message) => prev.map((message) =>
message.id === nextAssistantMessage.id message.id === assistantMessageId
? { ...message, progress: upsertProgress(message.progress, event) } ? { ...message, progress: upsertProgress(message.progress, event) }
: message, : message,
), ),
); );
} else if (event.type === "tool_call") { } else if (event.type === "tool_call") {
onToolCall(event, { onToolCall(event, {
assistantMessageId: nextAssistantMessage.id, assistantMessageId,
appendArtifact, appendArtifact,
}); });
} else if (event.type === "session_title") { } else if (event.type === "session_title") {
@@ -442,7 +446,7 @@ export const useAgentChatSession = ({
} else if (event.type === "done") { } else if (event.type === "done") {
setMessages((prev) => setMessages((prev) =>
prev.map((message) => { prev.map((message) => {
if (message.id !== nextAssistantMessage.id) return message; if (message.id !== assistantMessageId) return message;
const completedProgress = completeRunningProgress(message.progress); const completedProgress = completeRunningProgress(message.progress);
if ( if (
message.content.trim().length === 0 && message.content.trim().length === 0 &&
@@ -462,7 +466,7 @@ export const useAgentChatSession = ({
} else if (event.type === "error") { } else if (event.type === "error") {
setMessages((prev) => setMessages((prev) =>
prev.map((message) => prev.map((message) =>
message.id === nextAssistantMessage.id message.id === assistantMessageId
? { ? {
...message, ...message,
content: message.content || `⚠️ **错误:** ${event.message}`, content: message.content || `⚠️ **错误:** ${event.message}`,
@@ -475,6 +479,80 @@ export const useAgentChatSession = ({
setIsStreaming(false); setIsStreaming(false);
} }
}, },
[appendArtifact, getLastAssistantMessageId, onToolCall],
);
const resumeStreamingSession = useCallback(
(nextSessionId: string) => {
const controller = new AbortController();
abortRef.current?.abort();
abortRef.current = controller;
setIsStreaming(true);
void resumeAgentChatStream({
sessionId: nextSessionId,
signal: controller.signal,
onEvent: (event) => applyStreamEvent(event),
})
.catch((error) => {
if (!controller.signal.aborted) {
console.error("[GlobalChatbox] Failed to resume chat stream:", error);
setIsStreaming(false);
}
})
.finally(() => {
if (abortRef.current === controller) {
abortRef.current = null;
}
});
},
[applyStreamEvent],
);
resumeStreamingSessionRef.current = resumeStreamingSession;
const runPrompt = useCallback(
async ({
prompt: rawPrompt,
sessionIdOverride,
preparedMessages,
userMessage,
assistantMessage,
}: PromptRunOptions) => {
const prompt = rawPrompt.trim();
if (!prompt || isStreaming || isHydrating) return;
await cancelPromiseRef.current?.catch(() => undefined);
onBeforeSend?.();
setBranchTransition(null);
const nextUserMessage = userMessage ?? createUserMessage(prompt);
const nextAssistantMessage = assistantMessage ?? createAssistantMessage();
const nextMessages =
preparedMessages ??
[...messages, nextUserMessage, nextAssistantMessage];
const clonedNextMessages = cloneMessages(nextMessages);
setIsStreaming(true);
messagesRef.current = clonedNextMessages;
setMessages(clonedNextMessages);
if (sessionIdOverride !== undefined) {
sessionIdRef.current = sessionIdOverride;
setSessionId(sessionIdOverride);
}
const controller = new AbortController();
abortRef.current = controller;
try {
await streamAgentChat({
message: prompt,
sessionId: sessionIdOverride ?? sessionIdRef.current,
model: getModel?.(),
signal: controller.signal,
onEvent: (event) =>
applyStreamEvent(event, {
assistantMessageId: nextAssistantMessage.id,
}),
}); });
} catch (error) { } catch (error) {
if (controller.signal.aborted) { if (controller.signal.aborted) {
@@ -520,7 +598,7 @@ export const useAgentChatSession = ({
setIsStreaming(false); setIsStreaming(false);
} }
}, },
[appendArtifact, getModel, isHydrating, isStreaming, messages, onBeforeSend, onToolCall], [applyStreamEvent, getModel, isHydrating, isStreaming, messages, onBeforeSend],
); );
const abort = useCallback(() => { const abort = useCallback(() => {
@@ -587,13 +665,18 @@ export const useAgentChatSession = ({
setSessionId(nextState.sessionId); setSessionId(nextState.sessionId);
setBranchGroups(nextState.branchGroups); setBranchGroups(nextState.branchGroups);
setChatSessions(sessions); setChatSessions(sessions);
if (nextState.sessionId && nextState.isStreaming) {
resumeStreamingSession(nextState.sessionId);
} else {
setIsStreaming(false);
}
} catch (error) { } catch (error) {
console.error("[GlobalChatbox] Failed to switch chat session:", error); console.error("[GlobalChatbox] Failed to switch chat session:", error);
} finally { } finally {
setIsHydrating(false); setIsHydrating(false);
} }
}, },
[isHydrating, isStreaming], [isHydrating, isStreaming, resumeStreamingSession],
); );
const removeSession = useCallback( const removeSession = useCallback(
@@ -683,7 +766,6 @@ export const useAgentChatSession = ({
title: normalizedTitle, title: normalizedTitle,
isTitleManuallyEdited: true, isTitleManuallyEdited: true,
messages, messages,
sessionId: sessionIdRef.current,
branchGroups, branchGroups,
}); });
} }
+51 -1
View File
@@ -1,4 +1,9 @@
import { abortAgentChat, forkAgentChat, streamAgentChat } from "./chatStream"; import {
abortAgentChat,
forkAgentChat,
resumeAgentChatStream,
streamAgentChat,
} from "./chatStream";
import { ReadableStream } from "stream/web"; import { ReadableStream } from "stream/web";
import { TextEncoder, TextDecoder } from "util"; import { TextEncoder, TextDecoder } from "util";
@@ -76,6 +81,51 @@ describe("streamAgentChat", () => {
]); ]);
}); });
it("parses state events from a resumed stream", async () => {
apiFetch.mockResolvedValue({
ok: true,
body: makeStream([
'event: state\ndata: {"session_id":"s1","messages":[{"id":"a1","role":"assistant","content":"已输出"}],"is_streaming":true,"run_status":"running"}\n\n',
'event: token\ndata: {"session_id":"s1","content":"继续"}\n\n',
'event: done\ndata: {"session_id":"s1"}\n\n',
]),
});
const events: Array<{
type: string;
sessionId?: string;
messages?: unknown[];
isStreaming?: boolean;
runStatus?: string;
content?: string;
}> = [];
await resumeAgentChatStream({
sessionId: "s1",
onEvent: (event) => events.push(event),
});
expect(apiFetch).toHaveBeenCalledWith(
expect.stringContaining("/api/v1/agent/chat/session/s1/stream"),
expect.objectContaining({
method: "GET",
projectHeaderMode: "include",
skipAuthRedirect: true,
}),
);
expect(events).toEqual([
{
type: "state",
sessionId: "s1",
messages: [{ id: "a1", role: "assistant", content: "已输出" }],
isStreaming: true,
runStatus: "running",
},
{ type: "token", sessionId: "s1", content: "继续" },
{ type: "done", sessionId: "s1" },
]);
});
it("parses progress events", async () => { it("parses progress events", async () => {
apiFetch.mockResolvedValue({ apiFetch.mockResolvedValue({
ok: true, ok: true,
+170 -83
View File
@@ -6,6 +6,13 @@ export type AgentModel =
| "deepseek/deepseek-v4-pro"; | "deepseek/deepseek-v4-pro";
export type StreamEvent = export type StreamEvent =
| {
type: "state";
sessionId: string;
messages: unknown[];
isStreaming: boolean;
runStatus?: string;
}
| { type: "token"; sessionId: string; content: string } | { type: "token"; sessionId: string; content: string }
| { type: "done"; sessionId: string; totalDurationMs?: number } | { type: "done"; sessionId: string; totalDurationMs?: number }
| { type: "session_title"; sessionId: string; title: string } | { type: "session_title"; sessionId: string; title: string }
@@ -44,6 +51,12 @@ type StreamOptions = {
onEvent: (event: StreamEvent) => void; onEvent: (event: StreamEvent) => void;
}; };
type ResumeStreamOptions = {
sessionId: string;
signal?: AbortSignal;
onEvent: (event: StreamEvent) => void;
};
const parseEventBlock = (block: string): { event?: string; data?: string } => { const parseEventBlock = (block: string): { event?: string; data?: string } => {
const lines = block.split("\n"); const lines = block.split("\n");
let event: string | undefined; let event: string | undefined;
@@ -87,6 +100,126 @@ const resolveToolParams = (
return isObjectRecord(params) ? params : {}; return isObjectRecord(params) ? params : {};
}; };
const emitParsedStreamEvent = (
event: string,
data: string,
onEvent: (event: StreamEvent) => void,
) => {
try {
const parsed = JSON.parse(data) as {
session_id?: string;
content?: string;
message?: string;
detail?: string;
tool?: string;
params?: Record<string, unknown>;
arguments?: unknown;
id?: string;
phase?: string;
status?: "running" | "completed" | "error";
title?: string;
messages?: unknown[];
is_streaming?: boolean;
run_status?: string;
started_at?: number;
ended_at?: number;
elapsed_ms?: number;
duration_ms?: number;
total_duration_ms?: number;
};
if (event === "state") {
onEvent({
type: "state",
sessionId: parsed.session_id ?? "",
messages: Array.isArray(parsed.messages) ? parsed.messages : [],
isStreaming: parsed.is_streaming ?? false,
runStatus: parsed.run_status,
});
} else if (event === "token") {
onEvent({
type: "token",
sessionId: parsed.session_id ?? "",
content: parsed.content ?? "",
});
} else if (event === "progress") {
onEvent({
type: "progress",
sessionId: parsed.session_id ?? "",
id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`,
phase: parsed.phase ?? "progress",
status: parsed.status ?? "running",
title: parsed.title ?? "正在处理",
detail: parsed.detail,
startedAt: parsed.started_at,
endedAt: parsed.ended_at,
elapsedMs: parsed.elapsed_ms,
durationMs: parsed.duration_ms,
});
} else if (event === "done") {
onEvent({
type: "done",
sessionId: parsed.session_id ?? "",
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "session_title") {
onEvent({
type: "session_title",
sessionId: parsed.session_id ?? "",
title: typeof parsed.title === "string" ? parsed.title : "",
});
} else if (event === "error") {
onEvent({
type: "error",
sessionId: parsed.session_id,
message: parsed.message ?? "unknown error",
detail: parsed.detail,
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "tool_call") {
onEvent({
type: "tool_call",
sessionId: parsed.session_id ?? "",
tool: parsed.tool ?? "",
params: resolveToolParams(parsed.params, parsed.arguments),
});
}
} catch {
onEvent({
type: "error",
message: "invalid SSE data payload",
detail: data,
});
}
};
const readStreamEvents = async (
response: Response,
onEvent: (event: StreamEvent) => void,
) => {
if (!response.body) {
return;
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";
for (const block of blocks) {
const { event, data } = parseEventBlock(block);
if (!event || !data) continue;
emitParsedStreamEvent(event, data, onEvent);
}
}
};
export const streamAgentChat = async ({ export const streamAgentChat = async ({
message, message,
sessionId, sessionId,
@@ -144,98 +277,52 @@ export const streamAgentChat = async ({
return; return;
} }
const reader = response.body.getReader(); await readStreamEvents(response, onEvent);
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";
for (const block of blocks) {
const { event, data } = parseEventBlock(block);
if (!event || !data) continue;
try {
const parsed = JSON.parse(data) as {
session_id?: string;
content?: string;
message?: string;
detail?: string;
tool?: string;
params?: Record<string, unknown>;
arguments?: unknown;
id?: string;
phase?: string;
status?: "running" | "completed" | "error";
title?: string;
started_at?: number;
ended_at?: number;
elapsed_ms?: number;
duration_ms?: number;
total_duration_ms?: number;
}; };
if (event === "token") {
onEvent({ export const resumeAgentChatStream = async ({
type: "token", sessionId,
sessionId: parsed.session_id ?? "", signal,
content: parsed.content ?? "", onEvent,
}); }: ResumeStreamOptions) => {
} else if (event === "progress") { let response: Response;
onEvent({ try {
type: "progress", response = await apiFetch(
sessionId: parsed.session_id ?? "", `${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}/stream`,
id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`, {
phase: parsed.phase ?? "progress", method: "GET",
status: parsed.status ?? "running", signal,
title: parsed.title ?? "正在处理", headers: {
detail: parsed.detail, Accept: "text/event-stream",
startedAt: parsed.started_at, },
endedAt: parsed.ended_at, projectHeaderMode: "include",
elapsedMs: parsed.elapsed_ms, userHeaderMode: "include",
durationMs: parsed.duration_ms, skipAuthRedirect: true,
}); },
} else if (event === "done") { );
onEvent({ } catch (error) {
type: "done", const detail = error instanceof Error ? error.message : String(error);
sessionId: parsed.session_id ?? "",
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "session_title") {
onEvent({
type: "session_title",
sessionId: parsed.session_id ?? "",
title: typeof parsed.title === "string" ? parsed.title : "",
});
} else if (event === "error") {
onEvent({ onEvent({
type: "error", type: "error",
sessionId: parsed.session_id, sessionId,
message: parsed.message ?? "unknown error", message: "network request failed",
detail: parsed.detail, detail,
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "tool_call") {
onEvent({
type: "tool_call",
sessionId: parsed.session_id ?? "",
tool: parsed.tool ?? "",
params: resolveToolParams(parsed.params, parsed.arguments),
}); });
return;
} }
} catch {
if (!response.ok || !response.body) {
const detail = await response.text();
onEvent({ onEvent({
type: "error", type: "error",
message: "invalid SSE data payload", sessionId,
detail: data, message: "stream request failed",
detail,
}); });
return;
} }
}
} await readStreamEvents(response, onEvent);
}; };
export const abortAgentChat = async (sessionId?: string) => { export const abortAgentChat = async (sessionId?: string) => {