From 76d407a81c399641fdefdc5dae8c8a8f7a72a9b3 Mon Sep 17 00:00:00 2001 From: Huarch Date: Thu, 30 Apr 2026 13:07:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=85=8D=E7=BD=AE=E5=92=8C?= =?UTF-8?q?=E8=81=8A=E5=A4=A9=E8=B7=AF=E7=94=B1=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E4=B8=AD=E6=AD=A2=E4=B8=8E=E5=88=86=E5=8F=89?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .opencode/agents/tjwater-assistant.md | 2 +- .opencode/skills/SKILL.md | 3 +- .opencode/skills/examples.md | 15 +- .opencode/skills/runbook.md | 22 ++ README.md | 128 ++++++- dist/chat/sessionBridge.js | 60 +++ dist/config.js | 5 +- dist/routes/chat.js | 427 ++++++++++++++++++--- dist/runtime/opencode.js | 21 +- dist/tools/dynamicHttpExecutor.js | 11 + opencode.json | 9 +- src/chat/sessionBridge.ts | 93 +++++ src/config.ts | 6 +- src/routes/chat.ts | 509 +++++++++++++++++++++++--- src/runtime/opencode.ts | 24 +- src/tools/dynamicHttpExecutor.ts | 14 + 16 files changed, 1228 insertions(+), 121 deletions(-) diff --git a/.opencode/agents/tjwater-assistant.md b/.opencode/agents/tjwater-assistant.md index b8067db..a694eb6 100644 --- a/.opencode/agents/tjwater-assistant.md +++ b/.opencode/agents/tjwater-assistant.md @@ -1,7 +1,7 @@ --- description: TJWater default assistant for water-network analysis and operator workflows mode: primary -model: anthropic/claude-sonnet-4-5 +model: deepseek/deepseek-v4-pro temperature: 0.2 --- You are the default TJWater assistant running on opencode. diff --git a/.opencode/skills/SKILL.md b/.opencode/skills/SKILL.md index e512e3b..ed7cffe 100644 --- a/.opencode/skills/SKILL.md +++ b/.opencode/skills/SKILL.md @@ -1,7 +1,7 @@ --- name: tjwater-skills-root-index description: TJWater Skills 分层索引(Domain -> Scenario -> Action)。 -version: 3.0.0 +version: 1.2.0 --- # TJWater Skills @@ -12,7 +12,6 @@ version: 3.0.0 ## 子模块索引 (渐进式引导) -- **ai**: 见 `./ai/SKILL.md` - **analytics**: 见 `./analytics/SKILL.md` - **business**: 见 `./business/SKILL.md` - **data**: 见 `./data/SKILL.md` diff --git a/.opencode/skills/examples.md b/.opencode/skills/examples.md index 85e6c21..ef5d126 100644 --- a/.opencode/skills/examples.md +++ b/.opencode/skills/examples.md @@ -18,6 +18,7 @@ - `x-project-id: ` 服务端内部行为: +- 持续通过 SSE `progress` 输出处理阶段,例如“正在规划分析步骤”“正在调用后端数据查询” - opencode agent 选择工具 `dynamic_http_call` - 工具参数示例: ```json @@ -41,7 +42,19 @@ 典型链路: - 第一步工具调用:查询历史数据接口。 - 第二步(可选)工具调用:查询补充数据接口。 -- opencode agent 汇总工具结果,持续通过 SSE 输出 token,最终返回 `done`。 +- opencode agent 汇总工具结果,持续通过 SSE 输出 `progress` 与 `token`,最终返回 `done`。 + +`progress` 示例: + +```json +{ + "session_id": "agent-demo-001", + "id": "tool-dynamic-http", + "phase": "tool", + "status": "running", + "title": "正在调用后端数据查询" +} +``` ## 示例 3:前端工具 — 定位要素 diff --git a/.opencode/skills/runbook.md b/.opencode/skills/runbook.md index 7a3f913..2ca89f1 100644 --- a/.opencode/skills/runbook.md +++ b/.opencode/skills/runbook.md @@ -13,6 +13,27 @@ 不提供 `/execute` 对外调用路径,统一通过 `chat/stream` + 工具调用链执行。 +请求体: + +```json +{ + "message": "帮我分析当前管网中的水力瓶颈管道,并给出改造建议", + "session_id": "agent-demo-001" +} +``` + +SSE 事件: + +| event | 用途 | 关键字段 | +| --- | --- | --- | +| `progress` | 展示 Agent 处理过程、规划和工具进度 | `session_id`, `id`, `phase`, `status`, `title`, `detail` | +| `token` | 渲染面向用户的最终回答文本 | `session_id`, `content` | +| `tool_call` | 驱动前端地图/面板/图表动作 | `session_id`, `tool`, `params` | +| `done` | 当前轮对话结束 | `session_id` | +| `error` | 当前轮失败 | `session_id`, `message`, `detail` | + +`progress.status` 取值为 `running`、`completed`、`error`;前端应按相同 `id` 覆盖更新同一条进度,而不是重复追加。 + ## 3) 工具参数约定(opencode agent 调用工具时) ```json @@ -60,6 +81,7 @@ 2) 返回简短确认给 opencode agent("已定位到管道") ↓ 前端同时收到: + - SSE event: progress → 展示规划/工具执行/完成状态 - SSE event: tool_call → 前端执行操作(定位地图/打开面板/渲染图表) - SSE event: token → 渲染 opencode agent 文字回复 ``` diff --git a/README.md b/README.md index 413305e..668c01b 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,22 @@ TJWaterAgent/ 6. 为 `.opencode/tools/dynamic_http_call.ts` 提供内部回调接口。 7. 代理调用真实 TJWater 后端 API。 +当前 Agent API 的主入口: + +```text +POST /api/v1/agent/chat/stream +``` + +该接口返回 SSE,事件包括: + +| event | 用途 | +| --- | --- | +| `progress` | 前端过程可视化,展示规划、工具调用和完成状态 | +| `token` | 最终回答文本流 | +| `tool_call` | 前端地图/面板/图表动作 | +| `done` | 当前轮完成 | +| `error` | 当前轮失败 | + 主要目录和文件: ```text @@ -132,24 +148,122 @@ typescript 这两组依赖不要混在一起:根目录负责服务运行,`.opencode` 负责 opencode 扩展资产的类型检查和运行依赖。 -## 部署安装 +## 启动与部署 默认部署不需要全局安装 `opencode` CLI。服务会通过 `@opencode-ai/sdk` 的 embedded 模式启动 opencode server。 -推荐安装和启动流程: +根目录的 npm scripts 已经封装 `.opencode` 依赖安装和类型检查,日常只需要在 `TJWaterAgent/` 根目录操作。 + +### 本地开发 + +```bash +cd TJWaterAgent +npm install +npm run dev +``` + +`npm install` 会通过 `postinstall` 自动执行 `.opencode` 依赖安装;`npm run dev` 启动前会检查 `.opencode/tools` 的类型。 + +开发模式支持热重载,以下文件变化会触发服务重启并重新拉起 embedded opencode: + +```text +src/** +.opencode/** +opencode.json +.local.env +``` + +因此修改 agent prompt、tools、skills、模型配置或本地环境变量后,不需要手动重启 `npm run dev`。 + +本地开发可以在项目根目录的 `.local.env` 中配置环境变量: + +```bash +DEEPSEEK_API_KEY=sk-xxx +TJWATER_API_BASE_URL=http://127.0.0.1:8000 +``` + +服务启动时会自动读取 `.local.env`,但系统环境变量优先级更高,适合在本机保存开发用 key。 + +### 生产启动 ```bash cd TJWaterAgent npm install npm run build - -cd .opencode -npm install - -cd .. npm run start ``` +也可以使用一条命令完成构建并启动: + +```bash +cd TJWaterAgent +npm install +npm run start:prod +``` + +### 常用脚本 + +| 命令 | 作用 | +| --- | --- | +| `npm run dev` | 类型检查 `.opencode` tools 后,以 watch 模式启动服务,并监听 `src`、`.opencode`、`opencode.json`、`.local.env` | +| `npm run build` | 构建服务并类型检查 `.opencode` tools | +| `npm run start` | 启动已构建的 `dist/server.js` | +| `npm run start:prod` | 先构建再启动 | +| `npm run check` | 执行完整构建检查 | +| `npm run install:opencode` | 手动安装 `.opencode` 依赖 | + +### 模型与 API 配置 + +默认 Agent 模型为: + +```text +deepseek/deepseek-v4-pro +``` + +涉及位置: + +```text +opencode.json +.opencode/agents/tjwater-assistant.md +src/config.ts 的 OPENCODE_MODEL 默认值 +``` + +如果需要临时覆盖模型,可以在启动时设置: + +```bash +OPENCODE_MODEL=deepseek/deepseek-v4-pro npm run start +``` + +DeepSeek API key 不写入代码,部署时通过环境变量设置: + +```bash +DEEPSEEK_API_KEY=sk-xxx npm run start +``` + +`opencode.json` 已配置从环境变量读取: + +```json +{ + "provider": { + "deepseek": { + "options": { + "apiKey": "{env:DEEPSEEK_API_KEY}" + } + } + } +} +``` + +如果需要自定义 DeepSeek 兼容 API 地址,可以通过 opencode 的 provider 配置增加 `baseURL`,例如在部署环境使用 `OPENCODE_CONFIG_CONTENT` 覆盖: + +```bash +OPENCODE_CONFIG_CONTENT='{"provider":{"deepseek":{"options":{"baseURL":"https://your-api.example.com/v1"}}}}' \ +DEEPSEEK_API_KEY=sk-xxx \ +npm run start +``` + +也可以使用 opencode 的 `/connect` 命令写入用户级凭据,但服务部署更推荐使用环境变量。 + 如果需要连接外部独立运行的 opencode server,可以配置: ```bash diff --git a/dist/chat/sessionBridge.js b/dist/chat/sessionBridge.js index a962574..f484023 100644 --- a/dist/chat/sessionBridge.js +++ b/dist/chat/sessionBridge.js @@ -44,6 +44,66 @@ export class ChatSessionBridge { getSessionContext(sessionId) { return this.sessionContexts.get(sessionId) ?? null; } + async abort(context) { + const clientSessionId = context.clientSessionId?.trim(); + if (!clientSessionId) { + return null; + } + const requestContext = { + clientSessionId, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + this.cleanupExpired(); + const binding = this.registry.get(requestContext); + if (!binding) { + return null; + } + this.sessionContexts.set(binding.sessionId, requestContext); + await this.runtime.abortSession(binding.sessionId); + return binding; + } + async fork(context) { + const currentClientSessionId = context.clientSessionId?.trim(); + const nextRequestContext = { + clientSessionId: `agent-${randomUUID().slice(0, 12)}`, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + this.cleanupExpired(); + if (!currentClientSessionId || context.keepMessageCount <= 0) { + const session = await this.runtime.createSession(nextRequestContext.clientSessionId); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } + const currentContext = { + clientSessionId: currentClientSessionId, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: nextRequestContext.traceId, + }; + const current = this.registry.get(currentContext); + if (!current) { + const session = await this.runtime.createSession(nextRequestContext.clientSessionId); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } + await this.runtime.getSession(current.sessionId); + const messages = await this.runtime.messages(current.sessionId, Math.max(100, context.keepMessageCount + 20)); + const chatMessages = messages.filter((message) => message.info.role === "user" || message.info.role === "assistant"); + const keepMessage = chatMessages[context.keepMessageCount - 1]; + if (!keepMessage) { + throw new Error(`fork keep point not found for message count ${context.keepMessageCount}`); + } + const session = await this.runtime.forkSession(current.sessionId, keepMessage.info.id); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } cleanupExpired() { const expiredSessionIds = this.registry.evictExpired(); for (const sessionId of expiredSessionIds) { diff --git a/dist/config.js b/dist/config.js index 67583a4..bfcade3 100644 --- a/dist/config.js +++ b/dist/config.js @@ -1,4 +1,7 @@ +import dotenv from "dotenv"; import { z } from "zod"; +// 本地开发可在项目根目录放 .local.env;已存在的系统环境变量优先级更高。 +dotenv.config({ path: ".local.env", override: false }); // 统一在启动时解析环境变量,避免业务代码里散落字符串默认值。 const envSchema = z.object({ NODE_ENV: z.string().default("development"), @@ -9,7 +12,7 @@ const envSchema = z.object({ OPENCODE_HOSTNAME: z.string().default("127.0.0.1"), OPENCODE_PORT: z.coerce.number().int().positive().default(4096), OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000), - OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"), + OPENCODE_MODEL: z.string().default("deepseek/deepseek-v4-pro"), OPENCODE_BASE_URL: z.string().optional(), OPENCODE_SERVER_PASSWORD: z.string().optional(), OPENCODE_SERVER_USERNAME: z.string().default("opencode"), diff --git a/dist/routes/chat.js b/dist/routes/chat.js index 0b7d7a4..5f5c56b 100644 --- a/dist/routes/chat.js +++ b/dist/routes/chat.js @@ -5,8 +5,105 @@ const payloadSchema = z.object({ message: z.string().min(1).max(10000), session_id: z.string().max(128).optional(), }); +const abortPayloadSchema = z.object({ + session_id: z.string().max(128), +}); +const forkPayloadSchema = z.object({ + session_id: z.string().max(128).optional(), + keep_message_count: z.coerce.number().int().min(0), +}); export const buildChatRouter = (sessionBridge, runtime) => { const chatRouter = Router(); + chatRouter.post("/abort", async (req, res) => { + const parsed = abortPayloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + const binding = await sessionBridge.abort({ + clientSessionId: parsed.data.session_id, + accessToken, + projectId, + traceId, + }); + if (!binding) { + res.status(204).end(); + return; + } + logger.info({ + clientSessionId: parsed.data.session_id, + sessionId: binding.sessionId, + traceId, + projectId, + }, "aborted chat session by client request"); + res.status(202).json({ + session_id: parsed.data.session_id, + aborted: true, + }); + } + catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat abort failed"); + res.status(500).json({ + message: "chat abort failed", + detail, + }); + } + }); + chatRouter.post("/fork", async (req, res) => { + const parsed = forkPayloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + const { binding, requestContext } = await sessionBridge.fork({ + clientSessionId: parsed.data.session_id, + accessToken, + projectId, + traceId, + keepMessageCount: parsed.data.keep_message_count, + }); + logger.info({ + sourceClientSessionId: parsed.data.session_id, + clientSessionId: requestContext.clientSessionId, + sessionId: binding.sessionId, + traceId: requestContext.traceId, + projectId: requestContext.projectId, + keepMessageCount: parsed.data.keep_message_count, + }, "forked chat session"); + res.status(200).json({ + session_id: requestContext.clientSessionId, + }); + } + catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat fork failed"); + res.status(500).json({ + message: "chat fork failed", + detail, + }); + } + }); chatRouter.post("/stream", async (req, res) => { const parsed = payloadSchema.safeParse(req.body); if (!parsed.success) { @@ -36,49 +133,46 @@ export const buildChatRouter = (sessionBridge, runtime) => { traceId: requestContext.traceId, projectId: requestContext.projectId, }, "processing chat request"); - // 当前先走“发送 prompt 后回读最近消息”的 opencode SDK 实现。 - // 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。 - const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message); - const assistantMessage = messages.find((message) => message.info.role === "assistant"); res.status(200); res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders?.(); const clientSessionId = requestContext.clientSessionId; - const parts = assistantMessage?.parts ?? []; - const textContent = collectTextContent(parts); - if (textContent) { - res.write(toSse("token", { - session_id: clientSessionId, - content: textContent, - })); + let streamClosed = false; + const abortController = new AbortController(); + const handleClientClose = () => { + if (streamClosed || abortController.signal.aborted) { + return; + } + abortController.abort(); + }; + req.on("close", handleClientClose); + res.on("close", handleClientClose); + try { + await streamPromptResponse({ + runtime, + opencodeSessionId: binding.sessionId, + clientSessionId, + message: parsed.data.message, + signal: abortController.signal, + write: (event, data) => { + if (streamClosed || res.writableEnded || res.destroyed) { + return; + } + res.write(toSse(event, data)); + }, + }); } - for (const toolCall of collectToolCalls(parts)) { - res.write(toSse("tool_call", { - session_id: clientSessionId, - tool: toolCall.tool, - params: toolCall.params, - })); + finally { + streamClosed = true; + req.off("close", handleClientClose); + res.off("close", handleClientClose); } - if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) { - res.write(toSse("error", { - session_id: clientSessionId, - message: getErrorMessage(assistantMessage.info.error), - detail: assistantMessage.info.error.name, - })); + if (!res.writableEnded && !res.destroyed) { + res.end(); } - else if (!assistantMessage) { - res.write(toSse("error", { - session_id: clientSessionId, - message: "assistant response unavailable", - detail: "no assistant message found after prompt", - })); - } - else { - res.write(toSse("done", { session_id: clientSessionId })); - } - res.end(); } catch (error) { const detail = error instanceof Error ? error.message : String(error); @@ -92,15 +186,264 @@ export const buildChatRouter = (sessionBridge, runtime) => { return chatRouter; }; const toSse = (event, data) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; -// 先把 opencode 的 Part 结构压平成 Agent API 的 SSE 语义。 +const getErrorMessage = (error) => error.data?.message ?? error.name; +const streamPromptResponse = async ({ runtime, opencodeSessionId, clientSessionId, message, signal, write, }) => { + const eventStream = await runtime.subscribeEvents(); + const iterator = eventStream[Symbol.asyncIterator](); + const emittedToolParts = new Set(); + const partTypes = new Map(); + const pendingTextDeltas = new Map(); + let emittedText = false; + let done = false; + let promptSettled = false; + let aborted = signal?.aborted ?? false; + const abortPromise = signal + ? new Promise((resolve) => { + if (signal.aborted) { + resolve({ type: "abort" }); + return; + } + signal.addEventListener("abort", () => resolve({ type: "abort" }), { + once: true, + }); + }) + : null; + write("progress", { + session_id: clientSessionId, + id: "request-received", + phase: "start", + status: "running", + title: "已收到请求,正在启动 Agent 分析", + }); + const promptPromise = runtime + .prompt(opencodeSessionId, message) + .then(() => { + promptSettled = true; + }) + .catch((error) => { + promptSettled = true; + throw error; + }); + try { + while (!done) { + if (signal?.aborted) { + aborted = true; + break; + } + const nextEvent = iterator + .next() + .then((result) => ({ type: "event", result })); + const nextPrompt = promptSettled + ? null + : promptPromise.then(() => ({ type: "prompt" }), (error) => ({ type: "prompt-error", error })); + const next = await Promise.race([ + ...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]), + ...(abortPromise ? [abortPromise] : []), + ]); + if (next.type === "abort") { + aborted = true; + break; + } + if (next.type === "prompt-error") { + throw next.error; + } + if (next.type === "prompt") { + continue; + } + if (next.result.done) { + break; + } + const event = next.result.value; + if (!isSessionEvent(event, opencodeSessionId)) { + continue; + } + if (event.type === "session.status") { + write("progress", { + session_id: clientSessionId, + id: "session-status", + phase: "session", + status: event.properties.status.type === "idle" ? "completed" : "running", + title: event.properties.status.type === "retry" + ? `模型请求重试中:${event.properties.status.message}` + : event.properties.status.type === "busy" + ? "Agent 正在处理请求" + : "Agent 已空闲", + }); + continue; + } + if (event.type === "message.part.delta" && event.properties.field === "text") { + const partType = partTypes.get(event.properties.partID); + if (partType === "text") { + emittedText = true; + write("token", { + session_id: clientSessionId, + content: event.properties.delta, + }); + } + else if (!partType) { + const pending = pendingTextDeltas.get(event.properties.partID) ?? []; + pending.push(event.properties.delta); + pendingTextDeltas.set(event.properties.partID, pending); + } + continue; + } + if (event.type === "message.part.updated") { + const part = event.properties.part; + partTypes.set(part.id, part.type); + if (part.type === "text") { + const pending = pendingTextDeltas.get(part.id) ?? []; + pendingTextDeltas.delete(part.id); + for (const content of pending) { + emittedText = true; + write("token", { + session_id: clientSessionId, + content, + }); + } + } + else if (part.type === "reasoning") { + pendingTextDeltas.delete(part.id); + write("progress", { + session_id: clientSessionId, + id: part.id, + phase: "planning", + status: part.time.end ? "completed" : "running", + title: part.time.end ? "分析规划完成" : "正在规划分析步骤", + }); + } + if (part.type === "tool") { + write("progress", { + session_id: clientSessionId, + id: part.id, + phase: "tool", + status: normalizeToolStatus(part.state.status), + title: getToolProgressTitle(part.tool, part.state.status), + detail: part.state.status === "error" ? part.state.error : undefined, + }); + if (!emittedToolParts.has(part.id)) { + emittedToolParts.add(part.id); + write("tool_call", { + session_id: clientSessionId, + tool: part.tool, + params: part.state.input, + }); + } + } + continue; + } + if (event.type === "todo.updated") { + const completed = event.properties.todos.filter((todo) => todo.status === "completed").length; + write("progress", { + session_id: clientSessionId, + id: "todo-progress", + phase: "planning", + status: completed === event.properties.todos.length ? "completed" : "running", + title: `计划进度 ${completed}/${event.properties.todos.length}`, + detail: event.properties.todos + .map((todo) => `${todo.status}: ${todo.content}`) + .join("\n"), + }); + continue; + } + if (event.type === "session.error") { + write("error", { + session_id: clientSessionId, + message: event.properties.error + ? getErrorMessage(event.properties.error) + : "opencode session error", + detail: event.properties.error?.name, + }); + done = true; + continue; + } + if (event.type === "session.idle") { + write("progress", { + session_id: clientSessionId, + id: "session-status", + phase: "session", + status: "completed", + title: "Agent 已完成处理", + }); + done = true; + } + } + if (aborted) { + await runtime.abortSession(opencodeSessionId).catch((error) => { + logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session"); + }); + return; + } + await promptPromise; + if (!emittedText) { + await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write); + } + write("progress", { + session_id: clientSessionId, + id: "request-received", + phase: "start", + status: "completed", + title: "请求处理完成", + }); + write("progress", { + session_id: clientSessionId, + id: "request-completed", + phase: "complete", + status: "completed", + title: "分析完成", + }); + write("done", { session_id: clientSessionId }); + } + finally { + await iterator.return?.(undefined); + if (!promptSettled) { + await promptPromise.catch(() => undefined); + } + } +}; +const isSessionEvent = (event, sessionId) => "properties" in event && + typeof event.properties === "object" && + event.properties !== null && + "sessionID" in event.properties && + event.properties.sessionID === sessionId; +const emitFallbackMessage = async (runtime, opencodeSessionId, clientSessionId, write) => { + const messages = await runtime.messages(opencodeSessionId); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const parts = assistantMessage?.parts ?? []; + const text = collectTextContent(parts); + if (text) { + write("token", { + session_id: clientSessionId, + content: text, + }); + } +}; const collectTextContent = (parts) => parts .filter((part) => part.type === "text") .map((part) => part.text) .join(""); -const collectToolCalls = (parts) => parts - .filter((part) => part.type === "tool") - .map((part) => ({ - tool: part.tool, - params: part.state.input, -})); -const getErrorMessage = (error) => error.data?.message ?? error.name; +const normalizeToolStatus = (status) => { + if (status === "completed") + return "completed"; + if (status === "error") + return "error"; + return "running"; +}; +const getToolProgressTitle = (tool, status) => { + const toolName = toolLabels[tool] ?? tool; + if (status === "completed") + return `${toolName} 已完成`; + if (status === "error") + return `${toolName} 执行失败`; + if (status === "pending") + return `准备调用 ${toolName}`; + return `正在调用 ${toolName}`; +}; +const toolLabels = { + dynamic_http_call: "后端数据查询", + locate_features: "地图定位", + view_history: "历史数据面板", + view_scada: "SCADA 面板", + show_chart: "图表渲染", +}; diff --git a/dist/runtime/opencode.js b/dist/runtime/opencode.js index 7bc19ac..2936c2d 100644 --- a/dist/runtime/opencode.js +++ b/dist/runtime/opencode.js @@ -30,19 +30,34 @@ export class OpencodeRuntimeAdapter { return requireData(response.data, "session.get"); } async sendPrompt(sessionId, text) { + await this.prompt(sessionId, text); + // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, + // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + return this.messages(sessionId); + } + async prompt(sessionId, text) { const client = await this.ensureClient(); await client.session.prompt({ sessionID: sessionId, parts: [{ type: "text", text }], }); - // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, - // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + } + async messages(sessionId, limit = 20) { + const client = await this.ensureClient(); const messages = await client.session.messages({ sessionID: sessionId, - limit: 20, + limit, }); return requireData(messages.data, "session.messages"); } + async forkSession(sessionId, messageId) { + const client = await this.ensureClient(); + const response = await client.session.fork({ + sessionID: sessionId, + messageID: messageId, + }); + return requireData(response.data, "session.fork"); + } async abortSession(sessionId) { const client = await this.ensureClient(); const response = await client.session.abort({ diff --git a/dist/tools/dynamicHttpExecutor.js b/dist/tools/dynamicHttpExecutor.js index 6f6a08f..d493516 100644 --- a/dist/tools/dynamicHttpExecutor.js +++ b/dist/tools/dynamicHttpExecutor.js @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { config } from "../config.js"; +import { logger } from "../logger.js"; const allowedMethods = new Set(["GET", "POST", "PUT", "PATCH", "DELETE"]); const resultStore = new Map(); export class DynamicHttpExecutor { @@ -28,11 +29,21 @@ export class DynamicHttpExecutor { if (context.projectId) { headers.set("x-project-id", context.projectId); } + const startedAt = Date.now(); const response = await fetch(url, { method, headers, signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS), }); + const durationMs = Date.now() - startedAt; + logger.info({ + method, + path, + statusCode: response.status, + durationMs, + traceId: context.traceId, + projectId: context.projectId, + }, "dynamic_http_call completed"); const contentType = response.headers.get("content-type") ?? ""; const rawText = await response.text(); const data = contentType.includes("application/json") && rawText diff --git a/opencode.json b/opencode.json index ebcbb1d..d3ea282 100644 --- a/opencode.json +++ b/opencode.json @@ -1,6 +1,13 @@ { "$schema": "https://opencode.ai/config.json", - "model": "anthropic/claude-sonnet-4-5", + "provider": { + "deepseek": { + "options": { + "apiKey": "{env:DEEPSEEK_API_KEY}" + } + } + }, + "model": "deepseek/deepseek-v4-pro", "server": { "hostname": "127.0.0.1", "port": 4096 diff --git a/src/chat/sessionBridge.ts b/src/chat/sessionBridge.ts index 062d640..beb8dbd 100644 --- a/src/chat/sessionBridge.ts +++ b/src/chat/sessionBridge.ts @@ -70,6 +70,99 @@ export class ChatSessionBridge { return this.sessionContexts.get(sessionId) ?? null; } + async abort(context: { + clientSessionId?: string; + accessToken?: string; + projectId?: string; + traceId?: string; + }): Promise { + const clientSessionId = context.clientSessionId?.trim(); + if (!clientSessionId) { + return null; + } + + const requestContext: ChatRequestContext = { + clientSessionId, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + + this.cleanupExpired(); + + const binding = this.registry.get(requestContext); + if (!binding) { + return null; + } + + this.sessionContexts.set(binding.sessionId, requestContext); + await this.runtime.abortSession(binding.sessionId); + return binding; + } + + async fork(context: { + clientSessionId?: string; + accessToken?: string; + projectId?: string; + traceId?: string; + keepMessageCount: number; + }): Promise<{ + binding: SessionBinding; + requestContext: ChatRequestContext; + created: boolean; + }> { + const currentClientSessionId = context.clientSessionId?.trim(); + const nextRequestContext: ChatRequestContext = { + clientSessionId: `agent-${randomUUID().slice(0, 12)}`, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + + this.cleanupExpired(); + + if (!currentClientSessionId || context.keepMessageCount <= 0) { + const session = await this.runtime.createSession(nextRequestContext.clientSessionId); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } + + const currentContext: ChatRequestContext = { + clientSessionId: currentClientSessionId, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: nextRequestContext.traceId, + }; + + const current = this.registry.get(currentContext); + if (!current) { + const session = await this.runtime.createSession(nextRequestContext.clientSessionId); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } + + await this.runtime.getSession(current.sessionId); + const messages = await this.runtime.messages( + current.sessionId, + Math.max(100, context.keepMessageCount + 20), + ); + const chatMessages = messages.filter( + (message) => message.info.role === "user" || message.info.role === "assistant", + ); + const keepMessage = chatMessages[context.keepMessageCount - 1]; + + if (!keepMessage) { + throw new Error(`fork keep point not found for message count ${context.keepMessageCount}`); + } + + const session = await this.runtime.forkSession(current.sessionId, keepMessage.info.id); + const binding = this.registry.upsert(nextRequestContext, session.id); + this.sessionContexts.set(binding.sessionId, nextRequestContext); + return { binding, requestContext: nextRequestContext, created: true }; + } + cleanupExpired(): void { const expiredSessionIds = this.registry.evictExpired(); for (const sessionId of expiredSessionIds) { diff --git a/src/config.ts b/src/config.ts index 8c65ec0..e8fd3ea 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,9 @@ +import dotenv from "dotenv"; import { z } from "zod"; +// 本地开发可在项目根目录放 .local.env;已存在的系统环境变量优先级更高。 +dotenv.config({ path: ".local.env", override: false }); + // 统一在启动时解析环境变量,避免业务代码里散落字符串默认值。 const envSchema = z.object({ NODE_ENV: z.string().default("development"), @@ -10,7 +14,7 @@ const envSchema = z.object({ OPENCODE_HOSTNAME: z.string().default("127.0.0.1"), OPENCODE_PORT: z.coerce.number().int().positive().default(4096), OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000), - OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"), + OPENCODE_MODEL: z.string().default("deepseek/deepseek-v4-pro"), OPENCODE_BASE_URL: z.string().optional(), OPENCODE_SERVER_PASSWORD: z.string().optional(), OPENCODE_SERVER_USERNAME: z.string().default("opencode"), diff --git a/src/routes/chat.ts b/src/routes/chat.ts index 8d97c42..898bbad 100644 --- a/src/routes/chat.ts +++ b/src/routes/chat.ts @@ -1,4 +1,4 @@ -import type { Part } from "@opencode-ai/sdk/v2"; +import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2"; import { Router } from "express"; import { z } from "zod"; @@ -11,12 +11,125 @@ const payloadSchema = z.object({ session_id: z.string().max(128).optional(), }); +const abortPayloadSchema = z.object({ + session_id: z.string().max(128), +}); + +const forkPayloadSchema = z.object({ + session_id: z.string().max(128).optional(), + keep_message_count: z.coerce.number().int().min(0), +}); + export const buildChatRouter = ( sessionBridge: ChatSessionBridge, runtime: OpencodeRuntimeAdapter, ) => { const chatRouter = Router(); + chatRouter.post("/abort", async (req, res) => { + const parsed = abortPayloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + + const binding = await sessionBridge.abort({ + clientSessionId: parsed.data.session_id, + accessToken, + projectId, + traceId, + }); + + if (!binding) { + res.status(204).end(); + return; + } + + logger.info( + { + clientSessionId: parsed.data.session_id, + sessionId: binding.sessionId, + traceId, + projectId, + }, + "aborted chat session by client request", + ); + res.status(202).json({ + session_id: parsed.data.session_id, + aborted: true, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat abort failed"); + res.status(500).json({ + message: "chat abort failed", + detail, + }); + } + }); + + chatRouter.post("/fork", async (req, res) => { + const parsed = forkPayloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + + const { binding, requestContext } = await sessionBridge.fork({ + clientSessionId: parsed.data.session_id, + accessToken, + projectId, + traceId, + keepMessageCount: parsed.data.keep_message_count, + }); + + logger.info( + { + sourceClientSessionId: parsed.data.session_id, + clientSessionId: requestContext.clientSessionId, + sessionId: binding.sessionId, + traceId: requestContext.traceId, + projectId: requestContext.projectId, + keepMessageCount: parsed.data.keep_message_count, + }, + "forked chat session", + ); + + res.status(200).json({ + session_id: requestContext.clientSessionId, + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat fork failed"); + res.status(500).json({ + message: "chat fork failed", + detail, + }); + } + }); + chatRouter.post("/stream", async (req, res) => { const parsed = payloadSchema.safeParse(req.body); if (!parsed.success) { @@ -53,62 +166,49 @@ export const buildChatRouter = ( "processing chat request", ); - // 当前先走“发送 prompt 后回读最近消息”的 opencode SDK 实现。 - // 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。 - const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message); - const assistantMessage = messages.find( - (message) => message.info.role === "assistant", - ); - res.status(200); res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); res.setHeader("Cache-Control", "no-cache"); res.setHeader("Connection", "keep-alive"); res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders?.(); const clientSessionId = requestContext.clientSessionId; - const parts = assistantMessage?.parts ?? []; - const textContent = collectTextContent(parts); - if (textContent) { - res.write( - toSse("token", { - session_id: clientSessionId, - content: textContent, - }), - ); + let streamClosed = false; + const abortController = new AbortController(); + const handleClientClose = () => { + if (streamClosed || abortController.signal.aborted) { + return; + } + abortController.abort(); + }; + + req.on("close", handleClientClose); + res.on("close", handleClientClose); + + try { + await streamPromptResponse({ + runtime, + opencodeSessionId: binding.sessionId, + clientSessionId, + message: parsed.data.message, + signal: abortController.signal, + write: (event, data) => { + if (streamClosed || res.writableEnded || res.destroyed) { + return; + } + res.write(toSse(event, data)); + }, + }); + } finally { + streamClosed = true; + req.off("close", handleClientClose); + res.off("close", handleClientClose); } - for (const toolCall of collectToolCalls(parts)) { - res.write( - toSse("tool_call", { - session_id: clientSessionId, - tool: toolCall.tool, - params: toolCall.params, - }), - ); + if (!res.writableEnded && !res.destroyed) { + res.end(); } - - if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) { - res.write( - toSse("error", { - session_id: clientSessionId, - message: getErrorMessage(assistantMessage.info.error), - detail: assistantMessage.info.error.name, - }), - ); - } else if (!assistantMessage) { - res.write( - toSse("error", { - session_id: clientSessionId, - message: "assistant response unavailable", - detail: "no assistant message found after prompt", - }), - ); - } else { - res.write(toSse("done", { session_id: clientSessionId })); - } - - res.end(); } catch (error) { const detail = error instanceof Error ? error.message : String(error); logger.error({ err: error }, "chat stream failed"); @@ -125,22 +225,313 @@ export const buildChatRouter = ( const toSse = (event: string, data: Record) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; -// 先把 opencode 的 Part 结构压平成 Agent API 的 SSE 语义。 +const getErrorMessage = (error: { + name: string; + data?: { message?: string }; +}) => error.data?.message ?? error.name; + +type StreamPromptOptions = { + runtime: OpencodeRuntimeAdapter; + opencodeSessionId: string; + clientSessionId: string; + message: string; + signal?: AbortSignal; + write: (event: string, data: Record) => void; +}; + +const streamPromptResponse = async ({ + runtime, + opencodeSessionId, + clientSessionId, + message, + signal, + write, +}: StreamPromptOptions) => { + const eventStream = await runtime.subscribeEvents(); + const iterator = eventStream[Symbol.asyncIterator](); + const emittedToolParts = new Set(); + const partTypes = new Map(); + const pendingTextDeltas = new Map(); + let emittedText = false; + let done = false; + let promptSettled = false; + let aborted = signal?.aborted ?? false; + + const abortPromise = signal + ? new Promise<{ type: "abort" }>((resolve) => { + if (signal.aborted) { + resolve({ type: "abort" }); + return; + } + signal.addEventListener("abort", () => resolve({ type: "abort" }), { + once: true, + }); + }) + : null; + + write("progress", { + session_id: clientSessionId, + id: "request-received", + phase: "start", + status: "running", + title: "已收到请求,正在启动 Agent 分析", + }); + + const promptPromise = runtime + .prompt(opencodeSessionId, message) + .then(() => { + promptSettled = true; + }) + .catch((error: unknown) => { + promptSettled = true; + throw error; + }); + + try { + while (!done) { + if (signal?.aborted) { + aborted = true; + break; + } + + const nextEvent = iterator + .next() + .then((result) => ({ type: "event" as const, result })); + const nextPrompt = promptSettled + ? null + : promptPromise.then( + () => ({ type: "prompt" as const }), + (error: unknown) => ({ type: "prompt-error" as const, error }), + ); + const next = await Promise.race( + [ + ...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]), + ...(abortPromise ? [abortPromise] : []), + ], + ); + + if (next.type === "abort") { + aborted = true; + break; + } + + if (next.type === "prompt-error") { + throw next.error; + } + if (next.type === "prompt") { + continue; + } + if (next.result.done) { + break; + } + + const event = next.result.value as OpencodeEvent; + if (!isSessionEvent(event, opencodeSessionId)) { + continue; + } + + if (event.type === "session.status") { + write("progress", { + session_id: clientSessionId, + id: "session-status", + phase: "session", + status: event.properties.status.type === "idle" ? "completed" : "running", + title: + event.properties.status.type === "retry" + ? `模型请求重试中:${event.properties.status.message}` + : event.properties.status.type === "busy" + ? "Agent 正在处理请求" + : "Agent 已空闲", + }); + continue; + } + + if (event.type === "message.part.delta" && event.properties.field === "text") { + const partType = partTypes.get(event.properties.partID); + if (partType === "text") { + emittedText = true; + write("token", { + session_id: clientSessionId, + content: event.properties.delta, + }); + } else if (!partType) { + const pending = pendingTextDeltas.get(event.properties.partID) ?? []; + pending.push(event.properties.delta); + pendingTextDeltas.set(event.properties.partID, pending); + } + continue; + } + + if (event.type === "message.part.updated") { + const part = event.properties.part; + partTypes.set(part.id, part.type); + if (part.type === "text") { + const pending = pendingTextDeltas.get(part.id) ?? []; + pendingTextDeltas.delete(part.id); + for (const content of pending) { + emittedText = true; + write("token", { + session_id: clientSessionId, + content, + }); + } + } else if (part.type === "reasoning") { + pendingTextDeltas.delete(part.id); + write("progress", { + session_id: clientSessionId, + id: part.id, + phase: "planning", + status: part.time.end ? "completed" : "running", + title: part.time.end ? "分析规划完成" : "正在规划分析步骤", + }); + } + if (part.type === "tool") { + write("progress", { + session_id: clientSessionId, + id: part.id, + phase: "tool", + status: normalizeToolStatus(part.state.status), + title: getToolProgressTitle(part.tool, part.state.status), + detail: part.state.status === "error" ? part.state.error : undefined, + }); + if (!emittedToolParts.has(part.id)) { + emittedToolParts.add(part.id); + write("tool_call", { + session_id: clientSessionId, + tool: part.tool, + params: part.state.input, + }); + } + } + continue; + } + + if (event.type === "todo.updated") { + const completed = event.properties.todos.filter( + (todo) => todo.status === "completed", + ).length; + write("progress", { + session_id: clientSessionId, + id: "todo-progress", + phase: "planning", + status: completed === event.properties.todos.length ? "completed" : "running", + title: `计划进度 ${completed}/${event.properties.todos.length}`, + detail: event.properties.todos + .map((todo) => `${todo.status}: ${todo.content}`) + .join("\n"), + }); + continue; + } + + if (event.type === "session.error") { + write("error", { + session_id: clientSessionId, + message: event.properties.error + ? getErrorMessage(event.properties.error) + : "opencode session error", + detail: event.properties.error?.name, + }); + done = true; + continue; + } + + if (event.type === "session.idle") { + write("progress", { + session_id: clientSessionId, + id: "session-status", + phase: "session", + status: "completed", + title: "Agent 已完成处理", + }); + done = true; + } + } + + if (aborted) { + await runtime.abortSession(opencodeSessionId).catch((error) => { + logger.warn({ sessionId: opencodeSessionId, err: error }, "failed to abort opencode session"); + }); + return; + } + + await promptPromise; + if (!emittedText) { + await emitFallbackMessage(runtime, opencodeSessionId, clientSessionId, write); + } + write("progress", { + session_id: clientSessionId, + id: "request-received", + phase: "start", + status: "completed", + title: "请求处理完成", + }); + write("progress", { + session_id: clientSessionId, + id: "request-completed", + phase: "complete", + status: "completed", + title: "分析完成", + }); + write("done", { session_id: clientSessionId }); + } finally { + await iterator.return?.(undefined); + if (!promptSettled) { + await promptPromise.catch(() => undefined); + } + } +}; + +const isSessionEvent = (event: OpencodeEvent, sessionId: string) => + "properties" in event && + typeof event.properties === "object" && + event.properties !== null && + "sessionID" in event.properties && + event.properties.sessionID === sessionId; + +const emitFallbackMessage = async ( + runtime: OpencodeRuntimeAdapter, + opencodeSessionId: string, + clientSessionId: string, + write: (event: string, data: Record) => void, +) => { + const messages = await runtime.messages(opencodeSessionId); + const assistantMessage = [...messages] + .reverse() + .find((message) => message.info.role === "assistant"); + const parts = assistantMessage?.parts ?? []; + const text = collectTextContent(parts); + if (text) { + write("token", { + session_id: clientSessionId, + content: text, + }); + } +}; + const collectTextContent = (parts: Part[]) => parts .filter((part): part is Extract => part.type === "text") .map((part) => part.text) .join(""); -const collectToolCalls = (parts: Part[]) => - parts - .filter((part): part is Extract => part.type === "tool") - .map((part) => ({ - tool: part.tool, - params: part.state.input, - })); +const normalizeToolStatus = (status: string) => { + if (status === "completed") return "completed"; + if (status === "error") return "error"; + return "running"; +}; -const getErrorMessage = (error: { - name: string; - data?: { message?: string }; -}) => error.data?.message ?? error.name; +const getToolProgressTitle = (tool: string, status: string) => { + const toolName = toolLabels[tool] ?? tool; + if (status === "completed") return `${toolName} 已完成`; + if (status === "error") return `${toolName} 执行失败`; + if (status === "pending") return `准备调用 ${toolName}`; + return `正在调用 ${toolName}`; +}; + +const toolLabels: Record = { + dynamic_http_call: "后端数据查询", + locate_features: "地图定位", + view_history: "历史数据面板", + view_scada: "SCADA 面板", + show_chart: "图表渲染", +}; diff --git a/src/runtime/opencode.ts b/src/runtime/opencode.ts index 9f504a6..678747e 100644 --- a/src/runtime/opencode.ts +++ b/src/runtime/opencode.ts @@ -46,20 +46,38 @@ export class OpencodeRuntimeAdapter { } async sendPrompt(sessionId: string, text: string) { + await this.prompt(sessionId, text); + // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, + // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + return this.messages(sessionId); + } + + async prompt(sessionId: string, text: string) { const client = await this.ensureClient(); await client.session.prompt({ sessionID: sessionId, parts: [{ type: "text", text }], }); - // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, - // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + } + + async messages(sessionId: string, limit = 20) { + const client = await this.ensureClient(); const messages = await client.session.messages({ sessionID: sessionId, - limit: 20, + limit, }); return requireData(messages.data, "session.messages"); } + async forkSession(sessionId: string, messageId?: string) { + const client = await this.ensureClient(); + const response = await client.session.fork({ + sessionID: sessionId, + messageID: messageId, + }); + return requireData(response.data, "session.fork"); + } + async abortSession(sessionId: string) { const client = await this.ensureClient(); const response = await client.session.abort({ diff --git a/src/tools/dynamicHttpExecutor.ts b/src/tools/dynamicHttpExecutor.ts index 216acaa..7f8d3a1 100644 --- a/src/tools/dynamicHttpExecutor.ts +++ b/src/tools/dynamicHttpExecutor.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import { config } from "../config.js"; +import { logger } from "../logger.js"; export type DynamicHttpInput = { path: string; @@ -53,11 +54,24 @@ export class DynamicHttpExecutor { headers.set("x-project-id", context.projectId); } + const startedAt = Date.now(); const response = await fetch(url, { method, headers, signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS), }); + const durationMs = Date.now() - startedAt; + logger.info( + { + method, + path, + statusCode: response.status, + durationMs, + traceId: context.traceId, + projectId: context.projectId, + }, + "dynamic_http_call completed", + ); const contentType = response.headers.get("content-type") ?? ""; const rawText = await response.text();