Files
TJWaterAgent/src/routes/chat.ts
T

1034 lines
34 KiB
TypeScript

import { Router } from "express";
import { z } from "zod";
import { type LearningOrchestrator } from "../learning/orchestrator.js";
import { type SessionTranscriptStore } from "../sessions/transcriptStore.js";
import { logger } from "../logger.js";
import { MemoryStore } from "../memory/store.js";
import { type SessionUiStateStore } from "../sessions/uiStateStore.js";
import { type SessionMetadataStore } from "../sessions/metadataStore.js";
import { type ResultReferenceResolver } from "../results/resolver.js";
import { RESULT_REFERENCE_KIND } from "../results/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { type SessionRecord } from "../sessions/metadataStore.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
import {
buildPromptWithLearningContext,
extractLatestFrontendTurn,
generateSessionTitle,
shouldGenerateSessionTitle,
} from "./chatSession.js";
import {
collectTextContent,
streamPromptResponse,
supportedModels,
type SupportedModel,
} from "./chatStream.js";
const payloadSchema = z.object({
message: z.string().min(1).max(10000),
session_id: z.string().max(128).optional(),
model: z.enum(supportedModels).optional(),
});
const abortPayloadSchema = z.object({
session_id: z.string().max(128),
});
const createSessionPayloadSchema = z.object({
session_id: z.string().max(128).optional(),
parent_session_id: z.string().max(128).optional(),
});
const forkPayloadSchema = z.object({
session_id: z.string().max(128).optional(),
keep_message_count: z.coerce.number().int().min(0),
});
const sessionStateSchema = z.object({
title: z.string().max(120).optional(),
is_title_manually_edited: z.boolean().optional(),
messages: z.array(z.unknown()).default([]),
branch_groups: z.array(z.unknown()).default([]),
});
type RunStatus = "running" | "completed" | "error" | "aborted";
type StreamSubscriber = {
write: (event: string, data: Record<string, unknown>) => void;
close: () => void;
};
type ActiveRun = {
clientSessionId: string;
controller: AbortController;
messages: unknown[];
status: RunStatus;
subscribers: Set<StreamSubscriber>;
};
const activeRuns = new Map<string, ActiveRun>();
const lastRunStatuses = new Map<string, RunStatus>();
const toSessionUiStateContext = (sessionRecord: SessionRecord) => ({
sessionId: sessionRecord.sessionId,
});
const getSessionRunStatus = (sessionId: string) =>
activeRuns.get(sessionId)?.status ?? lastRunStatuses.get(sessionId);
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const createFrontendMessageId = () =>
`msg-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
const createInitialStreamingMessages = (existingMessages: unknown[], userContent: string) => {
const userMessage = {
id: createFrontendMessageId(),
role: "user",
content: userContent,
};
return [
...existingMessages,
{
...userMessage,
branchRootId: userMessage.id,
},
{
id: createFrontendMessageId(),
role: "assistant",
content: "",
progress: [
{
id: "request-received",
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
startedAt: Date.now(),
elapsedMs: 0,
elapsedSnapshotAt: Date.now(),
},
],
},
];
};
const upsertBackendProgress = (
progress: unknown,
payload: Record<string, unknown>,
) => {
const next = Array.isArray(progress) ? [...progress] : [];
const id = typeof payload.id === "string" ? payload.id : `progress-${Date.now()}`;
const index = next.findIndex((item) => isObjectRecord(item) && item.id === id);
const nextItem = {
id,
phase: typeof payload.phase === "string" ? payload.phase : "progress",
status:
payload.status === "completed" || payload.status === "error"
? payload.status
: "running",
title: typeof payload.title === "string" ? payload.title : "正在处理",
detail: typeof payload.detail === "string" ? payload.detail : undefined,
startedAt: typeof payload.started_at === "number" ? payload.started_at : undefined,
endedAt: typeof payload.ended_at === "number" ? payload.ended_at : undefined,
elapsedMs: typeof payload.elapsed_ms === "number" ? payload.elapsed_ms : undefined,
elapsedSnapshotAt:
typeof payload.elapsed_ms === "number" ? Date.now() : undefined,
durationMs: typeof payload.duration_ms === "number" ? payload.duration_ms : undefined,
};
if (index >= 0) {
next[index] = nextItem;
} else {
next.push(nextItem);
}
return next;
};
const completeBackendProgress = (progress: unknown) =>
Array.isArray(progress)
? progress.map((item) => {
if (!isObjectRecord(item) || item.status !== "running") {
return item;
}
const endedAt = Date.now();
const startedAt = typeof item.startedAt === "number" ? item.startedAt : undefined;
return {
...item,
status: "completed",
endedAt,
elapsedMs: undefined,
elapsedSnapshotAt: undefined,
durationMs:
typeof item.durationMs === "number"
? item.durationMs
: startedAt !== undefined
? Math.max(0, endedAt - startedAt)
: item.elapsedMs,
};
})
: progress;
const updateLastAssistantMessage = (
messages: unknown[],
updater: (message: Record<string, unknown>) => Record<string, unknown>,
) => {
for (let index = messages.length - 1; index >= 0; index -= 1) {
const message = messages[index];
if (isObjectRecord(message) && message.role === "assistant") {
const next = [...messages];
next[index] = updater(message);
return next;
}
}
return messages;
};
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
sessionMetadataStore: SessionMetadataStore,
sessionUiStateStore: SessionUiStateStore,
memoryStore: MemoryStore,
sessionTranscriptStore: SessionTranscriptStore,
learningOrchestrator: LearningOrchestrator,
resultReferenceResolver: ResultReferenceResolver,
) => {
const chatRouter = Router();
chatRouter.post("/session", async (req, res) => {
const parsed = createSessionPayloadSchema.safeParse(req.body ?? {});
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const requestedSessionId = parsed.data.session_id?.trim();
const sessionId = requestedSessionId || (await runtime.createSession()).id;
const { record, created } = await sessionMetadataStore.ensure({
actorKey,
parentSessionId: parsed.data.parent_session_id,
projectId,
projectKey,
sessionId,
userId,
});
res.status(created ? 201 : 200).json({
session_id: record.sessionId,
created_at: record.createdAt,
updated_at: record.updatedAt,
status: record.status,
title: record.title,
parent_session_id: record.parentSessionId,
});
});
chatRouter.get("/sessions", async (req, res) => {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const records = await sessionMetadataStore.list({
actorKey,
projectId,
projectKey,
userId,
});
res.json({
sessions: records.map((record) => ({
id: record.sessionId,
title: record.title ?? "新对话",
created_at: record.createdAt,
updated_at: record.updatedAt,
status: record.status,
parent_session_id: record.parentSessionId,
is_streaming: activeRuns.get(record.sessionId)?.status === "running",
run_status: getSessionRunStatus(record.sessionId),
})),
});
});
chatRouter.get("/session/:sessionId", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
if (!sessionId) {
res.status(400).json({ message: "session_id is required" });
return;
}
const sessionRecord = await sessionMetadataStore.get(
{
actorKey,
projectId,
projectKey,
userId,
},
sessionId,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const state = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord),
);
res.json({
id: sessionRecord.sessionId,
title: sessionRecord.title ?? "新对话",
is_title_manually_edited: state?.isTitleManuallyEdited ?? false,
created_at: sessionRecord.createdAt,
updated_at: sessionRecord.updatedAt,
status: sessionRecord.status,
session_id: sessionRecord.sessionId,
messages: state?.messages ?? [],
branch_groups: state?.branchGroups ?? [],
parent_session_id: sessionRecord.parentSessionId,
is_streaming: activeRuns.get(sessionRecord.sessionId)?.status === "running",
run_status: getSessionRunStatus(sessionRecord.sessionId),
});
});
chatRouter.get("/session/:sessionId/stream", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
if (!sessionId) {
res.status(400).json({ message: "session_id is required" });
return;
}
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
sessionId,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
res.status(200);
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.flushHeaders?.();
const run = activeRuns.get(sessionRecord.sessionId);
const state = await sessionUiStateStore.read(toSessionUiStateContext(sessionRecord));
res.write(
toSse("state", {
session_id: sessionRecord.sessionId,
messages: state?.messages ?? run?.messages ?? [],
is_streaming: run?.status === "running",
run_status: getSessionRunStatus(sessionRecord.sessionId) ?? "completed",
}),
);
if (!run || run.status !== "running") {
res.end();
return;
}
const subscriber: StreamSubscriber = {
write: (event, data) => {
if (!res.writableEnded && !res.destroyed) {
res.write(toSse(event, data));
}
},
close: () => {
if (!res.writableEnded && !res.destroyed) {
res.end();
}
},
};
run.subscribers.add(subscriber);
const cleanup = () => {
run.subscribers.delete(subscriber);
};
req.on("close", cleanup);
res.on("close", cleanup);
});
chatRouter.put("/session/:sessionId", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const parsed = sessionStateSchema.safeParse(req.body ?? {});
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
if (!sessionId) {
res.status(400).json({ message: "session_id is required" });
return;
}
const { record } = await sessionMetadataStore.ensure({
actorKey,
projectId,
projectKey,
sessionId,
userId,
});
const nextRecord = await sessionMetadataStore.touch(record, {
...(parsed.data.title ? { title: parsed.data.title } : {}),
});
await sessionUiStateStore.write(toSessionUiStateContext(nextRecord), {
sessionId: nextRecord.sessionId,
isTitleManuallyEdited: parsed.data.is_title_manually_edited,
messages: parsed.data.messages,
branchGroups: parsed.data.branch_groups,
});
const latestTurn = extractLatestFrontendTurn(parsed.data.messages);
if (latestTurn) {
void learningOrchestrator.onTurnCompleted({
...latestTurn,
requestContext: {
actorKey,
clientSessionId: nextRecord.sessionId,
projectId,
projectKey,
traceId: req.header("x-trace-id") ?? `save-${nextRecord.sessionId}`,
userId,
},
sessionId: nextRecord.sessionId,
}).catch((error) => {
logger.warn(
{ err: error, sessionId: nextRecord.sessionId },
"post-save learning failed",
);
});
}
res.json({
id: nextRecord.sessionId,
title: nextRecord.title ?? "新对话",
created_at: nextRecord.createdAt,
updated_at: nextRecord.updatedAt,
status: nextRecord.status,
session_id: nextRecord.sessionId,
});
});
chatRouter.patch("/session/:sessionId/title", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const title =
typeof req.body?.title === "string" ? req.body.title.trim() : "";
const isTitleManuallyEdited =
typeof req.body?.is_title_manually_edited === "boolean"
? req.body.is_title_manually_edited
: undefined;
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
if (!sessionId || !title) {
res.status(400).json({ message: "session_id and title are required" });
return;
}
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
sessionId,
);
if (!sessionRecord) {
res.status(404).json({ message: "session not found" });
return;
}
const nextSessionRecord = await sessionMetadataStore.touch(sessionRecord, { title });
const state = await sessionUiStateStore.read(
toSessionUiStateContext(nextSessionRecord),
);
if (state) {
await sessionUiStateStore.write(
toSessionUiStateContext(nextSessionRecord),
{
...state,
isTitleManuallyEdited:
isTitleManuallyEdited ?? state.isTitleManuallyEdited,
},
);
}
res.json({
id: nextSessionRecord.sessionId,
title: nextSessionRecord.title,
updated_at: nextSessionRecord.updatedAt,
});
});
chatRouter.delete("/session/:sessionId", async (req, res) => {
const sessionId = req.params.sessionId?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
if (!sessionId) {
res.status(400).json({ message: "session_id is required" });
return;
}
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
sessionId,
);
if (!sessionRecord) {
res.status(204).end();
return;
}
await sessionUiStateStore.remove(toSessionUiStateContext(sessionRecord));
await sessionBridge.deleteSession({
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
});
activeRuns.delete(sessionRecord.sessionId);
lastRunStatuses.delete(sessionRecord.sessionId);
await sessionMetadataStore.remove(sessionRecord);
res.status(204).end();
});
chatRouter.get("/render-ref/:renderRef", async (req, res) => {
const renderRef = req.params.renderRef?.trim();
const userId = req.header("x-user-id")?.trim();
const projectId = req.header("x-project-id") ?? undefined;
const clientSessionId =
typeof req.query.session_id === "string"
? req.query.session_id.trim()
: undefined;
if (!userId) {
res.status(400).json({
message: "x-user-id is required",
});
return;
}
if (!renderRef) {
res.status(400).json({
message: "render_ref is required",
});
return;
}
const result = await resultReferenceResolver.getFullAuthorized(
renderRef,
{
actorKey: toActorKey(userId),
clientSessionId,
projectId,
},
{
expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload,
},
);
if (!result) {
res.status(404).json({ message: "render_ref not found" });
return;
}
res.json(result);
});
chatRouter.post("/abort", async (req, res) => {
const parsed = abortPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sessionRecord = await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
parsed.data.session_id,
);
const binding = sessionRecord
? await sessionBridge.abort({
clientSessionId: sessionRecord.sessionId,
sessionId: sessionRecord.sessionId,
})
: null;
const run = activeRuns.get(parsed.data.session_id);
if (run && run.status === "running") {
run.status = "aborted";
lastRunStatuses.set(parsed.data.session_id, "aborted");
run.controller.abort();
run.messages = updateLastAssistantMessage(run.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: "⚠️ **请求已中断**",
isError: true,
progress: completeBackendProgress(message.progress),
}));
if (sessionRecord) {
const currentState = await sessionUiStateStore.read(
toSessionUiStateContext(sessionRecord),
);
await sessionUiStateStore.write(toSessionUiStateContext(sessionRecord), {
sessionId: sessionRecord.sessionId,
isTitleManuallyEdited: currentState?.isTitleManuallyEdited ?? false,
messages: run.messages,
branchGroups: currentState?.branchGroups ?? [],
});
}
for (const subscriber of run.subscribers) {
subscriber.write("error", {
session_id: parsed.data.session_id,
message: "请求已中断",
});
subscriber.close();
}
run.subscribers.clear();
}
if (!binding && !run) {
res.status(204).end();
return;
}
logger.info(
{
clientSessionId: parsed.data.session_id,
sessionId: binding?.sessionId ?? parsed.data.session_id,
},
"aborted chat session by client request",
);
res.status(202).json({
session_id: parsed.data.session_id,
aborted: true,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat abort failed");
res.status(500).json({
message: "chat abort failed",
detail,
});
}
});
chatRouter.post("/fork", async (req, res) => {
const parsed = forkPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const sourceSessionId = parsed.data.session_id?.trim();
const sourceSessionRecord = sourceSessionId
? await sessionMetadataStore.get(
{
actorKey,
projectId,
projectKey,
userId,
},
sourceSessionId,
)
: null;
const forkSession = await runtime.createSession();
const { record: targetSessionRecord } = await sessionMetadataStore.ensure({
actorKey,
parentSessionId: sourceSessionId,
projectId,
projectKey,
sessionId: forkSession.id,
userId,
});
const nextSessionId = targetSessionRecord.sessionId;
if (sourceSessionId && parsed.data.keep_message_count > 0) {
await sessionTranscriptStore.cloneThread(
{
actorKey,
clientSessionId: sourceSessionId,
projectKey,
sessionId: sourceSessionId,
},
{
actorKey,
clientSessionId: nextSessionId,
projectKey,
sessionId: nextSessionId,
},
parsed.data.keep_message_count,
);
if (sourceSessionRecord?.title) {
await sessionMetadataStore.touch(targetSessionRecord, {
title: sourceSessionRecord.title,
});
}
}
logger.info(
{
sourceSessionId: parsed.data.session_id,
sessionId: nextSessionId,
traceId,
projectId,
keepMessageCount: parsed.data.keep_message_count,
},
"forked chat session",
);
res.status(200).json({
session_id: nextSessionId,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat fork failed");
res.status(500).json({
message: "chat fork failed",
detail,
});
}
});
chatRouter.post("/stream", async (req, res) => {
const parsed = payloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const authHeader = req.header("authorization");
const accessToken = authHeader?.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const userId = req.header("x-user-id") ?? undefined;
const actorKey = toActorKey(userId);
const projectKey = toProjectKey(projectId);
const requestedSessionId = parsed.data.session_id?.trim();
const existingSessionRecord = requestedSessionId
? await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
requestedSessionId,
)
: null;
const hadExistingRuntimeSession = Boolean(existingSessionRecord);
const { binding, requestContext, created } = await sessionBridge.resolve({
sessionId: requestedSessionId,
accessToken,
projectId,
traceId,
userId,
});
const { record: ensuredSessionRecord, created: sessionCreated } =
await sessionMetadataStore.ensure({
actorKey,
projectId,
projectKey,
sessionId: binding.sessionId,
userId,
});
const activeSessionRecord = await sessionMetadataStore.touch(ensuredSessionRecord);
const historyContext = {
actorKey: requestContext.actorKey,
clientSessionId: requestContext.clientSessionId,
projectKey: requestContext.projectKey,
sessionId: requestContext.clientSessionId,
};
const recentTurns = await sessionTranscriptStore.getRecentTurns(historyContext, 8);
const initialSessionState = await sessionUiStateStore.read(
toSessionUiStateContext(activeSessionRecord),
);
if (activeRuns.get(activeSessionRecord.sessionId)?.status === "running") {
res.status(409).json({
message: "session is already streaming",
session_id: activeSessionRecord.sessionId,
});
return;
}
logger.info(
{
clientSessionId: requestContext.clientSessionId,
sessionId: binding.sessionId,
created: created || sessionCreated,
model: parsed.data.model,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
},
"processing chat request",
);
res.status(200);
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.flushHeaders?.();
const clientSessionId = requestContext.clientSessionId;
let streamClosed = false;
const abortController = new AbortController();
sessionBridge.registerAbortController(clientSessionId, abortController);
const initialMessages = createInitialStreamingMessages(
initialSessionState?.messages ?? [],
parsed.data.message,
);
const branchGroups = initialSessionState?.branchGroups ?? [];
const activeRun: ActiveRun = {
clientSessionId,
controller: abortController,
messages: initialMessages,
status: "running",
subscribers: new Set(),
};
activeRuns.set(clientSessionId, activeRun);
lastRunStatuses.set(clientSessionId, "running");
const sessionUiStateContext = toSessionUiStateContext(activeSessionRecord);
let persistQueue = sessionUiStateStore.write(sessionUiStateContext, {
sessionId: activeSessionRecord.sessionId,
isTitleManuallyEdited: initialSessionState?.isTitleManuallyEdited ?? false,
messages: initialMessages,
branchGroups,
});
const queueSessionUiStatePersist = () => {
const snapshot = {
sessionId: activeSessionRecord.sessionId,
isTitleManuallyEdited: initialSessionState?.isTitleManuallyEdited ?? false,
messages: activeRun.messages,
branchGroups,
};
persistQueue = persistQueue
.catch((error) => {
logger.warn(
{ err: error, sessionId: clientSessionId },
"failed to persist previous chat stream state",
);
})
.then(() => sessionUiStateStore.write(sessionUiStateContext, snapshot));
return persistQueue;
};
const primarySubscriber: StreamSubscriber = {
write: (event, data) => {
if (!streamClosed && !res.writableEnded && !res.destroyed) {
res.write(toSse(event, data));
}
},
close: () => {
if (!res.writableEnded && !res.destroyed) {
res.end();
}
},
};
activeRun.subscribers.add(primarySubscriber);
const handleClientClose = () => {
streamClosed = true;
activeRun.subscribers.delete(primarySubscriber);
};
req.on("close", handleClientClose);
res.on("close", handleClientClose);
const publish = (event: string, data: Record<string, unknown>) => {
if (event === "token") {
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
content: `${typeof message.content === "string" ? message.content : ""}${typeof data.content === "string" ? data.content : ""}`,
isError: false,
}));
} else if (event === "progress") {
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
progress: upsertBackendProgress(message.progress, data),
}));
} else if (event === "done") {
activeRun.status = "completed";
lastRunStatuses.set(clientSessionId, "completed");
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: "Agent 已完成处理,但没有生成文本回答。请查看过程记录,或换个更具体的问题重试。",
progress: completeBackendProgress(message.progress),
}));
} else if (event === "error") {
activeRun.status = activeRun.status === "aborted" ? "aborted" : "error";
lastRunStatuses.set(clientSessionId, activeRun.status);
activeRun.messages = updateLastAssistantMessage(activeRun.messages, (message) => ({
...message,
content:
typeof message.content === "string" && message.content.trim()
? message.content
: `⚠️ **错误:** ${typeof data.message === "string" ? data.message : "unknown error"}`,
isError: true,
progress: completeBackendProgress(message.progress),
}));
}
for (const subscriber of activeRun.subscribers) {
subscriber.write(event, data);
}
void queueSessionUiStatePersist().catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state");
});
};
try {
const preparedMessage = await buildPromptWithLearningContext(
memoryStore,
requestContext.actorKey,
requestContext.projectKey,
{
recentTurns,
persistedMessages: initialSessionState?.messages,
message: parsed.data.message,
restoreConversation: !hadExistingRuntimeSession,
},
);
const streamResult = await streamPromptResponse({
runtime,
sessionId: binding.sessionId,
clientSessionId,
message: preparedMessage,
model: parsed.data.model,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
signal: abortController.signal,
write: (event, data) => {
publish(event, data);
},
});
await persistQueue.catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state");
});
if (!streamResult.aborted && !streamResult.failed) {
const messages = await runtime.messages(binding.sessionId, 60);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const assistantText = collectTextContent(assistantMessage?.parts ?? []);
const latestSessionRecord =
(await sessionMetadataStore.get(
{ actorKey, projectId, projectKey, userId },
activeSessionRecord.sessionId,
)) ?? activeSessionRecord;
const latestSessionState = await sessionUiStateStore.read(
toSessionUiStateContext(latestSessionRecord),
);
const existingSessionTitle = latestSessionRecord.title;
let sessionTitle = existingSessionTitle;
const shouldGenerateTitle = shouldGenerateSessionTitle({
recentTurnCount: recentTurns.length,
isTitleManuallyEdited:
latestSessionState?.isTitleManuallyEdited ?? false,
});
if (shouldGenerateTitle) {
sessionTitle = await generateSessionTitle(runtime, {
sessionId: binding.sessionId,
latestAssistantMessage: assistantText,
latestUserMessage: parsed.data.message,
fallbackTitle: existingSessionTitle,
});
}
const nextSessionRecord = await sessionMetadataStore.touch(latestSessionRecord, {
...(sessionTitle && sessionTitle !== existingSessionTitle
? { title: sessionTitle }
: {}),
});
if (
shouldGenerateTitle &&
sessionTitle &&
sessionTitle !== existingSessionTitle
) {
publish("session_title", {
session_id: clientSessionId,
title: sessionTitle,
});
await persistQueue.catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state");
});
}
}
} finally {
await persistQueue.catch((error) => {
logger.warn({ err: error, sessionId: clientSessionId }, "failed to persist chat stream state");
});
sessionBridge.finalizeRequest(clientSessionId);
activeRun.status = abortController.signal.aborted
? activeRun.status === "aborted"
? "aborted"
: "aborted"
: activeRun.status === "running"
? "completed"
: activeRun.status;
lastRunStatuses.set(clientSessionId, activeRun.status);
for (const subscriber of activeRun.subscribers) {
subscriber.close();
}
activeRun.subscribers.clear();
activeRuns.delete(clientSessionId);
streamClosed = true;
req.off("close", handleClientClose);
res.off("close", handleClientClose);
}
if (!res.writableEnded && !res.destroyed) {
res.end();
}
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat stream failed");
res.status(500).json({
message: "chat stream failed",
detail,
});
}
});
return chatRouter;
};
const toSse = (event: string, data: Record<string, unknown>) =>
`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;