更新配置和聊天路由,添加会话中止与分叉功能

This commit is contained in:
2026-04-30 13:07:39 +08:00
parent 6f15b5d7e3
commit 76d407a81c
16 changed files with 1228 additions and 121 deletions
+60
View File
@@ -44,6 +44,66 @@ export class ChatSessionBridge {
getSessionContext(sessionId) {
return this.sessionContexts.get(sessionId) ?? null;
}
async abort(context) {
const clientSessionId = context.clientSessionId?.trim();
if (!clientSessionId) {
return null;
}
const requestContext = {
clientSessionId,
accessToken: context.accessToken,
projectId: context.projectId,
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
};
this.cleanupExpired();
const binding = this.registry.get(requestContext);
if (!binding) {
return null;
}
this.sessionContexts.set(binding.sessionId, requestContext);
await this.runtime.abortSession(binding.sessionId);
return binding;
}
async fork(context) {
const currentClientSessionId = context.clientSessionId?.trim();
const nextRequestContext = {
clientSessionId: `agent-${randomUUID().slice(0, 12)}`,
accessToken: context.accessToken,
projectId: context.projectId,
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
};
this.cleanupExpired();
if (!currentClientSessionId || context.keepMessageCount <= 0) {
const session = await this.runtime.createSession(nextRequestContext.clientSessionId);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
return { binding, requestContext: nextRequestContext, created: true };
}
const currentContext = {
clientSessionId: currentClientSessionId,
accessToken: context.accessToken,
projectId: context.projectId,
traceId: nextRequestContext.traceId,
};
const current = this.registry.get(currentContext);
if (!current) {
const session = await this.runtime.createSession(nextRequestContext.clientSessionId);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
return { binding, requestContext: nextRequestContext, created: true };
}
await this.runtime.getSession(current.sessionId);
const messages = await this.runtime.messages(current.sessionId, Math.max(100, context.keepMessageCount + 20));
const chatMessages = messages.filter((message) => message.info.role === "user" || message.info.role === "assistant");
const keepMessage = chatMessages[context.keepMessageCount - 1];
if (!keepMessage) {
throw new Error(`fork keep point not found for message count ${context.keepMessageCount}`);
}
const session = await this.runtime.forkSession(current.sessionId, keepMessage.info.id);
const binding = this.registry.upsert(nextRequestContext, session.id);
this.sessionContexts.set(binding.sessionId, nextRequestContext);
return { binding, requestContext: nextRequestContext, created: true };
}
cleanupExpired() {
const expiredSessionIds = this.registry.evictExpired();
for (const sessionId of expiredSessionIds) {
+4 -1
View File
@@ -1,4 +1,7 @@
import dotenv from "dotenv";
import { z } from "zod";
// 本地开发可在项目根目录放 .local.env;已存在的系统环境变量优先级更高。
dotenv.config({ path: ".local.env", override: false });
// 统一在启动时解析环境变量,避免业务代码里散落字符串默认值。
const envSchema = z.object({
NODE_ENV: z.string().default("development"),
@@ -9,7 +12,7 @@ const envSchema = z.object({
OPENCODE_HOSTNAME: z.string().default("127.0.0.1"),
OPENCODE_PORT: z.coerce.number().int().positive().default(4096),
OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000),
OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"),
OPENCODE_MODEL: z.string().default("deepseek/deepseek-v4-pro"),
OPENCODE_BASE_URL: z.string().optional(),
OPENCODE_SERVER_PASSWORD: z.string().optional(),
OPENCODE_SERVER_USERNAME: z.string().default("opencode"),
+385 -42
View File
@@ -5,8 +5,105 @@ const payloadSchema = z.object({
message: z.string().min(1).max(10000),
session_id: z.string().max(128).optional(),
});
const abortPayloadSchema = z.object({
session_id: z.string().max(128),
});
const forkPayloadSchema = z.object({
session_id: z.string().max(128).optional(),
keep_message_count: z.coerce.number().int().min(0),
});
export const buildChatRouter = (sessionBridge, runtime) => {
const chatRouter = Router();
chatRouter.post("/abort", async (req, res) => {
const parsed = abortPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const authHeader = req.header("authorization");
const accessToken = authHeader?.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const binding = await sessionBridge.abort({
clientSessionId: parsed.data.session_id,
accessToken,
projectId,
traceId,
});
if (!binding) {
res.status(204).end();
return;
}
logger.info({
clientSessionId: parsed.data.session_id,
sessionId: binding.sessionId,
traceId,
projectId,
}, "aborted chat session by client request");
res.status(202).json({
session_id: parsed.data.session_id,
aborted: true,
});
}
catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat abort failed");
res.status(500).json({
message: "chat abort failed",
detail,
});
}
});
chatRouter.post("/fork", async (req, res) => {
const parsed = forkPayloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const authHeader = req.header("authorization");
const accessToken = authHeader?.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const { binding, requestContext } = await sessionBridge.fork({
clientSessionId: parsed.data.session_id,
accessToken,
projectId,
traceId,
keepMessageCount: parsed.data.keep_message_count,
});
logger.info({
sourceClientSessionId: parsed.data.session_id,
clientSessionId: requestContext.clientSessionId,
sessionId: binding.sessionId,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
keepMessageCount: parsed.data.keep_message_count,
}, "forked chat session");
res.status(200).json({
session_id: requestContext.clientSessionId,
});
}
catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat fork failed");
res.status(500).json({
message: "chat fork failed",
detail,
});
}
});
chatRouter.post("/stream", async (req, res) => {
const parsed = payloadSchema.safeParse(req.body);
if (!parsed.success) {
@@ -36,49 +133,46 @@ export const buildChatRouter = (sessionBridge, runtime) => {
traceId: requestContext.traceId,
projectId: requestContext.projectId,
}, "processing chat request");
// 当前先走“发送 prompt 后回读最近消息”的 opencode SDK 实现。
// 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。
const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message);
const assistantMessage = messages.find((message) => message.info.role === "assistant");
res.status(200);
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.flushHeaders?.();
const clientSessionId = requestContext.clientSessionId;
const parts = assistantMessage?.parts ?? [];
const textContent = collectTextContent(parts);
if (textContent) {
res.write(toSse("token", {
session_id: clientSessionId,
content: textContent,
}));
let streamClosed = false;
const abortController = new AbortController();
const handleClientClose = () => {
if (streamClosed || abortController.signal.aborted) {
return;
}
abortController.abort();
};
req.on("close", handleClientClose);
res.on("close", handleClientClose);
try {
await streamPromptResponse({
runtime,
opencodeSessionId: binding.sessionId,
clientSessionId,
message: parsed.data.message,
signal: abortController.signal,
write: (event, data) => {
if (streamClosed || res.writableEnded || res.destroyed) {
return;
}
res.write(toSse(event, data));
},
});
}
for (const toolCall of collectToolCalls(parts)) {
res.write(toSse("tool_call", {
session_id: clientSessionId,
tool: toolCall.tool,
params: toolCall.params,
}));
finally {
streamClosed = true;
req.off("close", handleClientClose);
res.off("close", handleClientClose);
}
if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) {
res.write(toSse("error", {
session_id: clientSessionId,
message: getErrorMessage(assistantMessage.info.error),
detail: assistantMessage.info.error.name,
}));
if (!res.writableEnded && !res.destroyed) {
res.end();
}
else if (!assistantMessage) {
res.write(toSse("error", {
session_id: clientSessionId,
message: "assistant response unavailable",
detail: "no assistant message found after prompt",
}));
}
else {
res.write(toSse("done", { session_id: clientSessionId }));
}
res.end();
}
catch (error) {
const detail = error instanceof Error ? error.message : String(error);
@@ -92,15 +186,264 @@ export const buildChatRouter = (sessionBridge, runtime) => {
return chatRouter;
};
const toSse = (event, data) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
// 先把 opencode 的 Part 结构压平成 Agent API 的 SSE 语义。
const getErrorMessage = (error) => error.data?.message ?? error.name;
const streamPromptResponse = async ({ runtime, opencodeSessionId, clientSessionId, message, signal, write, }) => {
const eventStream = await runtime.subscribeEvents();
const iterator = eventStream[Symbol.asyncIterator]();
const emittedToolParts = new Set();
const partTypes = new Map();
const pendingTextDeltas = new Map();
let emittedText = false;
let done = false;
let promptSettled = false;
let aborted = signal?.aborted ?? false;
const abortPromise = signal
? new Promise((resolve) => {
if (signal.aborted) {
resolve({ type: "abort" });
return;
}
signal.addEventListener("abort", () => resolve({ type: "abort" }), {
once: true,
});
})
: null;
write("progress", {
session_id: clientSessionId,
id: "request-received",
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
});
const promptPromise = runtime
.prompt(opencodeSessionId, message)
.then(() => {
promptSettled = true;
})
.catch((error) => {
promptSettled = true;
throw error;
});
try {
while (!done) {
if (signal?.aborted) {
aborted = true;
break;
}
const nextEvent = iterator
.next()
.then((result) => ({ type: "event", result }));
const nextPrompt = promptSettled
? null
: promptPromise.then(() => ({ type: "prompt" }), (error) => ({ type: "prompt-error", error }));
const next = await Promise.race([
...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]),
...(abortPromise ? [abortPromise] : []),
]);
if (next.type === "abort") {
aborted = true;
break;
}
if (next.type === "prompt-error") {
throw next.error;
}
if (next.type === "prompt") {
continue;
}
if (next.result.done) {
break;
}
const event = next.result.value;
if (!isSessionEvent(event, opencodeSessionId)) {
continue;
}
if (event.type === "session.status") {
write("progress", {
session_id: clientSessionId,
id: "session-status",
phase: "session",
status: event.properties.status.type === "idle" ? "completed" : "running",
title: event.properties.status.type === "retry"
? `模型请求重试中:${event.properties.status.message}`
: event.properties.status.type === "busy"
? "Agent 正在处理请求"
: "Agent 已空闲",
});
continue;
}
if (event.type === "message.part.delta" && event.properties.field === "text") {
const partType = partTypes.get(event.properties.partID);
if (partType === "text") {
emittedText = true;
write("token", {
session_id: clientSessionId,
content: event.properties.delta,
});
}
else if (!partType) {
const pending = pendingTextDeltas.get(event.properties.partID) ?? [];
pending.push(event.properties.delta);
pendingTextDeltas.set(event.properties.partID, pending);
}
continue;
}
if (event.type === "message.part.updated") {
const part = event.properties.part;
partTypes.set(part.id, part.type);
if (part.type === "text") {
const pending = pendingTextDeltas.get(part.id) ?? [];
pendingTextDeltas.delete(part.id);
for (const content of pending) {
emittedText = true;
write("token", {
session_id: clientSessionId,
content,
});
}
}
else if (part.type === "reasoning") {
pendingTextDeltas.delete(part.id);
write("progress", {
session_id: clientSessionId,
id: part.id,
phase: "planning",
status: part.time.end ? "completed" : "running",
title: part.time.end ? "分析规划完成" : "正在规划分析步骤",
});
}
if (part.type === "tool") {
write("progress", {
session_id: clientSessionId,
id: part.id,
phase: "tool",
status: normalizeToolStatus(part.state.status),
title: getToolProgressTitle(part.tool, part.state.status),
detail: part.state.status === "error" ? part.state.error : undefined,
});
if (!emittedToolParts.has(part.id)) {
emittedToolParts.add(part.id);
write("tool_call", {
session_id: clientSessionId,
tool: part.tool,
params: part.state.input,
});
}
}
continue;
}
if (event.type === "todo.updated") {
const completed = event.properties.todos.filter((todo) => todo.status === "completed").length;
write("progress", {
session_id: clientSessionId,
id: "todo-progress",
phase: "planning",
status: completed === event.properties.todos.length ? "completed" : "running",
title: `计划进度 ${completed}/${event.properties.todos.length}`,
detail: event.properties.todos
.map((todo) => `${todo.status}: ${todo.content}`)
.join("\n"),
});
continue;
}
if (event.type === "session.error") {
write("error", {
session_id: clientSessionId,
message: event.properties.error
? getErrorMessage(event.properties.error)
: "opencode session error",
detail: event.properties.error?.name,
});
done = true;
continue;
}
if (event.type === "session.idle") {
write("progress", {
session_id: clientSessionId,
id: "session-status",
phase: "session",
status: "completed",
title: "Agent 已完成处理",
});
done = true;
}
}
if (aborted) {
await runtime.abortSession(opencodeSessionId).catch((error) => {
logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session");
});
return;
}
await promptPromise;
if (!emittedText) {
await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write);
}
write("progress", {
session_id: clientSessionId,
id: "request-received",
phase: "start",
status: "completed",
title: "请求处理完成",
});
write("progress", {
session_id: clientSessionId,
id: "request-completed",
phase: "complete",
status: "completed",
title: "分析完成",
});
write("done", { session_id: clientSessionId });
}
finally {
await iterator.return?.(undefined);
if (!promptSettled) {
await promptPromise.catch(() => undefined);
}
}
};
const isSessionEvent = (event, sessionId) => "properties" in event &&
typeof event.properties === "object" &&
event.properties !== null &&
"sessionID" in event.properties &&
event.properties.sessionID === sessionId;
const emitFallbackMessage = async (runtime, opencodeSessionId, clientSessionId, write) => {
const messages = await runtime.messages(opencodeSessionId);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const parts = assistantMessage?.parts ?? [];
const text = collectTextContent(parts);
if (text) {
write("token", {
session_id: clientSessionId,
content: text,
});
}
};
const collectTextContent = (parts) => parts
.filter((part) => part.type === "text")
.map((part) => part.text)
.join("");
const collectToolCalls = (parts) => parts
.filter((part) => part.type === "tool")
.map((part) => ({
tool: part.tool,
params: part.state.input,
}));
const getErrorMessage = (error) => error.data?.message ?? error.name;
const normalizeToolStatus = (status) => {
if (status === "completed")
return "completed";
if (status === "error")
return "error";
return "running";
};
const getToolProgressTitle = (tool, status) => {
const toolName = toolLabels[tool] ?? tool;
if (status === "completed")
return `${toolName} 已完成`;
if (status === "error")
return `${toolName} 执行失败`;
if (status === "pending")
return `准备调用 ${toolName}`;
return `正在调用 ${toolName}`;
};
const toolLabels = {
dynamic_http_call: "后端数据查询",
locate_features: "地图定位",
view_history: "历史数据面板",
view_scada: "SCADA 面板",
show_chart: "图表渲染",
};
+18 -3
View File
@@ -30,19 +30,34 @@ export class OpencodeRuntimeAdapter {
return requireData(response.data, "session.get");
}
async sendPrompt(sessionId, text) {
await this.prompt(sessionId, text);
// 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts
// 所以这里紧跟一次 messages() 回读,给上层路由统一消费。
return this.messages(sessionId);
}
async prompt(sessionId, text) {
const client = await this.ensureClient();
await client.session.prompt({
sessionID: sessionId,
parts: [{ type: "text", text }],
});
// 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts
// 所以这里紧跟一次 messages() 回读,给上层路由统一消费。
}
async messages(sessionId, limit = 20) {
const client = await this.ensureClient();
const messages = await client.session.messages({
sessionID: sessionId,
limit: 20,
limit,
});
return requireData(messages.data, "session.messages");
}
async forkSession(sessionId, messageId) {
const client = await this.ensureClient();
const response = await client.session.fork({
sessionID: sessionId,
messageID: messageId,
});
return requireData(response.data, "session.fork");
}
async abortSession(sessionId) {
const client = await this.ensureClient();
const response = await client.session.abort({
+11
View File
@@ -1,5 +1,6 @@
import { randomUUID } from "node:crypto";
import { config } from "../config.js";
import { logger } from "../logger.js";
const allowedMethods = new Set(["GET", "POST", "PUT", "PATCH", "DELETE"]);
const resultStore = new Map();
export class DynamicHttpExecutor {
@@ -28,11 +29,21 @@ export class DynamicHttpExecutor {
if (context.projectId) {
headers.set("x-project-id", context.projectId);
}
const startedAt = Date.now();
const response = await fetch(url, {
method,
headers,
signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS),
});
const durationMs = Date.now() - startedAt;
logger.info({
method,
path,
statusCode: response.status,
durationMs,
traceId: context.traceId,
projectId: context.projectId,
}, "dynamic_http_call completed");
const contentType = response.headers.get("content-type") ?? "";
const rawText = await response.text();
const data = contentType.includes("application/json") && rawText