diff --git a/.opencode/agents/tjwater-assistant.md b/.opencode/agents/tjwater-assistant.md new file mode 100644 index 0000000..941200b --- /dev/null +++ b/.opencode/agents/tjwater-assistant.md @@ -0,0 +1,16 @@ +--- +description: TJWater default assistant for water-network analysis and operator workflows +mode: primary +model: anthropic/claude-sonnet-4-5 +temperature: 0.2 +--- +You are the default TJWater assistant running on opencode. + +Operate with these rules: + +1. Follow the loaded TJWater instructions, skills index, runbook, and examples as the primary domain guidance. +2. Prefer `dynamic_http_call` when you need backend data for reasoning, summaries, diagnosis, or analysis. +3. Prefer frontend tools (`locate_features`, `view_history`, `view_scada`, `show_chart`) when the user mainly needs UI actions or visualization. +4. Treat frontend tools as display/interaction tools only. Do not assume they return data. +5. Keep replies accurate, concise, and operationally useful for water-network users. +6. Respect user authorization and project isolation. Never invent backend results when a tool call fails or no data is available. diff --git a/.opencode/tools/dynamic_http_call.ts b/.opencode/tools/dynamic_http_call.ts new file mode 100644 index 0000000..79be8c0 --- /dev/null +++ b/.opencode/tools/dynamic_http_call.ts @@ -0,0 +1,41 @@ +import { tool } from "@opencode-ai/plugin"; + +const internalBaseUrl = process.env.TJWATER_AGENT_INTERNAL_BASE_URL ?? "http://127.0.0.1:8788"; +const internalToken = process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; + +export default tool({ + description: + "Call the TJWater backend API through the local agent bridge. Provide path, optional method, and query arguments.", + args: { + path: tool.schema.string().describe("Target backend API path, starting with '/'."), + method: tool.schema + .string() + .optional() + .describe("HTTP method. Defaults to GET."), + arguments: tool.schema + .record(tool.schema.string(), tool.schema.unknown()) + .optional() + .describe("Query arguments object."), + }, + async execute(args, context) { + const response = await fetch(`${internalBaseUrl}/internal/tools/dynamic-http-call`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-agent-internal-token": internalToken, + }, + body: JSON.stringify({ + sessionId: context.sessionID, + path: args.path, + method: args.method, + arguments: args.arguments, + }), + }); + + const text = await response.text(); + if (!response.ok) { + throw new Error(text); + } + return text; + }, +}); diff --git a/.opencode/tools/locate_features.ts b/.opencode/tools/locate_features.ts new file mode 100644 index 0000000..3c5081e --- /dev/null +++ b/.opencode/tools/locate_features.ts @@ -0,0 +1,14 @@ +import { tool } from "@opencode-ai/plugin"; + +export default tool({ + description: "Locate and highlight TJWater map features in the frontend.", + args: { + ids: tool.schema.array(tool.schema.string()).describe("Feature ids to locate."), + feature_type: tool.schema + .enum(["junction", "pipe", "valve", "reservoir", "pump", "tank"]) + .describe("Type of feature to locate."), + }, + async execute() { + return "已在地图上定位到指定要素。"; + }, +}); diff --git a/.opencode/tools/show_chart.ts b/.opencode/tools/show_chart.ts new file mode 100644 index 0000000..201aa3d --- /dev/null +++ b/.opencode/tools/show_chart.ts @@ -0,0 +1,27 @@ +import { tool } from "@opencode-ai/plugin"; + +export default tool({ + description: "Render a chart in the frontend chat UI.", + args: { + title: tool.schema.string().optional().describe("Chart title."), + chart_type: tool.schema + .enum(["line", "bar", "pie"]) + .optional() + .describe("Chart type."), + x_data: tool.schema.array(tool.schema.string()).describe("X-axis labels."), + series: tool.schema + .array( + tool.schema.object({ + name: tool.schema.string(), + data: tool.schema.array(tool.schema.number()), + type: tool.schema.enum(["line", "bar"]).optional(), + }), + ) + .describe("Series data."), + x_axis_name: tool.schema.string().optional().describe("X-axis display name."), + y_axis_name: tool.schema.string().optional().describe("Y-axis display name."), + }, + async execute() { + return "图表将在对话中显示。"; + }, +}); diff --git a/.opencode/tools/view_history.ts b/.opencode/tools/view_history.ts new file mode 100644 index 0000000..455281b --- /dev/null +++ b/.opencode/tools/view_history.ts @@ -0,0 +1,18 @@ +import { tool } from "@opencode-ai/plugin"; + +export default tool({ + description: "Open the frontend history panel for selected features.", + args: { + feature_infos: tool.schema + .array(tool.schema.tuple([tool.schema.string(), tool.schema.string()])) + .describe("List of [id, type] pairs."), + data_type: tool.schema + .enum(["realtime", "scheme", "none"]) + .describe("History data source type."), + start_time: tool.schema.string().optional().describe("Optional ISO8601 start time."), + end_time: tool.schema.string().optional().describe("Optional ISO8601 end time."), + }, + async execute() { + return "已打开计算结果面板。"; + }, +}); diff --git a/.opencode/tools/view_scada.ts b/.opencode/tools/view_scada.ts new file mode 100644 index 0000000..5f094b8 --- /dev/null +++ b/.opencode/tools/view_scada.ts @@ -0,0 +1,21 @@ +import { tool } from "@opencode-ai/plugin"; + +export default tool({ + description: "Open the frontend SCADA history panel.", + args: { + device_ids: tool.schema + .array(tool.schema.string()) + .optional() + .describe("Preferred SCADA device ids."), + device_id: tool.schema.string().optional().describe("Single SCADA device id."), + feature_infos: tool.schema + .array(tool.schema.tuple([tool.schema.string(), tool.schema.string()])) + .optional() + .describe("Legacy [id, type] pairs."), + start_time: tool.schema.string().optional().describe("Optional ISO8601 start time."), + end_time: tool.schema.string().optional().describe("Optional ISO8601 end time."), + }, + async execute() { + return "已打开 SCADA 监测面板。"; + }, +}); diff --git a/.opencode/tsconfig.json b/.opencode/tsconfig.json new file mode 100644 index 0000000..113c370 --- /dev/null +++ b/.opencode/tsconfig.json @@ -0,0 +1,12 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + "types": ["node"] + }, + "include": ["tools/**/*.ts", "plugins/**/*.ts"] +} diff --git a/dist/chat/sessionBridge.js b/dist/chat/sessionBridge.js new file mode 100644 index 0000000..3d3ebb2 --- /dev/null +++ b/dist/chat/sessionBridge.js @@ -0,0 +1,57 @@ +import { randomUUID } from "node:crypto"; +import { logger } from "../logger.js"; +export class ChatSessionBridge { + registry; + runtime; + // 这里额外保存 session -> 用户上下文,供工具桥在服务端代发真实后端请求时复用。 + sessionContexts = new Map(); + constructor(registry, runtime) { + this.registry = registry; + this.runtime = runtime; + } + async resolve(context) { + const requestContext = { + conversationId: context.conversationId?.trim() || `conv-${randomUUID().slice(0, 12)}`, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + this.cleanupExpired(); + const current = this.registry.get(requestContext); + if (current) { + this.sessionContexts.set(current.sessionId, requestContext); + try { + // 只有 opencode 侧 session 仍存在时,才复用本地映射。 + await this.runtime.getSession(current.sessionId); + return { binding: current, requestContext, created: false }; + } + catch (error) { + logger.warn({ + conversationId: requestContext.conversationId, + sessionId: current.sessionId, + err: error, + }, "existing opencode session lookup failed, creating a new session"); + } + } + const session = await this.runtime.createSession(requestContext.conversationId); + const binding = this.registry.upsert(requestContext, session.id); + this.sessionContexts.set(binding.sessionId, requestContext); + return { binding, requestContext, created: true }; + } + count() { + return this.registry.count(); + } + getSessionContext(sessionId) { + return this.sessionContexts.get(sessionId) ?? null; + } + cleanupExpired() { + const expiredSessionIds = this.registry.evictExpired(); + for (const sessionId of expiredSessionIds) { + this.sessionContexts.delete(sessionId); + // 这里用 abort 做轻量清理;即使失败,也不阻断本地过期回收。 + void this.runtime.abortSession(sessionId).catch((error) => { + logger.debug({ sessionId, err: error }, "ignoring failed abort for expired session"); + }); + } + } +} diff --git a/dist/config.js b/dist/config.js new file mode 100644 index 0000000..1a55ea5 --- /dev/null +++ b/dist/config.js @@ -0,0 +1,21 @@ +import { z } from "zod"; +const envSchema = z.object({ + NODE_ENV: z.string().default("development"), + PORT: z.coerce.number().int().positive().default(8788), + HOST: z.string().default("0.0.0.0"), + LOG_LEVEL: z.string().default("info"), + AGENT_INTERNAL_TOKEN: z.string().optional(), + OPENCODE_HOSTNAME: z.string().default("127.0.0.1"), + OPENCODE_PORT: z.coerce.number().int().positive().default(4096), + OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000), + OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"), + OPENCODE_BASE_URL: z.string().optional(), + OPENCODE_SERVER_PASSWORD: z.string().optional(), + OPENCODE_SERVER_USERNAME: z.string().default("opencode"), + SESSION_TTL_SECONDS: z.coerce.number().int().positive().default(1800), + TJWATER_API_BASE_URL: z.string().default("http://127.0.0.1:8000"), + TJWATER_API_TIMEOUT_MS: z.coerce.number().int().positive().default(30000), + MAX_INLINE_RESULT_BYTES: z.coerce.number().int().positive().default(12000), + MAX_PREVIEW_SAMPLE_ITEMS: z.coerce.number().int().positive().default(3), +}); +export const config = envSchema.parse(process.env); diff --git a/dist/logger.js b/dist/logger.js new file mode 100644 index 0000000..dc37b46 --- /dev/null +++ b/dist/logger.js @@ -0,0 +1,13 @@ +import pino from "pino"; +import { config } from "./config.js"; +export const logger = pino({ + level: config.LOG_LEVEL, + transport: config.NODE_ENV === "development" + ? { + target: "pino-pretty", + options: { + colorize: true, + }, + } + : undefined, +}); diff --git a/dist/routes/chat.js b/dist/routes/chat.js new file mode 100644 index 0000000..3d63e98 --- /dev/null +++ b/dist/routes/chat.js @@ -0,0 +1,106 @@ +import { Router } from "express"; +import { z } from "zod"; +import { logger } from "../logger.js"; +const payloadSchema = z.object({ + message: z.string().min(1).max(10000), + conversation_id: z.string().max(128).optional(), +}); +export const buildChatRouter = (sessionBridge, runtime) => { + const chatRouter = Router(); + chatRouter.post("/stream", async (req, res) => { + const parsed = payloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + const { binding, requestContext, created } = await sessionBridge.resolve({ + conversationId: parsed.data.conversation_id, + accessToken, + projectId, + traceId, + }); + logger.info({ + conversationId: requestContext.conversationId, + sessionId: binding.sessionId, + created, + traceId: requestContext.traceId, + projectId: requestContext.projectId, + }, "processing chat request"); + // 当前先走“发送 prompt 后回读最近消息”的兼容实现。 + // 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。 + const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message); + const assistantMessage = messages.find((message) => message.info.role === "assistant"); + res.status(200); + res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + const conversationId = requestContext.conversationId; + const parts = assistantMessage?.parts ?? []; + const textContent = collectTextContent(parts); + if (textContent) { + res.write(toSse("token", { + conversationId, + content: textContent, + })); + } + for (const toolCall of collectToolCalls(parts)) { + res.write(toSse("tool_call", { + conversationId, + tool: toolCall.tool, + params: toolCall.params, + })); + } + if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) { + res.write(toSse("error", { + conversationId, + message: getErrorMessage(assistantMessage.info.error), + detail: assistantMessage.info.error.name, + })); + } + else if (!assistantMessage) { + res.write(toSse("error", { + conversationId, + message: "assistant response unavailable", + detail: "no assistant message found after prompt", + })); + } + else { + res.write(toSse("done", { conversationId })); + } + res.end(); + } + catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat stream failed"); + res.status(500).json({ + message: "chat stream failed", + detail, + }); + } + }); + return chatRouter; +}; +const toSse = (event, data) => `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; +// 先把 opencode 的 Part 结构压平成前端当前消费的 SSE 语义。 +const collectTextContent = (parts) => parts + .filter((part) => part.type === "text") + .map((part) => part.text) + .join(""); +const collectToolCalls = (parts) => parts + .filter((part) => part.type === "tool") + .map((part) => ({ + tool: part.tool, + params: part.state.input, +})); +const getErrorMessage = (error) => error.data?.message ?? error.name; diff --git a/dist/runtime/opencode.js b/dist/runtime/opencode.js new file mode 100644 index 0000000..7bc19ac --- /dev/null +++ b/dist/runtime/opencode.js @@ -0,0 +1,100 @@ +import { createOpencode, createOpencodeClient, } from "@opencode-ai/sdk/v2"; +import { config } from "../config.js"; +import { logger } from "../logger.js"; +export class OpencodeRuntimeAdapter { + clientPromise = null; + closeServer = null; + async ensureClient() { + if (!this.clientPromise) { + this.clientPromise = this.bootstrapClient(); + } + return this.clientPromise; + } + async health() { + const client = await this.ensureClient(); + const response = await client.global.health(); + return requireData(response.data, "global.health"); + } + async createSession(title) { + const client = await this.ensureClient(); + const response = await client.session.create({ + title, + }); + return requireData(response.data, "session.create"); + } + async getSession(id) { + const client = await this.ensureClient(); + const response = await client.session.get({ + sessionID: id, + }); + return requireData(response.data, "session.get"); + } + async sendPrompt(sessionId, text) { + const client = await this.ensureClient(); + await client.session.prompt({ + sessionID: sessionId, + parts: [{ type: "text", text }], + }); + // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, + // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + const messages = await client.session.messages({ + sessionID: sessionId, + limit: 20, + }); + return requireData(messages.data, "session.messages"); + } + async abortSession(sessionId) { + const client = await this.ensureClient(); + const response = await client.session.abort({ + sessionID: sessionId, + }); + return requireData(response.data, "session.abort"); + } + async subscribeEvents() { + const client = await this.ensureClient(); + const response = await client.event.subscribe(); + return response.stream; + } + async dispose() { + this.closeServer?.(); + this.closeServer = null; + this.clientPromise = null; + } + async bootstrapClient() { + if (config.OPENCODE_BASE_URL) { + logger.info({ baseUrl: config.OPENCODE_BASE_URL }, "connecting to external opencode server"); + return createOpencodeClient({ + baseUrl: config.OPENCODE_BASE_URL, + }); + } + // embedded 模式下,把服务内工具桥地址注入到 opencode 进程环境里, + // 这样 .opencode/tools 下的自定义工具可以回调本服务。 + process.env.TJWATER_AGENT_INTERNAL_BASE_URL = `http://127.0.0.1:${config.PORT}`; + process.env.TJWATER_AGENT_INTERNAL_TOKEN = + config.AGENT_INTERNAL_TOKEN ?? process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; + logger.info({ + hostname: config.OPENCODE_HOSTNAME, + port: config.OPENCODE_PORT, + model: config.OPENCODE_MODEL, + }, "starting embedded opencode server"); + const runtime = await createOpencode({ + hostname: config.OPENCODE_HOSTNAME, + port: config.OPENCODE_PORT, + timeout: config.OPENCODE_TIMEOUT_MS, + config: { + model: config.OPENCODE_MODEL, + }, + }); + this.closeServer = () => { + runtime.server.close(); + }; + return runtime.client; + } +} +export const opencodeRuntime = new OpencodeRuntimeAdapter(); +function requireData(data, operation) { + if (data === undefined) { + throw new Error(`${operation} returned no data`); + } + return data; +} diff --git a/dist/server.js b/dist/server.js new file mode 100644 index 0000000..a11523e --- /dev/null +++ b/dist/server.js @@ -0,0 +1,81 @@ +import { randomUUID } from "node:crypto"; +import cors from "cors"; +import express from "express"; +import { ChatSessionBridge } from "./chat/sessionBridge.js"; +import { config } from "./config.js"; +import { logger } from "./logger.js"; +import { buildChatRouter } from "./routes/chat.js"; +import { opencodeRuntime } from "./runtime/opencode.js"; +import { SessionRegistry } from "./session/registry.js"; +import { dynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js"; +const app = express(); +const registry = new SessionRegistry(config.SESSION_TTL_SECONDS); +const sessionBridge = new ChatSessionBridge(registry, opencodeRuntime); +const internalToken = config.AGENT_INTERNAL_TOKEN ?? randomUUID(); +process.env.TJWATER_AGENT_INTERNAL_TOKEN = internalToken; +app.use(cors()); +app.use(express.json({ limit: "1mb" })); +app.get("/health", async (_req, res) => { + try { + const runtime = await opencodeRuntime.health(); + res.json({ + ok: true, + runtime, + sessions: sessionBridge.count(), + }); + } + catch (error) { + const detail = error instanceof Error ? error.message : String(error); + res.status(503).json({ + ok: false, + message: "opencode runtime unavailable", + detail, + sessions: sessionBridge.count(), + }); + } +}); +app.post("/internal/tools/dynamic-http-call", async (req, res) => { + if (req.header("x-agent-internal-token") !== internalToken) { + res.status(403).json({ message: "forbidden" }); + return; + } + const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const context = sessionBridge.getSessionContext(sessionId); + if (!context) { + res.status(404).json({ + message: "session context not found", + detail: sessionId, + }); + return; + } + try { + const result = await dynamicHttpExecutor.execute({ + path: req.body?.path, + method: req.body?.method, + arguments: req.body?.arguments, + }, context); + res.json(result); + } + catch (error) { + const detail = error instanceof Error ? error.message : String(error); + res.status(400).json({ + message: "dynamic http execution failed", + detail, + }); + } +}); +app.use("/api/v1/copilot/chat", buildChatRouter(sessionBridge, opencodeRuntime)); +const server = app.listen(config.PORT, config.HOST, () => { + logger.info({ host: config.HOST, port: config.PORT }, "TJWaterAgent listening"); +}); +const shutdown = async () => { + logger.info("shutting down TJWaterAgent"); + server.close(); + await opencodeRuntime.dispose(); +}; +process.on("SIGINT", () => { + void shutdown(); +}); +process.on("SIGTERM", () => { + void shutdown(); +}); diff --git a/dist/session/registry.js b/dist/session/registry.js new file mode 100644 index 0000000..b181c22 --- /dev/null +++ b/dist/session/registry.js @@ -0,0 +1,56 @@ +import crypto from "node:crypto"; +export class SessionRegistry { + ttlMs; + bindings = new Map(); + constructor(ttlSeconds) { + this.ttlMs = ttlSeconds * 1000; + } + upsert(context, sessionId) { + const binding = { + conversationId: context.conversationId, + sessionId, + lastUsedAt: Date.now(), + }; + this.bindings.set(this.makeKey(context), binding); + return binding; + } + get(context) { + const key = this.makeKey(context); + const binding = this.bindings.get(key); + if (!binding) { + return null; + } + if (Date.now() - binding.lastUsedAt > this.ttlMs) { + this.bindings.delete(key); + return null; + } + binding.lastUsedAt = Date.now(); + return binding; + } + count() { + this.evictExpired(); + return this.bindings.size; + } + evictExpired() { + const expired = []; + const now = Date.now(); + for (const [key, binding] of this.bindings.entries()) { + if (now - binding.lastUsedAt > this.ttlMs) { + expired.push(binding.sessionId); + this.bindings.delete(key); + } + } + return expired; + } + makeKey(context) { + const digest = crypto + .createHash("sha256") + .update([ + context.conversationId, + context.accessToken ?? "", + context.projectId ?? "", + ].join("|")) + .digest("hex"); + return digest; + } +} diff --git a/dist/tools/dynamicHttpExecutor.js b/dist/tools/dynamicHttpExecutor.js new file mode 100644 index 0000000..6f6a08f --- /dev/null +++ b/dist/tools/dynamicHttpExecutor.js @@ -0,0 +1,143 @@ +import { randomUUID } from "node:crypto"; +import { config } from "../config.js"; +const allowedMethods = new Set(["GET", "POST", "PUT", "PATCH", "DELETE"]); +const resultStore = new Map(); +export class DynamicHttpExecutor { + async execute(input, context) { + const method = (input.method ?? "GET").trim().toUpperCase(); + if (!allowedMethods.has(method)) { + throw new Error(`unsupported method: ${method}`); + } + const path = input.path.trim(); + if (!path.startsWith("/")) { + throw new Error("path must start with '/'"); + } + const query = buildQuery(input.arguments ?? {}); + const url = new URL(path, config.TJWATER_API_BASE_URL); + for (const [key, value] of query) { + url.searchParams.append(key, value); + } + // 这里复用 chat session 绑定的用户上下文,保持后端鉴权与项目隔离语义不变。 + const headers = new Headers({ + Accept: "application/json", + "x-trace-id": context.traceId, + }); + if (context.accessToken) { + headers.set("Authorization", `Bearer ${context.accessToken}`); + } + if (context.projectId) { + headers.set("x-project-id", context.projectId); + } + const response = await fetch(url, { + method, + headers, + signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS), + }); + const contentType = response.headers.get("content-type") ?? ""; + const rawText = await response.text(); + const data = contentType.includes("application/json") && rawText + ? JSON.parse(rawText) + : rawText; + if (!response.ok) { + return { + ok: false, + trace_id: context.traceId, + upstream: { + method, + path, + status_code: response.status, + }, + error: { + message: "upstream API returned error", + detail: data, + }, + }; + } + return { + ok: true, + trace_id: context.traceId, + upstream: { + method, + path, + status_code: response.status, + }, + ...normalizeSuccessResult(data, context), + }; + } + getResult(resultRef) { + return resultStore.get(resultRef); + } +} +export const dynamicHttpExecutor = new DynamicHttpExecutor(); +const buildQuery = (argumentsObject) => { + const pairs = []; + for (const [key, value] of Object.entries(argumentsObject)) { + if (value === undefined || value === null) { + continue; + } + if (Array.isArray(value)) { + if (value.length === 0) { + continue; + } + pairs.push([key, value.map(String).join(",")]); + continue; + } + pairs.push([key, String(value)]); + } + return pairs; +}; +const normalizeSuccessResult = (data, context) => { + const sizeBytes = estimateBytes(data); + if (sizeBytes <= config.MAX_INLINE_RESULT_BYTES) { + return { + result_mode: "inline", + result_size_bytes: sizeBytes, + data, + }; + } + const resultRef = `res-${randomUUID().slice(0, 16)}`; + // 大结果先落本地引用,避免工具输出把模型上下文直接撑爆。 + resultStore.set(resultRef, { + rawResult: data, + traceId: context.traceId, + projectId: context.projectId, + }); + return { + result_mode: "referenced", + result_size_bytes: sizeBytes, + result_ref: resultRef, + preview: buildPreview(data), + }; +}; +const estimateBytes = (data) => Buffer.byteLength(JSON.stringify(data)); +const buildPreview = (data) => { + if (Array.isArray(data)) { + const sample = data.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS); + const fields = sample.length > 0 && isRecord(sample[0]) + ? Object.keys(sample[0]).slice(0, 30) + : []; + return { + count: data.length, + fields, + sample, + summary: `list[${data.length}]`, + }; + } + if (isRecord(data)) { + const fields = Object.keys(data).slice(0, 30); + const sample = Object.fromEntries(fields.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS).map((field) => [field, data[field]])); + return { + count: fields.length, + fields, + sample, + summary: `object<${fields.length} fields>`, + }; + } + return { + count: 1, + fields: [], + sample: String(data).slice(0, 300), + summary: `scalar<${typeof data}>`, + }; +}; +const isRecord = (value) => typeof value === "object" && value !== null && !Array.isArray(value); diff --git a/opencode.json b/opencode.json new file mode 100644 index 0000000..3b48e0e --- /dev/null +++ b/opencode.json @@ -0,0 +1,15 @@ +{ + "$schema": "https://opencode.ai/config.json", + "model": "anthropic/claude-sonnet-4-5", + "server": { + "hostname": "127.0.0.1", + "port": 4096 + }, + "default_agent": "tjwater-assistant", + "instructions": [ + "../TJWaterCopilot/.github/copilot-instructions.md", + "../TJWaterCopilot/.github/skills/SKILL.md", + "../TJWaterCopilot/.github/skills/runbook.md", + "../TJWaterCopilot/.github/skills/examples.md" + ] +} diff --git a/src/chat/sessionBridge.ts b/src/chat/sessionBridge.ts new file mode 100644 index 0000000..dfe8d86 --- /dev/null +++ b/src/chat/sessionBridge.ts @@ -0,0 +1,82 @@ +import { randomUUID } from "node:crypto"; + +import { logger } from "../logger.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; +import { type SessionBinding, type SessionContext, SessionRegistry } from "../session/registry.js"; + +export type ChatRequestContext = SessionContext & { + traceId: string; +}; + +export class ChatSessionBridge { + // 这里额外保存 session -> 用户上下文,供工具桥在服务端代发真实后端请求时复用。 + private readonly sessionContexts = new Map(); + + constructor( + private readonly registry: SessionRegistry, + private readonly runtime: OpencodeRuntimeAdapter, + ) {} + + async resolve(context: { + conversationId?: string; + accessToken?: string; + projectId?: string; + traceId?: string; + }): Promise<{ + binding: SessionBinding; + requestContext: ChatRequestContext; + created: boolean; + }> { + const requestContext: ChatRequestContext = { + conversationId: context.conversationId?.trim() || `conv-${randomUUID().slice(0, 12)}`, + accessToken: context.accessToken, + projectId: context.projectId, + traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`, + }; + + this.cleanupExpired(); + + const current = this.registry.get(requestContext); + if (current) { + this.sessionContexts.set(current.sessionId, requestContext); + try { + // 只有 opencode 侧 session 仍存在时,才复用本地映射。 + await this.runtime.getSession(current.sessionId); + return { binding: current, requestContext, created: false }; + } catch (error) { + logger.warn( + { + conversationId: requestContext.conversationId, + sessionId: current.sessionId, + err: error, + }, + "existing opencode session lookup failed, creating a new session", + ); + } + } + + const session = await this.runtime.createSession(requestContext.conversationId); + const binding = this.registry.upsert(requestContext, session.id); + this.sessionContexts.set(binding.sessionId, requestContext); + return { binding, requestContext, created: true }; + } + + count(): number { + return this.registry.count(); + } + + getSessionContext(sessionId: string) { + return this.sessionContexts.get(sessionId) ?? null; + } + + cleanupExpired(): void { + const expiredSessionIds = this.registry.evictExpired(); + for (const sessionId of expiredSessionIds) { + this.sessionContexts.delete(sessionId); + // 这里用 abort 做轻量清理;即使失败,也不阻断本地过期回收。 + void this.runtime.abortSession(sessionId).catch((error) => { + logger.debug({ sessionId, err: error }, "ignoring failed abort for expired session"); + }); + } + } +} diff --git a/src/config.ts b/src/config.ts new file mode 100644 index 0000000..d7ef2e2 --- /dev/null +++ b/src/config.ts @@ -0,0 +1,25 @@ +import { z } from "zod"; + +const envSchema = z.object({ + NODE_ENV: z.string().default("development"), + PORT: z.coerce.number().int().positive().default(8788), + HOST: z.string().default("0.0.0.0"), + LOG_LEVEL: z.string().default("info"), + AGENT_INTERNAL_TOKEN: z.string().optional(), + OPENCODE_HOSTNAME: z.string().default("127.0.0.1"), + OPENCODE_PORT: z.coerce.number().int().positive().default(4096), + OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000), + OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"), + OPENCODE_BASE_URL: z.string().optional(), + OPENCODE_SERVER_PASSWORD: z.string().optional(), + OPENCODE_SERVER_USERNAME: z.string().default("opencode"), + SESSION_TTL_SECONDS: z.coerce.number().int().positive().default(1800), + TJWATER_API_BASE_URL: z.string().default("http://127.0.0.1:8000"), + TJWATER_API_TIMEOUT_MS: z.coerce.number().int().positive().default(30000), + MAX_INLINE_RESULT_BYTES: z.coerce.number().int().positive().default(12000), + MAX_PREVIEW_SAMPLE_ITEMS: z.coerce.number().int().positive().default(3), +}); + +export type AppConfig = z.infer; + +export const config: AppConfig = envSchema.parse(process.env); diff --git a/src/logger.ts b/src/logger.ts new file mode 100644 index 0000000..db02175 --- /dev/null +++ b/src/logger.ts @@ -0,0 +1,16 @@ +import pino from "pino"; + +import { config } from "./config.js"; + +export const logger = pino({ + level: config.LOG_LEVEL, + transport: + config.NODE_ENV === "development" + ? { + target: "pino-pretty", + options: { + colorize: true, + }, + } + : undefined, +}); diff --git a/src/routes/chat.ts b/src/routes/chat.ts new file mode 100644 index 0000000..39eca9a --- /dev/null +++ b/src/routes/chat.ts @@ -0,0 +1,146 @@ +import type { Part } from "@opencode-ai/sdk/v2"; +import { Router } from "express"; +import { z } from "zod"; + +import { logger } from "../logger.js"; +import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js"; +import { type ChatSessionBridge } from "../chat/sessionBridge.js"; + +const payloadSchema = z.object({ + message: z.string().min(1).max(10000), + conversation_id: z.string().max(128).optional(), +}); + +export const buildChatRouter = ( + sessionBridge: ChatSessionBridge, + runtime: OpencodeRuntimeAdapter, +) => { + const chatRouter = Router(); + + chatRouter.post("/stream", async (req, res) => { + const parsed = payloadSchema.safeParse(req.body); + if (!parsed.success) { + res.status(400).json({ + message: "invalid request payload", + detail: parsed.error.flatten(), + }); + return; + } + + try { + const authHeader = req.header("authorization"); + const accessToken = authHeader?.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + const projectId = req.header("x-project-id") ?? undefined; + const traceId = req.header("x-trace-id") ?? undefined; + + const { binding, requestContext, created } = await sessionBridge.resolve({ + conversationId: parsed.data.conversation_id, + accessToken, + projectId, + traceId, + }); + + logger.info( + { + conversationId: requestContext.conversationId, + sessionId: binding.sessionId, + created, + traceId: requestContext.traceId, + projectId: requestContext.projectId, + }, + "processing chat request", + ); + + // 当前先走“发送 prompt 后回读最近消息”的兼容实现。 + // 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。 + const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message); + const assistantMessage = messages.find( + (message) => message.info.role === "assistant", + ); + + res.status(200); + res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + + const conversationId = requestContext.conversationId; + const parts = assistantMessage?.parts ?? []; + const textContent = collectTextContent(parts); + if (textContent) { + res.write( + toSse("token", { + conversationId, + content: textContent, + }), + ); + } + + for (const toolCall of collectToolCalls(parts)) { + res.write( + toSse("tool_call", { + conversationId, + tool: toolCall.tool, + params: toolCall.params, + }), + ); + } + + if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) { + res.write( + toSse("error", { + conversationId, + message: getErrorMessage(assistantMessage.info.error), + detail: assistantMessage.info.error.name, + }), + ); + } else if (!assistantMessage) { + res.write( + toSse("error", { + conversationId, + message: "assistant response unavailable", + detail: "no assistant message found after prompt", + }), + ); + } else { + res.write(toSse("done", { conversationId })); + } + + res.end(); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + logger.error({ err: error }, "chat stream failed"); + res.status(500).json({ + message: "chat stream failed", + detail, + }); + } + }); + + return chatRouter; +}; + +const toSse = (event: string, data: Record) => + `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; + +// 先把 opencode 的 Part 结构压平成前端当前消费的 SSE 语义。 +const collectTextContent = (parts: Part[]) => + parts + .filter((part): part is Extract => part.type === "text") + .map((part) => part.text) + .join(""); + +const collectToolCalls = (parts: Part[]) => + parts + .filter((part): part is Extract => part.type === "tool") + .map((part) => ({ + tool: part.tool, + params: part.state.input, + })); + +const getErrorMessage = (error: { + name: string; + data?: { message?: string }; +}) => error.data?.message ?? error.name; diff --git a/src/runtime/opencode.ts b/src/runtime/opencode.ts new file mode 100644 index 0000000..9f504a6 --- /dev/null +++ b/src/runtime/opencode.ts @@ -0,0 +1,133 @@ +import { + createOpencode, + createOpencodeClient, + type OpencodeClient, +} from "@opencode-ai/sdk/v2"; + +import { config } from "../config.js"; +import { logger } from "../logger.js"; + +export type RuntimeHealth = { + healthy: boolean; + version: string; +}; + +export class OpencodeRuntimeAdapter { + private clientPromise: Promise | null = null; + private closeServer: (() => void) | null = null; + + async ensureClient(): Promise { + if (!this.clientPromise) { + this.clientPromise = this.bootstrapClient(); + } + return this.clientPromise; + } + + async health(): Promise { + const client = await this.ensureClient(); + const response = await client.global.health(); + return requireData(response.data, "global.health"); + } + + async createSession(title?: string) { + const client = await this.ensureClient(); + const response = await client.session.create({ + title, + }); + return requireData(response.data, "session.create"); + } + + async getSession(id: string) { + const client = await this.ensureClient(); + const response = await client.session.get({ + sessionID: id, + }); + return requireData(response.data, "session.get"); + } + + async sendPrompt(sessionId: string, text: string) { + const client = await this.ensureClient(); + await client.session.prompt({ + sessionID: sessionId, + parts: [{ type: "text", text }], + }); + // 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts, + // 所以这里紧跟一次 messages() 回读,给上层路由统一消费。 + const messages = await client.session.messages({ + sessionID: sessionId, + limit: 20, + }); + return requireData(messages.data, "session.messages"); + } + + async abortSession(sessionId: string) { + const client = await this.ensureClient(); + const response = await client.session.abort({ + sessionID: sessionId, + }); + return requireData(response.data, "session.abort"); + } + + async subscribeEvents() { + const client = await this.ensureClient(); + const response = await client.event.subscribe(); + return response.stream; + } + + async dispose(): Promise { + this.closeServer?.(); + this.closeServer = null; + this.clientPromise = null; + } + + private async bootstrapClient(): Promise { + if (config.OPENCODE_BASE_URL) { + logger.info( + { baseUrl: config.OPENCODE_BASE_URL }, + "connecting to external opencode server", + ); + return createOpencodeClient({ + baseUrl: config.OPENCODE_BASE_URL, + }); + } + + // embedded 模式下,把服务内工具桥地址注入到 opencode 进程环境里, + // 这样 .opencode/tools 下的自定义工具可以回调本服务。 + process.env.TJWATER_AGENT_INTERNAL_BASE_URL = `http://127.0.0.1:${config.PORT}`; + process.env.TJWATER_AGENT_INTERNAL_TOKEN = + config.AGENT_INTERNAL_TOKEN ?? process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? ""; + + logger.info( + { + hostname: config.OPENCODE_HOSTNAME, + port: config.OPENCODE_PORT, + model: config.OPENCODE_MODEL, + }, + "starting embedded opencode server", + ); + + const runtime = await createOpencode({ + hostname: config.OPENCODE_HOSTNAME, + port: config.OPENCODE_PORT, + timeout: config.OPENCODE_TIMEOUT_MS, + config: { + model: config.OPENCODE_MODEL, + }, + }); + + this.closeServer = () => { + runtime.server.close(); + }; + + return runtime.client; + } +} + +export const opencodeRuntime = new OpencodeRuntimeAdapter(); + +function requireData(data: T | undefined, operation: string): T { + if (data === undefined) { + throw new Error(`${operation} returned no data`); + } + return data; +} diff --git a/src/server.ts b/src/server.ts new file mode 100644 index 0000000..f0c8af8 --- /dev/null +++ b/src/server.ts @@ -0,0 +1,99 @@ +import { randomUUID } from "node:crypto"; + +import cors from "cors"; +import express from "express"; + +import { ChatSessionBridge } from "./chat/sessionBridge.js"; +import { config } from "./config.js"; +import { logger } from "./logger.js"; +import { buildChatRouter } from "./routes/chat.js"; +import { opencodeRuntime } from "./runtime/opencode.js"; +import { SessionRegistry } from "./session/registry.js"; +import { dynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js"; + +const app = express(); +const registry = new SessionRegistry(config.SESSION_TTL_SECONDS); +const sessionBridge = new ChatSessionBridge(registry, opencodeRuntime); +const internalToken = config.AGENT_INTERNAL_TOKEN ?? randomUUID(); + +process.env.TJWATER_AGENT_INTERNAL_TOKEN = internalToken; + +app.use(cors()); +app.use(express.json({ limit: "1mb" })); + +app.get("/health", async (_req, res) => { + try { + const runtime = await opencodeRuntime.health(); + res.json({ + ok: true, + runtime, + sessions: sessionBridge.count(), + }); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + res.status(503).json({ + ok: false, + message: "opencode runtime unavailable", + detail, + sessions: sessionBridge.count(), + }); + } +}); + +app.post("/internal/tools/dynamic-http-call", async (req, res) => { + if (req.header("x-agent-internal-token") !== internalToken) { + res.status(403).json({ message: "forbidden" }); + return; + } + + const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : ""; + const context = sessionBridge.getSessionContext(sessionId); + if (!context) { + res.status(404).json({ + message: "session context not found", + detail: sessionId, + }); + return; + } + + try { + const result = await dynamicHttpExecutor.execute( + { + path: req.body?.path, + method: req.body?.method, + arguments: req.body?.arguments, + }, + context, + ); + res.json(result); + } catch (error) { + const detail = error instanceof Error ? error.message : String(error); + res.status(400).json({ + message: "dynamic http execution failed", + detail, + }); + } +}); + +app.use("/api/v1/copilot/chat", buildChatRouter(sessionBridge, opencodeRuntime)); + +const server = app.listen(config.PORT, config.HOST, () => { + logger.info( + { host: config.HOST, port: config.PORT }, + "TJWaterAgent listening", + ); +}); + +const shutdown = async () => { + logger.info("shutting down TJWaterAgent"); + server.close(); + await opencodeRuntime.dispose(); +}; + +process.on("SIGINT", () => { + void shutdown(); +}); + +process.on("SIGTERM", () => { + void shutdown(); +}); diff --git a/src/session/registry.ts b/src/session/registry.ts new file mode 100644 index 0000000..04260e6 --- /dev/null +++ b/src/session/registry.ts @@ -0,0 +1,78 @@ +import crypto from "node:crypto"; + +export type SessionBinding = { + conversationId: string; + sessionId: string; + lastUsedAt: number; +}; + +export type SessionContext = { + conversationId: string; + accessToken?: string; + projectId?: string; +}; + +export class SessionRegistry { + private readonly ttlMs: number; + private readonly bindings = new Map(); + + constructor(ttlSeconds: number) { + this.ttlMs = ttlSeconds * 1000; + } + + upsert(context: SessionContext, sessionId: string): SessionBinding { + const binding: SessionBinding = { + conversationId: context.conversationId, + sessionId, + lastUsedAt: Date.now(), + }; + this.bindings.set(this.makeKey(context), binding); + return binding; + } + + get(context: SessionContext): SessionBinding | null { + const key = this.makeKey(context); + const binding = this.bindings.get(key); + if (!binding) { + return null; + } + if (Date.now() - binding.lastUsedAt > this.ttlMs) { + this.bindings.delete(key); + return null; + } + binding.lastUsedAt = Date.now(); + return binding; + } + + count(): number { + this.evictExpired(); + return this.bindings.size; + } + + evictExpired(): string[] { + const expired: string[] = []; + const now = Date.now(); + for (const [key, binding] of this.bindings.entries()) { + if (now - binding.lastUsedAt > this.ttlMs) { + expired.push(binding.sessionId); + this.bindings.delete(key); + } + } + return expired; + } + + private makeKey(context: SessionContext): string { + const digest = crypto + .createHash("sha256") + .update( + [ + context.conversationId, + context.accessToken ?? "", + context.projectId ?? "", + ].join("|"), + ) + .digest("hex"); + + return digest; + } +} diff --git a/src/tools/dynamicHttpExecutor.ts b/src/tools/dynamicHttpExecutor.ts new file mode 100644 index 0000000..216acaa --- /dev/null +++ b/src/tools/dynamicHttpExecutor.ts @@ -0,0 +1,187 @@ +import { randomUUID } from "node:crypto"; + +import { config } from "../config.js"; + +export type DynamicHttpInput = { + path: string; + method?: string; + arguments?: Record; +}; + +export type SessionToolContext = { + accessToken?: string; + projectId?: string; + traceId: string; +}; + +type StoredResult = { + rawResult: unknown; + traceId: string; + projectId?: string; +}; + +const allowedMethods = new Set(["GET", "POST", "PUT", "PATCH", "DELETE"]); +const resultStore = new Map(); + +export class DynamicHttpExecutor { + async execute(input: DynamicHttpInput, context: SessionToolContext) { + const method = (input.method ?? "GET").trim().toUpperCase(); + if (!allowedMethods.has(method)) { + throw new Error(`unsupported method: ${method}`); + } + + const path = input.path.trim(); + if (!path.startsWith("/")) { + throw new Error("path must start with '/'"); + } + + const query = buildQuery(input.arguments ?? {}); + const url = new URL(path, config.TJWATER_API_BASE_URL); + for (const [key, value] of query) { + url.searchParams.append(key, value); + } + + // 这里复用 chat session 绑定的用户上下文,保持后端鉴权与项目隔离语义不变。 + const headers = new Headers({ + Accept: "application/json", + "x-trace-id": context.traceId, + }); + if (context.accessToken) { + headers.set("Authorization", `Bearer ${context.accessToken}`); + } + if (context.projectId) { + headers.set("x-project-id", context.projectId); + } + + const response = await fetch(url, { + method, + headers, + signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS), + }); + + const contentType = response.headers.get("content-type") ?? ""; + const rawText = await response.text(); + const data = + contentType.includes("application/json") && rawText + ? JSON.parse(rawText) + : rawText; + + if (!response.ok) { + return { + ok: false, + trace_id: context.traceId, + upstream: { + method, + path, + status_code: response.status, + }, + error: { + message: "upstream API returned error", + detail: data, + }, + }; + } + + return { + ok: true, + trace_id: context.traceId, + upstream: { + method, + path, + status_code: response.status, + }, + ...normalizeSuccessResult(data, context), + }; + } + + getResult(resultRef: string) { + return resultStore.get(resultRef); + } +} + +export const dynamicHttpExecutor = new DynamicHttpExecutor(); + +const buildQuery = (argumentsObject: Record) => { + const pairs: Array<[string, string]> = []; + for (const [key, value] of Object.entries(argumentsObject)) { + if (value === undefined || value === null) { + continue; + } + if (Array.isArray(value)) { + if (value.length === 0) { + continue; + } + pairs.push([key, value.map(String).join(",")]); + continue; + } + pairs.push([key, String(value)]); + } + return pairs; +}; + +const normalizeSuccessResult = (data: unknown, context: SessionToolContext) => { + const sizeBytes = estimateBytes(data); + if (sizeBytes <= config.MAX_INLINE_RESULT_BYTES) { + return { + result_mode: "inline", + result_size_bytes: sizeBytes, + data, + }; + } + + const resultRef = `res-${randomUUID().slice(0, 16)}`; + // 大结果先落本地引用,避免工具输出把模型上下文直接撑爆。 + resultStore.set(resultRef, { + rawResult: data, + traceId: context.traceId, + projectId: context.projectId, + }); + + return { + result_mode: "referenced", + result_size_bytes: sizeBytes, + result_ref: resultRef, + preview: buildPreview(data), + }; +}; + +const estimateBytes = (data: unknown) => Buffer.byteLength(JSON.stringify(data)); + +const buildPreview = (data: unknown) => { + if (Array.isArray(data)) { + const sample = data.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS); + const fields = + sample.length > 0 && isRecord(sample[0]) + ? Object.keys(sample[0]).slice(0, 30) + : []; + return { + count: data.length, + fields, + sample, + summary: `list[${data.length}]`, + }; + } + + if (isRecord(data)) { + const fields = Object.keys(data).slice(0, 30); + const sample = Object.fromEntries( + fields.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS).map((field) => [field, data[field]]), + ); + return { + count: fields.length, + fields, + sample, + summary: `object<${fields.length} fields>`, + }; + } + + return { + count: 1, + fields: [], + sample: String(data).slice(0, 300), + summary: `scalar<${typeof data}>`, + }; +}; + +const isRecord = (value: unknown): value is Record => + typeof value === "object" && value !== null && !Array.isArray(value); diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..091561c --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "strict": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "skipLibCheck": true, + "outDir": "dist", + "rootDir": "src", + "resolveJsonModule": true, + "allowSyntheticDefaultImports": true + }, + "include": ["src/**/*.ts"] +}