// TJWaterAgent/src/routes/chat.ts — TypeScript source (listed by the file viewer as 679 lines, 22 KiB)

import { Router } from "express";
import { z } from "zod";
import { type LearningOrchestrator } from "../learning/orchestrator.js";
import { type SessionHistoryStore } from "../history/store.js";
import { logger } from "../logger.js";
import { MemoryStore } from "../memory/store.js";
import { type ConversationStateStore } from "../conversations/stateStore.js";
import { type ConversationStore } from "../conversations/store.js";
import { type ResultReferenceResolver } from "../results/resolver.js";
import { RESULT_REFERENCE_KIND } from "../results/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
import { type ConversationRecord } from "../conversations/store.js";
import { toActorKey, toProjectKey } from "../utils/fileStore.js";
import {
buildPromptWithLearningContext,
generateSessionTitle,
shouldGenerateSessionTitle,
} from "./chatSession.js";
import {
collectTextContent,
streamPromptResponse,
supportedModels,
type SupportedModel,
} from "./chatStream.js";
// Client-supplied session identifiers are capped at 128 characters in every payload.
const sessionIdField = z.string().max(128);

/** Body of POST /stream: the user message plus an optional session id and model. */
const payloadSchema = z.object({
  message: z.string().min(1).max(10000),
  session_id: sessionIdField.optional(),
  model: z.enum(supportedModels).optional(),
});

/** Body of POST /abort: the session id is mandatory. */
const abortPayloadSchema = z.object({
  session_id: sessionIdField,
});

/** Body of POST /session: both the session id and the parent session id are optional. */
const createSessionPayloadSchema = z.object({
  session_id: sessionIdField.optional(),
  parent_session_id: sessionIdField.optional(),
});

/**
 * Body of POST /fork: optional source session id and the number of messages
 * to keep (coerced to a number; must be a non-negative integer).
 */
const forkPayloadSchema = z.object({
  session_id: sessionIdField.optional(),
  keep_message_count: z.coerce.number().int().min(0),
});

/**
 * Body of PUT /session/:sessionId. Messages and branch groups are accepted as
 * opaque values (not validated element-by-element) and default to empty arrays.
 */
const conversationStateSchema = z.object({
  title: z.string().max(120).optional(),
  is_title_manually_edited: z.boolean().optional(),
  messages: z.array(z.unknown()).default([]),
  branch_groups: z.array(z.unknown()).default([]),
});
/**
 * Projects a conversation record onto the three keys used to address its
 * entry in the conversation state store.
 *
 * @param conversation - record to take `actorKey`, `projectKey` and `sessionId` from
 * @returns a new object holding only those three fields
 */
const toConversationStateContext = ({
  actorKey,
  projectKey,
  sessionId,
}: ConversationRecord) => {
  return { actorKey, projectKey, sessionId };
};
/**
 * Builds the Express router for the chat API.
 *
 * Routes registered on the returned router:
 * - POST   /session                   create (or fetch) a conversation
 * - GET    /sessions                  list conversations for the caller
 * - GET    /session/:sessionId        read one conversation plus its stored state
 * - PUT    /session/:sessionId        replace a conversation's stored state
 * - PATCH  /session/:sessionId/title  rename a conversation
 * - DELETE /session/:sessionId        delete a conversation
 * - GET    /render-ref/:renderRef     resolve a stored render reference
 * - POST   /abort                     abort an in-flight stream
 * - POST   /fork                      fork a conversation into a new session
 * - POST   /stream                    run a prompt and stream the reply as SSE
 *
 * The caller identity is read from the `x-user-id` and `x-project-id` request
 * headers and mapped to store keys via `toActorKey` / `toProjectKey`.
 *
 * @param sessionBridge - resolves, aborts and deletes runtime sessions and tracks abort controllers
 * @param runtime - runtime adapter handed to the streaming and title-generation helpers
 * @param conversationStore - conversation records (ensure/get/list/touch/remove)
 * @param conversationStateStore - per-conversation state (read/write/remove)
 * @param memoryStore - passed to `buildPromptWithLearningContext` when preparing a prompt
 * @param sessionHistoryStore - recent turns lookup and thread cloning for forks
 * @param learningOrchestrator - notified after a completed turn via `onTurnCompleted`
 * @param resultReferenceResolver - resolves render references for /render-ref
 * @returns the configured router
 */
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
conversationStore: ConversationStore,
conversationStateStore: ConversationStateStore,
memoryStore: MemoryStore,
sessionHistoryStore: SessionHistoryStore,
learningOrchestrator: LearningOrchestrator,
resultReferenceResolver: ResultReferenceResolver,
) => {
const chatRouter = Router();
/**
 * POST /session — ensures a conversation exists for the caller.
 *
 * Responds 201 when the store reports the record as newly created, 200 when
 * it already existed, 400 on an invalid body, and 500 when the store fails.
 */
chatRouter.post("/session", async (req, res) => {
  const parsed = createSessionPayloadSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({
      message: "invalid request payload",
      detail: parsed.error.flatten(),
    });
    return;
  }
  // Store failures must be caught here: a rejected promise from an async
  // handler is not routed to a response by Express 4 and would leave the
  // request hanging. Mirrors the handling used by /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const { record, created } = await conversationStore.ensure({
      actorKey,
      parentSessionId: parsed.data.parent_session_id,
      projectId,
      projectKey,
      sessionId: parsed.data.session_id,
      userId,
    });
    res.status(created ? 201 : 200).json({
      session_id: record.sessionId,
      created_at: record.createdAt,
      updated_at: record.updatedAt,
      status: record.status,
      title: record.title,
      parent_session_id: record.parentSessionId,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session create failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session create failed",
        detail,
      });
    }
  }
});
/**
 * GET /sessions — lists the caller's conversations.
 *
 * Untitled conversations are reported with the default title "新对话".
 * Responds 500 when the store fails.
 */
chatRouter.get("/sessions", async (req, res) => {
  // Catch store failures explicitly: Express 4 does not turn a rejected
  // async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const records = await conversationStore.list({
      actorKey,
      projectId,
      projectKey,
      userId,
    });
    res.json({
      sessions: records.map((record) => ({
        id: record.sessionId,
        title: record.title ?? "新对话",
        created_at: record.createdAt,
        updated_at: record.updatedAt,
        status: record.status,
        parent_session_id: record.parentSessionId,
      })),
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session list failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session list failed",
        detail,
      });
    }
  }
});
/**
 * GET /session/:sessionId — returns one conversation together with its
 * stored state (messages, branch groups, manual-title flag).
 *
 * Responds 400 when the id is blank, 404 when the conversation is not found
 * for the caller, and 500 when a store fails. Missing state is reported as
 * empty messages/branch groups and `is_title_manually_edited: false`.
 */
chatRouter.get("/session/:sessionId", async (req, res) => {
  const sessionId = req.params.sessionId?.trim();
  if (!sessionId) {
    res.status(400).json({ message: "session_id is required" });
    return;
  }
  // Catch store failures explicitly: Express 4 does not turn a rejected
  // async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const conversation = await conversationStore.get(
      {
        actorKey,
        projectId,
        projectKey,
        userId,
      },
      sessionId,
    );
    if (!conversation) {
      res.status(404).json({ message: "session not found" });
      return;
    }
    const state = await conversationStateStore.read(
      toConversationStateContext(conversation),
    );
    res.json({
      id: conversation.sessionId,
      title: conversation.title ?? "新对话",
      is_title_manually_edited: state?.isTitleManuallyEdited ?? false,
      created_at: conversation.createdAt,
      updated_at: conversation.updatedAt,
      status: conversation.status,
      session_id: conversation.sessionId,
      messages: state?.messages ?? [],
      branch_groups: state?.branchGroups ?? [],
      parent_session_id: conversation.parentSessionId,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session read failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session read failed",
        detail,
      });
    }
  }
});
/**
 * PUT /session/:sessionId — replaces the stored state of a conversation,
 * ensuring the conversation record exists first.
 *
 * A non-empty `title` in the body is applied to the record; the messages,
 * branch groups and manual-title flag are written to the state store.
 * Responds 400 on an invalid body or blank id, and 500 when a store fails.
 */
chatRouter.put("/session/:sessionId", async (req, res) => {
  const sessionId = req.params.sessionId?.trim();
  const parsed = conversationStateSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({
      message: "invalid request payload",
      detail: parsed.error.flatten(),
    });
    return;
  }
  if (!sessionId) {
    res.status(400).json({ message: "session_id is required" });
    return;
  }
  // Catch store failures explicitly: Express 4 does not turn a rejected
  // async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const { record } = await conversationStore.ensure({
      actorKey,
      projectId,
      projectKey,
      sessionId,
      userId,
    });
    // An empty title is treated as "not provided" and leaves the record's title alone.
    const nextRecord = await conversationStore.touch(record, {
      ...(parsed.data.title ? { title: parsed.data.title } : {}),
    });
    await conversationStateStore.write(toConversationStateContext(nextRecord), {
      sessionId: nextRecord.sessionId,
      isTitleManuallyEdited: parsed.data.is_title_manually_edited,
      messages: parsed.data.messages,
      branchGroups: parsed.data.branch_groups,
    });
    res.json({
      id: nextRecord.sessionId,
      title: nextRecord.title ?? "新对话",
      created_at: nextRecord.createdAt,
      updated_at: nextRecord.updatedAt,
      status: nextRecord.status,
      session_id: nextRecord.sessionId,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session update failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session update failed",
        detail,
      });
    }
  }
});
/**
 * PATCH /session/:sessionId/title — renames an existing conversation.
 *
 * The title is trimmed and must be non-empty and at most 120 characters.
 * When state exists for the conversation, its manual-title flag is updated
 * from `is_title_manually_edited` (kept as-is when the field is absent or
 * not a boolean). Responds 400 on a blank id/title or an over-long title,
 * 404 when the conversation is not found, and 500 when a store fails.
 */
chatRouter.patch("/session/:sessionId/title", async (req, res) => {
  // Same limit as `title` in conversationStateSchema: a longer title stored
  // here would make a later PUT that echoes the title back fail validation.
  const maxTitleLength = 120;
  const sessionId = req.params.sessionId?.trim();
  const title =
    typeof req.body?.title === "string" ? req.body.title.trim() : "";
  const isTitleManuallyEdited =
    typeof req.body?.is_title_manually_edited === "boolean"
      ? req.body.is_title_manually_edited
      : undefined;
  if (!sessionId || !title) {
    res.status(400).json({ message: "session_id and title are required" });
    return;
  }
  if (title.length > maxTitleLength) {
    res.status(400).json({
      message: `title must be at most ${maxTitleLength} characters`,
    });
    return;
  }
  // Catch store failures explicitly: Express 4 does not turn a rejected
  // async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const conversation = await conversationStore.get(
      { actorKey, projectId, projectKey, userId },
      sessionId,
    );
    if (!conversation) {
      res.status(404).json({ message: "session not found" });
      return;
    }
    const nextConversation = await conversationStore.touch(conversation, { title });
    const state = await conversationStateStore.read(
      toConversationStateContext(nextConversation),
    );
    // Only rewrite state that already exists; no state entry is created here.
    if (state) {
      await conversationStateStore.write(
        toConversationStateContext(nextConversation),
        {
          ...state,
          isTitleManuallyEdited:
            isTitleManuallyEdited ?? state.isTitleManuallyEdited,
        },
      );
    }
    res.json({
      id: nextConversation.sessionId,
      title: nextConversation.title,
      updated_at: nextConversation.updatedAt,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session rename failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session rename failed",
        detail,
      });
    }
  }
});
/**
 * DELETE /session/:sessionId — removes a conversation, its stored state and,
 * when one is bound, its runtime session.
 *
 * Responds 204 both when the conversation was removed and when it was not
 * found, 400 when the id is blank, and 500 when any removal step fails.
 */
chatRouter.delete("/session/:sessionId", async (req, res) => {
  const sessionId = req.params.sessionId?.trim();
  if (!sessionId) {
    res.status(400).json({ message: "session_id is required" });
    return;
  }
  // Catch store/bridge failures explicitly: Express 4 does not turn a
  // rejected async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const conversation = await conversationStore.get(
      { actorKey, projectId, projectKey, userId },
      sessionId,
    );
    if (!conversation) {
      res.status(204).end();
      return;
    }
    // Order: state first, then the runtime session, then the record itself.
    await conversationStateStore.remove(toConversationStateContext(conversation));
    if (conversation.opencodeSessionId) {
      await sessionBridge.deleteConversationSession({
        clientSessionId: conversation.sessionId,
        sessionId: conversation.opencodeSessionId,
      });
    }
    await conversationStore.remove(conversation);
    res.status(204).end();
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat session delete failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "chat session delete failed",
        detail,
      });
    }
  }
});
/**
 * GET /render-ref/:renderRef — resolves a stored render reference for the
 * caller, restricted to the `renderJunctionsPayload` result kind.
 *
 * Requires the `x-user-id` header; an optional `session_id` query parameter
 * is forwarded to the resolver. Responds 400 when the user id or reference
 * is blank, 404 when the resolver returns nothing, and 500 when it fails.
 */
chatRouter.get("/render-ref/:renderRef", async (req, res) => {
  const renderRef = req.params.renderRef?.trim();
  const userId = req.header("x-user-id")?.trim();
  const projectId = req.header("x-project-id") ?? undefined;
  // Only a plain string query value is accepted; arrays/objects are ignored.
  const clientSessionId =
    typeof req.query.session_id === "string"
      ? req.query.session_id.trim()
      : undefined;
  if (!userId) {
    res.status(400).json({
      message: "x-user-id is required",
    });
    return;
  }
  if (!renderRef) {
    res.status(400).json({
      message: "render_ref is required",
    });
    return;
  }
  // Catch resolver failures explicitly: Express 4 does not turn a rejected
  // async handler into a response. Mirrors /abort, /fork and /stream.
  try {
    const result = await resultReferenceResolver.getFullAuthorized(
      renderRef,
      {
        actorKey: toActorKey(userId),
        clientSessionId,
        projectId,
      },
      {
        expectedKind: RESULT_REFERENCE_KIND.renderJunctionsPayload,
      },
    );
    if (!result) {
      res.status(404).json({ message: "render_ref not found" });
      return;
    }
    res.json(result);
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "render ref lookup failed");
    if (!res.headersSent) {
      res.status(500).json({
        message: "render ref lookup failed",
        detail,
      });
    }
  }
});
/**
 * POST /abort — asks the session bridge to abort the runtime session bound
 * to the given client session.
 *
 * Responds 202 when the bridge returns a binding, 204 when there is nothing
 * to abort (unknown conversation, no runtime session, or no binding), 400 on
 * an invalid body, and 500 when the lookup or abort throws.
 */
chatRouter.post("/abort", async (req, res) => {
  const payload = abortPayloadSchema.safeParse(req.body);
  if (!payload.success) {
    res.status(400).json({
      message: "invalid request payload",
      detail: payload.error.flatten(),
    });
    return;
  }
  const requestedSessionId = payload.data.session_id;
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const target = await conversationStore.get(
      { actorKey, projectId, projectKey, userId },
      requestedSessionId,
    );
    // Nothing to abort when the conversation is unknown or has no runtime session.
    if (!target?.opencodeSessionId) {
      res.status(204).end();
      return;
    }
    const abortedBinding = await sessionBridge.abort({
      clientSessionId: target.sessionId,
      sessionId: target.opencodeSessionId,
    });
    if (!abortedBinding) {
      res.status(204).end();
      return;
    }
    logger.info(
      {
        clientSessionId: requestedSessionId,
        sessionId: abortedBinding.sessionId,
      },
      "aborted chat session by client request",
    );
    res.status(202).json({
      session_id: requestedSessionId,
      aborted: true,
    });
  } catch (error) {
    logger.error({ err: error }, "chat abort failed");
    res.status(500).json({
      message: "chat abort failed",
      detail: error instanceof Error ? error.message : String(error),
    });
  }
});
/**
 * POST /fork — creates a new conversation whose parent is the given source
 * session and, when `keep_message_count` is positive, clones that many
 * history entries from the source thread into it.
 *
 * The source title is copied only when history was cloned and the source
 * conversation has a title. Responds 200 with the new session id, 400 on an
 * invalid body, and 500 when any step throws.
 */
chatRouter.post("/fork", async (req, res) => {
  const payload = forkPayloadSchema.safeParse(req.body);
  if (!payload.success) {
    res.status(400).json({
      message: "invalid request payload",
      detail: payload.error.flatten(),
    });
    return;
  }
  const { session_id: rawSourceId, keep_message_count: keepCount } = payload.data;
  try {
    const projectId = req.header("x-project-id") ?? undefined;
    const traceId = req.header("x-trace-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const sourceId = rawSourceId?.trim();

    // Look the source up only when an id was actually supplied.
    let source = null;
    if (sourceId) {
      source = await conversationStore.get(
        {
          actorKey,
          projectId,
          projectKey,
          userId,
        },
        sourceId,
      );
    }

    const ensured = await conversationStore.ensure({
      actorKey,
      parentSessionId: sourceId,
      projectId,
      projectKey,
      userId,
    });
    const fork = ensured.record;
    const forkId = fork.sessionId;

    // History threads are addressed by the client session id on both sides.
    const threadOf = (id: string) => ({
      actorKey,
      clientSessionId: id,
      projectKey,
      sessionId: id,
    });

    if (sourceId && keepCount > 0) {
      await sessionHistoryStore.cloneThread(
        threadOf(sourceId),
        threadOf(forkId),
        keepCount,
      );
      if (source?.title) {
        await conversationStore.touch(fork, {
          title: source.title,
        });
      }
    }

    logger.info(
      {
        sourceClientSessionId: rawSourceId,
        clientSessionId: forkId,
        traceId,
        projectId,
        keepMessageCount: keepCount,
      },
      "forked chat session",
    );
    res.status(200).json({
      session_id: forkId,
    });
  } catch (error) {
    logger.error({ err: error }, "chat fork failed");
    res.status(500).json({
      message: "chat fork failed",
      detail: error instanceof Error ? error.message : String(error),
    });
  }
});
/**
 * POST /stream — runs one chat turn and streams the reply as server-sent events.
 *
 * Flow:
 * 1. Validate the body, ensure/touch the conversation and resolve (or create)
 *    the runtime session through the session bridge.
 * 2. Send SSE headers, register an abort controller for the client session
 *    and abort it when the client connection closes.
 * 3. Build the prompt (with recent turns / persisted messages) and stream the
 *    response through `streamPromptResponse`.
 * 4. When the stream neither aborted nor failed: optionally generate a
 *    session title, persist it, emit a `session_title` event, and hand the
 *    turn to the learning orchestrator (fire-and-forget).
 *
 * Failures before the SSE headers are sent produce a 500 JSON response.
 * Failures after that point cannot change the status any more, so they are
 * reported as a final SSE event and the stream is closed.
 */
chatRouter.post("/stream", async (req, res) => {
  const parsed = payloadSchema.safeParse(req.body);
  if (!parsed.success) {
    res.status(400).json({
      message: "invalid request payload",
      detail: parsed.error.flatten(),
    });
    return;
  }
  try {
    const authHeader = req.header("authorization");
    // A "Bearer " prefix is stripped; any other header value is passed through as-is.
    const accessToken = authHeader?.startsWith("Bearer ")
      ? authHeader.slice("Bearer ".length)
      : authHeader;
    const projectId = req.header("x-project-id") ?? undefined;
    const traceId = req.header("x-trace-id") ?? undefined;
    const userId = req.header("x-user-id") ?? undefined;
    const actorKey = toActorKey(userId);
    const projectKey = toProjectKey(projectId);
    const { record: conversation, created: conversationCreated } =
      await conversationStore.ensure({
        actorKey,
        projectId,
        projectKey,
        sessionId: parsed.data.session_id,
        userId,
      });
    const activeConversation = await conversationStore.touch(conversation);
    // Captured before resolve(): decides below whether the conversation must
    // be restored into the prompt.
    const hadExistingRuntimeSession = Boolean(activeConversation.opencodeSessionId);
    const { binding, requestContext, created } = await sessionBridge.resolve({
      clientSessionId: activeConversation.sessionId,
      sessionId: activeConversation.opencodeSessionId,
      accessToken,
      projectId,
      traceId,
      userId,
    });
    // Persist the runtime session id when the bridge created a new one.
    const conversationWithRuntime =
      created && binding.sessionId !== activeConversation.opencodeSessionId
        ? await conversationStore.touch(activeConversation, {
            opencodeSessionId: binding.sessionId,
          })
        : activeConversation;
    const historyContext = {
      actorKey: requestContext.actorKey,
      clientSessionId: requestContext.clientSessionId,
      projectKey: requestContext.projectKey,
      sessionId: requestContext.clientSessionId,
    };
    const recentTurns = await sessionHistoryStore.getRecentTurns(historyContext, 8);
    const initialConversationState = await conversationStateStore.read(
      toConversationStateContext(conversationWithRuntime),
    );
    logger.info(
      {
        clientSessionId: requestContext.clientSessionId,
        sessionId: binding.sessionId,
        created: created || conversationCreated,
        model: parsed.data.model,
        traceId: requestContext.traceId,
        projectId: requestContext.projectId,
      },
      "processing chat request",
    );
    // From here on the response is an SSE stream; headers go out immediately.
    res.status(200);
    res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.setHeader("X-Accel-Buffering", "no");
    res.flushHeaders?.();
    const clientSessionId = requestContext.clientSessionId;
    let streamClosed = false;
    const abortController = new AbortController();
    sessionBridge.registerAbortController(clientSessionId, abortController);
    // Abort the in-flight prompt once, when either side of the connection closes.
    const handleClientClose = () => {
      if (streamClosed || abortController.signal.aborted) {
        return;
      }
      abortController.abort();
    };
    req.on("close", handleClientClose);
    res.on("close", handleClientClose);
    try {
      const preparedMessage = await buildPromptWithLearningContext(
        memoryStore,
        requestContext.actorKey,
        requestContext.projectKey,
        {
          recentTurns,
          persistedMessages: initialConversationState?.messages,
          message: parsed.data.message,
          restoreConversation: !hadExistingRuntimeSession,
        },
      );
      const streamResult = await streamPromptResponse({
        runtime,
        opencodeSessionId: binding.sessionId,
        clientSessionId,
        message: preparedMessage,
        model: parsed.data.model,
        traceId: requestContext.traceId,
        projectId: requestContext.projectId,
        signal: abortController.signal,
        write: (event, data) => {
          // Drop events once the stream is finished or the socket is gone.
          if (streamClosed || res.writableEnded || res.destroyed) {
            return;
          }
          res.write(toSse(event, data));
        },
      });
      if (!streamResult.aborted && !streamResult.failed) {
        const messages = await runtime.messages(binding.sessionId, 60);
        // Latest assistant message among the fetched runtime messages.
        const assistantMessage = [...messages]
          .reverse()
          .find((message) => message.info.role === "assistant");
        const assistantText = collectTextContent(assistantMessage?.parts ?? []);
        // Re-read the record and state: they may have changed while streaming
        // (e.g. a manual rename through the title endpoint).
        const latestConversation =
          (await conversationStore.get(
            { actorKey, projectId, projectKey, userId },
            conversationWithRuntime.sessionId,
          )) ?? conversationWithRuntime;
        const latestConversationState = await conversationStateStore.read(
          toConversationStateContext(latestConversation),
        );
        const existingSessionTitle = latestConversation.title;
        let sessionTitle = existingSessionTitle;
        const shouldGenerateTitle = shouldGenerateSessionTitle({
          recentTurnCount: recentTurns.length,
          isTitleManuallyEdited:
            latestConversationState?.isTitleManuallyEdited ?? false,
        });
        if (shouldGenerateTitle) {
          sessionTitle = await generateSessionTitle(runtime, {
            sessionId: binding.sessionId,
            latestAssistantMessage: assistantText,
            latestUserMessage: parsed.data.message,
            fallbackTitle: existingSessionTitle,
          });
        }
        // Always touch the record; include the title only when it changed.
        await conversationStore.touch(latestConversation, {
          ...(sessionTitle && sessionTitle !== existingSessionTitle
            ? { title: sessionTitle }
            : {}),
        });
        if (
          !streamClosed &&
          !res.writableEnded &&
          !res.destroyed &&
          shouldGenerateTitle &&
          sessionTitle &&
          sessionTitle !== existingSessionTitle
        ) {
          res.write(
            toSse("session_title", {
              session_id: clientSessionId,
              title: sessionTitle,
            }),
          );
        }
        if (assistantText) {
          // Fire-and-forget: learning must not delay or fail the response.
          void learningOrchestrator.onTurnCompleted({
            assistantMessage: assistantText,
            model: parsed.data.model,
            requestContext,
            sessionId: clientSessionId,
            toolCallCount: streamResult.toolCallCount,
            userMessage: parsed.data.message,
          }).catch((error) => {
            logger.warn(
              { err: error, sessionId: clientSessionId },
              "post-turn learning failed",
            );
          });
        }
      }
    } finally {
      sessionBridge.finalizeRequest(clientSessionId);
      streamClosed = true;
      req.off("close", handleClientClose);
      res.off("close", handleClientClose);
    }
    if (!res.writableEnded && !res.destroyed) {
      res.end();
    }
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    logger.error({ err: error }, "chat stream failed");
    if (res.headersSent) {
      // The SSE headers (status 200) are already on the wire, so calling
      // res.status(500).json() would throw. Report the failure in-band and
      // close the stream instead of leaving the connection hanging.
      // NOTE(review): "error" is a new event name — confirm the client handles it.
      if (!res.writableEnded && !res.destroyed) {
        res.write(toSse("error", { message: "chat stream failed", detail }));
        res.end();
      }
      return;
    }
    res.status(500).json({
      message: "chat stream failed",
      detail,
    });
  }
});
return chatRouter;
};
/**
 * Serializes one server-sent event frame.
 *
 * @param event - value written on the `event:` line
 * @param data - payload, JSON-encoded onto a single `data:` line
 * @returns the frame text, terminated by the blank line that ends an SSE event
 */
const toSse = (event: string, data: Record<string, unknown>) => {
  const eventLine = "event: " + event;
  const dataLine = "data: " + JSON.stringify(data);
  return [eventLine, dataLine, "", ""].join("\n");
};