Files
TJWaterFrontend_Refine/src/lib/chatStream.ts
T
jiang 968d798a2a
Build Push and Deploy / docker-image (push) Failing after 42s
Build Push and Deploy / deploy-fallback-log (push) Successful in 0s
fix(chat): hide raw permission metadata
2026-06-08 20:12:08 +08:00

696 lines
18 KiB
TypeScript

import { apiFetch } from "@/lib/apiFetch";
import { config } from "@config/config";
export type AgentModel =
| "deepseek/deepseek-v4-flash"
| "deepseek/deepseek-v4-pro";
export type PermissionReply = "once" | "always" | "reject";
export type AgentApprovalMode = "request" | "always";
export type AgentQuestionStatus =
| "pending"
| "submitting"
| "answered"
| "rejected"
| "error";
export type AgentQuestionRequest = {
requestId: string;
sessionId: string;
questions: Array<{
header: string;
question: string;
options: Array<{
label: string;
description: string;
}>;
multiple?: boolean;
custom?: boolean;
}>;
tool?: {
messageID: string;
callID: string;
};
createdAt: number;
repliedAt?: number;
status: AgentQuestionStatus;
answers?: string[][];
error?: string;
};
export type AgentTodoItem = {
id: string;
content: string;
status: "pending" | "in_progress" | "completed" | "cancelled";
priority?: "low" | "medium" | "high";
createdAt?: number;
updatedAt?: number;
};
export type AgentTodoUpdate = {
sessionId: string;
messageId?: string;
todos: AgentTodoItem[];
createdAt: number;
};
export type StreamEvent =
| {
type: "state";
sessionId: string;
messages: unknown[];
isStreaming: boolean;
runStatus?: string;
}
| { type: "token"; sessionId: string; content: string }
| { type: "done"; sessionId: string; totalDurationMs?: number }
| { type: "session_title"; sessionId: string; title: string }
| {
type: "progress";
sessionId: string;
id: string;
phase: string;
status: "running" | "completed" | "error";
title: string;
detail?: string;
startedAt?: number;
endedAt?: number;
elapsedMs?: number;
durationMs?: number;
}
| {
type: "error";
sessionId?: string;
message: string;
detail?: string;
totalDurationMs?: number;
}
| {
type: "tool_call";
sessionId: string;
tool: string;
params: Record<string, unknown>;
}
| {
type: "permission_request";
sessionId: string;
requestId: string;
permission: string;
patterns: string[];
target?: string;
always: string[];
tool?: {
messageID: string;
callID: string;
};
createdAt: number;
}
| {
type: "permission_response";
sessionId: string;
requestId: string;
reply: PermissionReply;
}
| {
type: "question_request";
sessionId: string;
requestId: string;
questions: AgentQuestionRequest["questions"];
tool?: AgentQuestionRequest["tool"];
createdAt: number;
}
| {
type: "question_response";
sessionId: string;
requestId: string;
answers?: string[][];
rejected?: boolean;
}
| {
type: "todo_update";
sessionId: string;
messageId?: string;
todos: AgentTodoItem[];
createdAt: number;
};
type StreamOptions = {
message: string;
sessionId?: string;
model?: AgentModel;
approvalMode?: AgentApprovalMode;
signal?: AbortSignal;
onEvent: (event: StreamEvent) => void;
};
type ResumeStreamOptions = {
sessionId: string;
signal?: AbortSignal;
onEvent: (event: StreamEvent) => void;
};
const parseEventBlock = (block: string): { event?: string; data?: string } => {
const lines = block.split("\n");
let event: string | undefined;
const dataLines: string[] = [];
for (const line of lines) {
if (line.startsWith("event:")) {
event = line.slice("event:".length).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice("data:".length).trim());
}
}
return {
event,
data: dataLines.length ? dataLines.join("\n") : undefined,
};
};
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const resolveToolParams = (
params: unknown,
argumentsPayload: unknown,
): Record<string, unknown> => {
if (isObjectRecord(params) && Object.keys(params).length > 0) {
return params;
}
if (isObjectRecord(argumentsPayload)) {
return argumentsPayload;
}
if (typeof argumentsPayload === "string") {
try {
const parsed = JSON.parse(argumentsPayload) as unknown;
return isObjectRecord(parsed) ? parsed : {};
} catch {
return {};
}
}
return isObjectRecord(params) ? params : {};
};
const normalizeQuestionList = (value: unknown): AgentQuestionRequest["questions"] => {
if (!Array.isArray(value)) return [];
return value
.filter(isObjectRecord)
.map((question) => ({
header: typeof question.header === "string" ? question.header : "",
question: typeof question.question === "string" ? question.question : "",
options: Array.isArray(question.options)
? question.options.filter(isObjectRecord).map((option) => ({
label: typeof option.label === "string" ? option.label : "",
description:
typeof option.description === "string" ? option.description : "",
}))
: [],
multiple: typeof question.multiple === "boolean" ? question.multiple : undefined,
custom: typeof question.custom === "boolean" ? question.custom : undefined,
}));
};
const normalizeAnswers = (value: unknown): string[][] | undefined => {
if (!Array.isArray(value)) return undefined;
return value.map((answer) =>
Array.isArray(answer)
? answer.filter((item): item is string => typeof item === "string")
: [],
);
};
const normalizeQuestionTool = (value: unknown): AgentQuestionRequest["tool"] => {
if (!isObjectRecord(value)) return undefined;
const messageID =
typeof value.messageID === "string"
? value.messageID
: typeof value.message_id === "string"
? value.message_id
: undefined;
const callID =
typeof value.callID === "string"
? value.callID
: typeof value.call_id === "string"
? value.call_id
: undefined;
return messageID && callID ? { messageID, callID } : undefined;
};
const normalizeTodoStatus = (value: unknown): AgentTodoItem["status"] => {
if (value === "in_progress" || value === "completed" || value === "cancelled") {
return value;
}
return "pending";
};
const normalizeTodoPriority = (value: unknown): AgentTodoItem["priority"] => {
if (value === "low" || value === "medium" || value === "high") {
return value;
}
return undefined;
};
const normalizeTodos = (value: unknown): AgentTodoItem[] => {
if (!Array.isArray(value)) return [];
return value.filter(isObjectRecord).map((todo, index) => ({
id:
typeof todo.id === "string" && todo.id.trim()
? todo.id
: `todo-${index}`,
content: typeof todo.content === "string" ? todo.content : "",
status: normalizeTodoStatus(todo.status),
priority: normalizeTodoPriority(todo.priority),
createdAt: typeof todo.created_at === "number" ? todo.created_at : undefined,
updatedAt: typeof todo.updated_at === "number" ? todo.updated_at : undefined,
}));
};
const emitParsedStreamEvent = (
event: string,
data: string,
onEvent: (event: StreamEvent) => void,
) => {
try {
const parsed = JSON.parse(data) as {
session_id?: string;
content?: string;
message?: string;
detail?: string;
tool?: unknown;
params?: Record<string, unknown>;
arguments?: unknown;
id?: string;
phase?: string;
status?: "running" | "completed" | "error";
title?: string;
messages?: unknown[];
is_streaming?: boolean;
run_status?: string;
started_at?: number;
ended_at?: number;
elapsed_ms?: number;
duration_ms?: number;
total_duration_ms?: number;
request_id?: string;
permission?: string;
patterns?: unknown;
target?: string;
always?: unknown;
created_at?: number;
reply?: PermissionReply;
questions?: unknown;
answers?: unknown;
rejected?: boolean;
message_id?: string;
todos?: unknown;
};
if (event === "state") {
onEvent({
type: "state",
sessionId: parsed.session_id ?? "",
messages: Array.isArray(parsed.messages) ? parsed.messages : [],
isStreaming: parsed.is_streaming ?? false,
runStatus: parsed.run_status,
});
} else if (event === "token") {
onEvent({
type: "token",
sessionId: parsed.session_id ?? "",
content: parsed.content ?? "",
});
} else if (event === "progress") {
onEvent({
type: "progress",
sessionId: parsed.session_id ?? "",
id: parsed.id ?? `${parsed.phase ?? "progress"}-${Date.now()}`,
phase: parsed.phase ?? "progress",
status: parsed.status ?? "running",
title: parsed.title ?? "正在处理",
detail: parsed.detail,
startedAt: parsed.started_at,
endedAt: parsed.ended_at,
elapsedMs: parsed.elapsed_ms,
durationMs: parsed.duration_ms,
});
} else if (event === "done") {
onEvent({
type: "done",
sessionId: parsed.session_id ?? "",
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "session_title") {
onEvent({
type: "session_title",
sessionId: parsed.session_id ?? "",
title: typeof parsed.title === "string" ? parsed.title : "",
});
} else if (event === "error") {
onEvent({
type: "error",
sessionId: parsed.session_id,
message: parsed.message ?? "unknown error",
detail: parsed.detail,
totalDurationMs: parsed.total_duration_ms,
});
} else if (event === "tool_call") {
onEvent({
type: "tool_call",
sessionId: parsed.session_id ?? "",
tool: typeof parsed.tool === "string" ? parsed.tool : "",
params: resolveToolParams(parsed.params, parsed.arguments),
});
} else if (event === "permission_request") {
onEvent({
type: "permission_request",
sessionId: parsed.session_id ?? "",
requestId: parsed.request_id ?? "",
permission: parsed.permission ?? "",
patterns: Array.isArray(parsed.patterns)
? parsed.patterns.filter((item): item is string => typeof item === "string")
: [],
target: typeof parsed.target === "string" ? parsed.target : undefined,
always: Array.isArray(parsed.always)
? parsed.always.filter((item): item is string => typeof item === "string")
: [],
tool: isObjectRecord(parsed.tool) &&
typeof parsed.tool.messageID === "string" &&
typeof parsed.tool.callID === "string"
? {
messageID: parsed.tool.messageID,
callID: parsed.tool.callID,
}
: undefined,
createdAt: parsed.created_at ?? Date.now(),
});
} else if (event === "permission_response") {
onEvent({
type: "permission_response",
sessionId: parsed.session_id ?? "",
requestId: parsed.request_id ?? "",
reply: parsed.reply ?? "reject",
});
} else if (event === "question_request") {
onEvent({
type: "question_request",
sessionId: parsed.session_id ?? "",
requestId: parsed.request_id ?? "",
questions: normalizeQuestionList(parsed.questions),
tool: normalizeQuestionTool(parsed.tool),
createdAt: parsed.created_at ?? Date.now(),
});
} else if (event === "question_response") {
onEvent({
type: "question_response",
sessionId: parsed.session_id ?? "",
requestId: parsed.request_id ?? "",
answers: normalizeAnswers(parsed.answers),
rejected: parsed.rejected === true,
});
} else if (event === "todo_update") {
onEvent({
type: "todo_update",
sessionId: parsed.session_id ?? "",
messageId: parsed.message_id,
todos: normalizeTodos(parsed.todos),
createdAt: parsed.created_at ?? Date.now(),
});
}
} catch {
onEvent({
type: "error",
message: "invalid SSE data payload",
detail: data,
});
}
};
const readStreamEvents = async (
response: Response,
onEvent: (event: StreamEvent) => void,
) => {
if (!response.body) {
return;
}
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const blocks = buffer.split("\n\n");
buffer = blocks.pop() ?? "";
for (const block of blocks) {
const { event, data } = parseEventBlock(block);
if (!event || !data) continue;
emitParsedStreamEvent(event, data, onEvent);
}
}
};
export const streamAgentChat = async ({
message,
sessionId,
model,
approvalMode,
signal,
onEvent,
}: StreamOptions) => {
let response: Response;
try {
response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/stream`,
{
method: "POST",
signal,
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body: JSON.stringify({
message,
session_id: sessionId,
model,
approval_mode: approvalMode,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
},
);
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
onEvent({
type: "error",
message: "network request failed",
detail,
});
return;
}
if (!response.ok || !response.body) {
const detail = await response.text();
let message = "stream request failed";
if (response.status === 403) {
message = "Permission denied. Please contact administrator.";
} else if (response.status === 401) {
message = "Login expired. Please sign in again.";
}
onEvent({
type: "error",
message,
detail:
response.status === 403 || response.status === 401 ? undefined : detail,
});
return;
}
await readStreamEvents(response, onEvent);
};
export const resumeAgentChatStream = async ({
sessionId,
signal,
onEvent,
}: ResumeStreamOptions) => {
let response: Response;
try {
response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/session/${encodeURIComponent(sessionId)}/stream`,
{
method: "GET",
signal,
headers: {
Accept: "text/event-stream",
},
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
},
);
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
onEvent({
type: "error",
sessionId,
message: "network request failed",
detail,
});
return;
}
if (!response.ok || !response.body) {
const detail = await response.text();
onEvent({
type: "error",
sessionId,
message: "stream request failed",
detail,
});
return;
}
await readStreamEvents(response, onEvent);
};
export const abortAgentChat = async (sessionId?: string) => {
if (!sessionId) {
return;
}
const response = await apiFetch(`${config.AGENT_URL}/api/v1/agent/chat/abort`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
session_id: sessionId,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
});
if (!response.ok) {
const detail = await response.text();
throw new Error(detail || `abort request failed: ${response.status}`);
}
};
export const replyAgentPermission = async (
sessionId: string,
requestId: string,
reply: PermissionReply,
) => {
const response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/permission/${encodeURIComponent(requestId)}/reply`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
session_id: sessionId,
reply,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
},
);
if (!response.ok) {
const detail = await response.text();
throw new Error(detail || `permission reply failed: ${response.status}`);
}
};
export const replyAgentQuestion = async (
sessionId: string,
requestId: string,
answers: string[][],
) => {
const response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/question/${encodeURIComponent(requestId)}/reply`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
session_id: sessionId,
answers,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
},
);
if (!response.ok) {
const detail = await response.text();
throw new Error(detail || `question reply failed: ${response.status}`);
}
};
export const rejectAgentQuestion = async (
sessionId: string,
requestId: string,
) => {
const response = await apiFetch(
`${config.AGENT_URL}/api/v1/agent/chat/question/${encodeURIComponent(requestId)}/reject`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
session_id: sessionId,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
},
);
if (!response.ok) {
const detail = await response.text();
throw new Error(detail || `question reject failed: ${response.status}`);
}
};
export const forkAgentChat = async (sessionId: string | undefined, keepMessageCount: number) => {
const response = await apiFetch(`${config.AGENT_URL}/api/v1/agent/chat/fork`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
session_id: sessionId,
keep_message_count: keepMessageCount,
}),
projectHeaderMode: "include",
userHeaderMode: "include",
skipAuthRedirect: true,
});
if (!response.ok) {
const detail = await response.text();
throw new Error(detail || `fork request failed: ${response.status}`);
}
const payload = (await response.json()) as { session_id?: string };
if (!payload.session_id) {
throw new Error("fork request returned no session_id");
}
return payload.session_id;
};