/**
 * Jest tests for the agent chat client exported from "./chatStream".
 *
 * `apiFetch` is replaced by a jest mock, so every test controls the response
 * that the client sees and then inspects both the request that was issued and
 * the events / values the client produced.
 */
import {
  abortAgentChat,
  forkAgentChat,
  rejectAgentQuestion,
  replyAgentPermission,
  replyAgentQuestion,
  type StreamEvent,
  resumeAgentChatStream,
  streamAgentChat,
} from "./chatStream";
import { ReadableStream } from "stream/web";
import { TextEncoder, TextDecoder } from "util";

// Install Node's implementations on the global object when the test
// environment does not already expose them.
if (!globalThis.ReadableStream) {
  // @ts-expect-error test polyfill
  globalThis.ReadableStream = ReadableStream;
}
if (!globalThis.TextEncoder) {
  // @ts-expect-error test polyfill
  globalThis.TextEncoder = TextEncoder;
}
if (!globalThis.TextDecoder) {
  // @ts-expect-error test polyfill
  globalThis.TextDecoder = TextDecoder;
}

jest.mock("@/lib/apiFetch", () => ({
  apiFetch: jest.fn(),
}));

const { apiFetch } = jest.requireMock("@/lib/apiFetch") as {
  apiFetch: jest.Mock;
};

/** Shape used by the tests that only inspect error events. */
type ErrorEventShape = { type: string; message?: string; detail?: string };

/**
 * Builds a ReadableStream that emits each string in `chunks` as one
 * UTF-8 encoded chunk, in order, and then closes.
 */
function makeStream(chunks: string[]) {
  return new ReadableStream({
    start(controller) {
      const utf8 = new TextEncoder();
      for (const piece of chunks) {
        controller.enqueue(utf8.encode(piece));
      }
      controller.close();
    },
  });
}

/** Makes the mocked `apiFetch` resolve to an OK response whose body streams `chunks`. */
const respondWithSse = (chunks: string[]) => {
  apiFetch.mockResolvedValue({
    ok: true,
    body: makeStream(chunks),
  });
};

/** Makes the mocked `apiFetch` resolve to an OK 202 response with an empty text body. */
const respondAccepted = () => {
  apiFetch.mockResolvedValue({
    ok: true,
    status: 202,
    text: async () => "",
  });
};

/**
 * Asserts that `apiFetch` was called with a URL containing `pathFragment` and
 * with POST options carrying `payload` serialized as the JSON body.
 */
const expectJsonPost = (pathFragment: string, payload: Record<string, unknown>) => {
  expect(apiFetch).toHaveBeenCalledWith(
    expect.stringContaining(pathFragment),
    expect.objectContaining({
      method: "POST",
      projectHeaderMode: "include",
      skipAuthRedirect: true,
      body: JSON.stringify(payload),
    }),
  );
};

describe("streamAgentChat", () => {
  beforeEach(() => {
    apiFetch.mockReset();
  });

  it("parses token and done events from chunked SSE", async () => {
    respondWithSse([
      'event: token\ndata: {"session_id":"s1","content":"he"}\n\n',
      'event: token\ndata: {"session_id":"s1","content":"llo"}\n\n',
      'event: done\ndata: {"session_id":"s1"}\n\n',
    ]);
    const events: Array<{ type: string; content?: string; sessionId?: string }> = [];

    await streamAgentChat({
      message: "hi",
      model: "provider/model",
      onEvent: (event) => events.push(event),
    });

    expectJsonPost("/api/v1/agent/chat/stream", {
      message: "hi",
      session_id: undefined,
      model: "provider/model",
      approval_mode: undefined,
    });
    expect(events).toEqual([
      { type: "token", sessionId: "s1", content: "he" },
      { type: "token", sessionId: "s1", content: "llo" },
      { type: "done", sessionId: "s1" },
    ]);
  });

  it("parses state events from a resumed stream", async () => {
    respondWithSse([
      'event: state\ndata: {"session_id":"s1","messages":[{"id":"a1","role":"assistant","content":"已输出"}],"is_streaming":true,"run_status":"running"}\n\n',
      'event: token\ndata: {"session_id":"s1","content":"继续"}\n\n',
      'event: done\ndata: {"session_id":"s1"}\n\n',
    ]);
    const events: Array<{
      type: string;
      sessionId?: string;
      messages?: unknown[];
      isStreaming?: boolean;
      runStatus?: string;
      content?: string;
    }> = [];

    await resumeAgentChatStream({
      sessionId: "s1",
      onEvent: (event) => events.push(event),
    });

    // Resuming is a GET against the session-specific stream path.
    expect(apiFetch).toHaveBeenCalledWith(
      expect.stringContaining("/api/v1/agent/chat/session/s1/stream"),
      expect.objectContaining({
        method: "GET",
        projectHeaderMode: "include",
        skipAuthRedirect: true,
      }),
    );
    expect(events).toEqual([
      {
        type: "state",
        sessionId: "s1",
        messages: [{ id: "a1", role: "assistant", content: "已输出" }],
        isStreaming: true,
        runStatus: "running",
      },
      { type: "token", sessionId: "s1", content: "继续" },
      { type: "done", sessionId: "s1" },
    ]);
  });

  it("parses progress events", async () => {
    respondWithSse([
      'event: progress\ndata: {"session_id":"s1","id":"p1","phase":"tool","status":"running","title":"正在调用后端数据查询","detail":"GET /api/v1/demo"}\n\n',
      'event: done\ndata: {"session_id":"s1"}\n\n',
    ]);
    const events: Array<{ type: string; title?: string; status?: string; detail?: string }> = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events[0]).toEqual({
      type: "progress",
      sessionId: "s1",
      id: "p1",
      phase: "tool",
      status: "running",
      title: "正在调用后端数据查询",
      detail: "GET /api/v1/demo",
    });
  });

  it("parses tool_call arguments when params is empty", async () => {
    // `params` is an empty object while `arguments` carries a JSON-encoded string.
    respondWithSse([
      'event: tool_call\ndata: {"session_id":"agent-1e75dd01-29e","tool":"locate_features","params":{},"arguments":"{\\"ids\\":[\\"142902\\"],\\"feature_type\\":\\"junction\\"}"}\n\n',
      'event: done\ndata: {"session_id":"agent-1e75dd01-29e"}\n\n',
    ]);
    const events: StreamEvent[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events[0]).toEqual({
      type: "tool_call",
      sessionId: "agent-1e75dd01-29e",
      tool: "locate_features",
      params: { ids: ["142902"], feature_type: "junction" },
    });
  });

  it("parses permission request and response events", async () => {
    respondWithSse([
      'event: permission_request\ndata: {"session_id":"s1","request_id":"perm-1","permission":"bash","patterns":["rm *"],"target":"rm tmp.txt","always":["rm *"],"created_at":123}\n\n',
      'event: permission_response\ndata: {"session_id":"s1","request_id":"perm-1","reply":"reject"}\n\n',
    ]);
    const events: StreamEvent[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events).toEqual([
      {
        type: "permission_request",
        sessionId: "s1",
        requestId: "perm-1",
        permission: "bash",
        patterns: ["rm *"],
        target: "rm tmp.txt",
        always: ["rm *"],
        tool: undefined,
        createdAt: 123,
      },
      {
        type: "permission_response",
        sessionId: "s1",
        requestId: "perm-1",
        reply: "reject",
      },
    ]);
  });

  it("parses question request, response, and todo update events", async () => {
    respondWithSse([
      'event: question_request\ndata: {"session_id":"s1","request_id":"q-1","questions":[{"header":"范围","question":"选择范围","options":[{"label":"城区","description":"中心城区"}],"multiple":false,"custom":true}],"tool":{"message_id":"m1","call_id":"c1"},"created_at":123}\n\n',
      'event: question_response\ndata: {"session_id":"s1","request_id":"q-1","answers":[["城区","补充说明"]]}\n\n',
      'event: todo_update\ndata: {"session_id":"s1","todos":[{"id":"t1","content":"分析水位","status":"in_progress","priority":"high","updated_at":456}],"created_at":456}\n\n',
    ]);
    const events: StreamEvent[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events).toEqual([
      {
        type: "question_request",
        sessionId: "s1",
        requestId: "q-1",
        questions: [
          {
            header: "范围",
            question: "选择范围",
            options: [{ label: "城区", description: "中心城区" }],
            multiple: false,
            custom: true,
          },
        ],
        tool: {
          messageID: "m1",
          callID: "c1",
        },
        createdAt: 123,
      },
      {
        type: "question_response",
        sessionId: "s1",
        requestId: "q-1",
        answers: [["城区", "补充说明"]],
        rejected: false,
      },
      {
        type: "todo_update",
        sessionId: "s1",
        messageId: undefined,
        todos: [
          {
            id: "t1",
            content: "分析水位",
            status: "in_progress",
            priority: "high",
            createdAt: undefined,
            updatedAt: 456,
          },
        ],
        createdAt: 456,
      },
    ]);
  });

  it("emits error when response is not ok", async () => {
    apiFetch.mockResolvedValue({
      ok: false,
      body: null,
      text: async () => "bad request",
    });
    const events: ErrorEventShape[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events).toEqual([
      { type: "error", message: "stream request failed", detail: "bad request" },
    ]);
  });

  it("emits re-login message on unauthorized response", async () => {
    apiFetch.mockResolvedValue({
      ok: false,
      status: 401,
      body: null,
      text: async () => "unauthorized",
    });
    const events: ErrorEventShape[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events).toEqual([
      { type: "error", message: "Login expired. Please sign in again.", detail: undefined },
    ]);
  });

  it("emits network error when fetch throws", async () => {
    apiFetch.mockRejectedValue(new TypeError("Failed to fetch"));
    const events: ErrorEventShape[] = [];

    await streamAgentChat({
      message: "hi",
      onEvent: (event) => events.push(event),
    });

    expect(events).toEqual([
      { type: "error", message: "network request failed", detail: "Failed to fetch" },
    ]);
  });

  it("calls abort endpoint for an active session", async () => {
    respondAccepted();

    await abortAgentChat("s1");

    expectJsonPost("/api/v1/agent/chat/abort", {
      session_id: "s1",
    });
  });

  it("calls permission reply endpoint", async () => {
    respondAccepted();

    await replyAgentPermission("s1", "perm-1", "once");

    expectJsonPost("/api/v1/agent/chat/permission/perm-1/reply", {
      session_id: "s1",
      reply: "once",
    });
  });

  it("calls question reply and reject endpoints", async () => {
    respondAccepted();

    await replyAgentQuestion("s1", "q-1", [["城区"]]);
    await rejectAgentQuestion("s1", "q-2");

    expectJsonPost("/api/v1/agent/chat/question/q-1/reply", {
      session_id: "s1",
      answers: [["城区"]],
    });
    expectJsonPost("/api/v1/agent/chat/question/q-2/reject", {
      session_id: "s1",
    });
  });

  it("calls fork endpoint and returns new session id", async () => {
    apiFetch.mockResolvedValue({
      ok: true,
      status: 200,
      json: async () => ({ session_id: "forked-s1" }),
      text: async () => "",
    });

    const sessionId = await forkAgentChat("s1", 3);

    expect(sessionId).toBe("forked-s1");
    // Only the method and body are checked for the fork request.
    expect(apiFetch).toHaveBeenCalledWith(
      expect.stringContaining("/api/v1/agent/chat/fork"),
      expect.objectContaining({
        method: "POST",
        body: JSON.stringify({
          session_id: "s1",
          keep_message_count: 3,
        }),
      }),
    );
  });
});