import { apiFetch } from "@/lib/apiFetch";
import { config } from "@config/config";

/** Events delivered to the `onEvent` callback of {@link streamAgentChat}. */
export type StreamEvent =
  | { type: "token"; sessionId: string; content: string }
  | { type: "done"; sessionId: string }
  | { type: "session_title"; sessionId: string; title: string }
  | {
      type: "progress";
      sessionId: string;
      id: string;
      phase: string;
      status: "running" | "completed" | "error";
      title: string;
      detail?: string;
    }
  | {
      type: "error";
      sessionId?: string;
      message: string;
      detail?: string;
    }
  | {
      type: "tool_call";
      sessionId: string;
      tool: string;
      params: Record<string, unknown>;
    };

type StreamOptions = {
  message: string;
  sessionId?: string;
  signal?: AbortSignal;
  onEvent: (event: StreamEvent) => void;
};

/**
 * Fields read from the JSON `data:` payload of a server event.
 * NOTE(review): the field types are asserted, not validated at runtime.
 */
type RawStreamPayload = {
  session_id?: string;
  conversationId?: string;
  content?: string;
  message?: string;
  detail?: string;
  tool?: string;
  params?: Record<string, unknown>;
  arguments?: unknown;
  id?: string;
  phase?: string;
  status?: "running" | "completed" | "error";
  title?: string;
};

/** Server event names that are translated into a {@link StreamEvent}. */
const STREAM_EVENT_NAMES = new Set<string>([
  "token",
  "done",
  "session_title",
  "progress",
  "error",
  "tool_call",
]);

/**
 * Matches CRLF, or a lone CR that is neither followed by LF nor the last
 * character of the string. A trailing CR is left alone because it may be the
 * first half of a CRLF pair split across two network chunks.
 */
const SSE_LINE_BREAK = /\r\n|\r(?!\n|$)/g;

const isStreamEventName = (name: string): name is StreamEvent["type"] =>
  STREAM_EVENT_NAMES.has(name);

/**
 * Extracts the `event:` name and the `data:` payload from one SSE block.
 *
 * Lines with any other prefix are ignored. Multiple `data:` lines are joined
 * with "\n"; when several `event:` lines are present the last one wins.
 *
 * @param block - Text of one event block, with lines separated by "\n".
 * @returns The trimmed event name and payload; each is `undefined` when the
 *   corresponding line is absent.
 */
const parseEventBlock = (block: string): { event?: string; data?: string } => {
  const lines = block.split("\n");
  let event: string | undefined;
  const dataLines: string[] = [];

  for (const line of lines) {
    if (line.startsWith("event:")) {
      event = line.slice("event:".length).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice("data:".length).trim());
    }
  }

  return {
    event,
    data: dataLines.length ? dataLines.join("\n") : undefined,
  };
};

/** Type guard: `value` is a non-null, non-array object. */
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === "object" && value !== null && !Array.isArray(value);

/**
 * Picks the parameter object for a `tool_call` event.
 *
 * Precedence: a non-empty `params` object, then `argumentsPayload` as an
 * object, then `argumentsPayload` as a JSON string that parses to an object,
 * then `params` itself if it is an (empty) object, otherwise `{}`.
 *
 * @param params - Value of the payload's `params` field.
 * @param argumentsPayload - Value of the payload's `arguments` field.
 * @returns The resolved parameter object; never throws.
 */
const resolveToolParams = (
  params: unknown,
  argumentsPayload: unknown,
): Record<string, unknown> => {
  if (isObjectRecord(params) && Object.keys(params).length > 0) {
    return params;
  }
  if (isObjectRecord(argumentsPayload)) {
    return argumentsPayload;
  }
  if (typeof argumentsPayload === "string") {
    try {
      const parsed = JSON.parse(argumentsPayload) as unknown;
      return isObjectRecord(parsed) ? parsed : {};
    } catch {
      return {};
    }
  }
  return isObjectRecord(params) ? params : {};
};

/**
 * Translates a server event name and its decoded payload into a
 * {@link StreamEvent}, filling in defaults for missing fields.
 */
const toStreamEvent = (
  name: StreamEvent["type"],
  payload: RawStreamPayload,
): StreamEvent => {
  switch (name) {
    case "token":
      return {
        type: "token",
        sessionId: payload.session_id ?? "",
        content: payload.content ?? "",
      };
    case "progress":
      return {
        type: "progress",
        sessionId: payload.session_id ?? "",
        // Without a server-provided id, derive one from the phase and the clock.
        id: payload.id ?? `${payload.phase ?? "progress"}-${Date.now()}`,
        phase: payload.phase ?? "progress",
        status: payload.status ?? "running",
        title: payload.title ?? "正在处理",
        detail: payload.detail,
      };
    case "done":
      return {
        type: "done",
        sessionId: payload.session_id ?? "",
      };
    case "session_title":
      return {
        type: "session_title",
        sessionId: payload.session_id ?? "",
        title: typeof payload.title === "string" ? payload.title : "",
      };
    case "error":
      return {
        type: "error",
        sessionId: payload.session_id,
        message: payload.message ?? "unknown error",
        detail: payload.detail,
      };
    case "tool_call":
      return {
        type: "tool_call",
        sessionId: payload.session_id ?? payload.conversationId ?? "",
        tool: payload.tool ?? "",
        params: resolveToolParams(payload.params, payload.arguments),
      };
  }
};

/**
 * POSTs a chat message to the agent streaming endpoint and forwards each
 * server-sent event to `onEvent`.
 *
 * Failures of the initial request (rejected fetch, non-OK status, missing
 * body) are reported as a single `error` event and the promise resolves.
 * Failures while reading the stream, and exceptions thrown by `onEvent`,
 * reject the returned promise after the stream has been cancelled.
 *
 * @param options.message - User message, sent as `message`.
 * @param options.sessionId - Optional session id, sent as `session_id`.
 * @param options.signal - Optional abort signal passed to the request.
 * @param options.onEvent - Callback invoked once per translated event.
 */
export const streamAgentChat = async ({
  message,
  sessionId,
  signal,
  onEvent,
}: StreamOptions): Promise<void> => {
  let response: Response;
  try {
    response = await apiFetch(
      `${config.AGENT_URL}/api/v1/agent/chat/stream`,
      {
        method: "POST",
        signal,
        headers: {
          "Content-Type": "application/json",
          Accept: "text/event-stream",
        },
        body: JSON.stringify({
          message,
          session_id: sessionId,
        }),
        projectHeaderMode: "include",
        userHeaderMode: "include",
        skipAuthRedirect: true,
      },
    );
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    onEvent({
      type: "error",
      message: "network request failed",
      detail,
    });
    return;
  }

  if (!response.ok || !response.body) {
    const detail = await response.text();
    let message = "stream request failed";
    if (response.status === 403) {
      message = "Permission denied. Please contact administrator.";
    } else if (response.status === 401) {
      message = "Login expired. Please sign in again.";
    }
    onEvent({
      type: "error",
      message,
      // The response body is not forwarded for 401/403 responses.
      detail:
        response.status === 403 || response.status === 401 ? undefined : detail,
    });
    return;
  }

  /** Decodes one complete event block and forwards it to `onEvent`. */
  const dispatchBlock = (block: string): void => {
    const { event, data } = parseEventBlock(block);
    if (!event || !data) return;

    // Only JSON decoding is guarded: an exception thrown by the consumer's
    // handler must not be reported as a malformed payload.
    let parsed: unknown;
    try {
      parsed = JSON.parse(data);
    } catch {
      onEvent({
        type: "error",
        message: "invalid SSE data payload",
        detail: data,
      });
      return;
    }

    // Unrecognised event names are ignored.
    if (!isStreamEventName(event)) return;

    if (parsed === null) {
      onEvent({
        type: "error",
        message: "invalid SSE data payload",
        detail: data,
      });
      return;
    }

    // Non-object payloads carry none of the expected fields, so every field
    // falls back to its default.
    const payload = (isObjectRecord(parsed) ? parsed : {}) as RawStreamPayload;
    onEvent(toStreamEvent(event, payload));
  };

  const reader = response.body.getReader();
  const decoder = new TextDecoder("utf-8");
  let buffer = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // Normalise CRLF / CR line endings to LF so that the blank-line
      // separator is always "\n\n", whatever the server or a proxy emits.
      buffer = (buffer + decoder.decode(value, { stream: true })).replace(
        SSE_LINE_BREAK,
        "\n",
      );

      const blocks = buffer.split("\n\n");
      // The last piece is an incomplete block; keep it for the next chunk.
      // Whatever is left when the stream ends is discarded.
      buffer = blocks.pop() ?? "";

      for (const block of blocks) {
        dispatchBlock(block);
      }
    }
  } catch (error) {
    // Stop the download before surfacing the failure; a rejection from
    // cancel() is ignored so the original error is the one that propagates.
    void reader.cancel().catch(() => undefined);
    throw error;
  } finally {
    reader.releaseLock();
  }
};

/**
 * Asks the agent backend to abort the chat for the given session.
 *
 * Resolves without sending a request when `sessionId` is empty or undefined.
 *
 * @param sessionId - Session id, sent as `session_id`.
 * @throws Error carrying the response body (or the status code when the body
 *   is empty) if the response is not OK.
 */
export const abortAgentChat = async (sessionId?: string): Promise<void> => {
  if (!sessionId) {
    return;
  }

  const response = await apiFetch(
    `${config.AGENT_URL}/api/v1/agent/chat/abort`,
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        session_id: sessionId,
      }),
      projectHeaderMode: "include",
      userHeaderMode: "include",
      skipAuthRedirect: true,
    },
  );

  if (!response.ok) {
    const detail = await response.text();
    throw new Error(detail || `abort request failed: ${response.status}`);
  }
};

/**
 * Calls the agent backend's chat fork endpoint and returns the session id
 * from its response.
 *
 * @param sessionId - Session id, sent as `session_id`.
 * @param keepMessageCount - Sent as `keep_message_count`.
 * @returns The `session_id` field of the JSON response.
 * @throws Error if the response is not OK or contains no `session_id`.
 */
export const forkAgentChat = async (
  sessionId: string | undefined,
  keepMessageCount: number,
): Promise<string> => {
  const response = await apiFetch(
    `${config.AGENT_URL}/api/v1/agent/chat/fork`,
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        session_id: sessionId,
        keep_message_count: keepMessageCount,
      }),
      projectHeaderMode: "include",
      userHeaderMode: "include",
      skipAuthRedirect: true,
    },
  );

  if (!response.ok) {
    const detail = await response.text();
    throw new Error(detail || `fork request failed: ${response.status}`);
  }

  const payload = (await response.json()) as { session_id?: string };
  if (!payload.session_id) {
    throw new Error("fork request returned no session_id");
  }
  return payload.session_id;
};