This commit is contained in:
2026-04-29 11:47:28 +08:00
parent 6b7978957a
commit d3e7baca99
25 changed files with 1523 additions and 0 deletions
+82
View File
@@ -0,0 +1,82 @@
import { randomUUID } from "node:crypto";
import { logger } from "../logger.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type SessionBinding, type SessionContext, SessionRegistry } from "../session/registry.js";
export type ChatRequestContext = SessionContext & {
traceId: string;
};
export class ChatSessionBridge {
// 这里额外保存 session -> 用户上下文,供工具桥在服务端代发真实后端请求时复用。
private readonly sessionContexts = new Map<string, ChatRequestContext>();
constructor(
private readonly registry: SessionRegistry,
private readonly runtime: OpencodeRuntimeAdapter,
) {}
async resolve(context: {
conversationId?: string;
accessToken?: string;
projectId?: string;
traceId?: string;
}): Promise<{
binding: SessionBinding;
requestContext: ChatRequestContext;
created: boolean;
}> {
const requestContext: ChatRequestContext = {
conversationId: context.conversationId?.trim() || `conv-${randomUUID().slice(0, 12)}`,
accessToken: context.accessToken,
projectId: context.projectId,
traceId: context.traceId?.trim() || `trace-${randomUUID().slice(0, 12)}`,
};
this.cleanupExpired();
const current = this.registry.get(requestContext);
if (current) {
this.sessionContexts.set(current.sessionId, requestContext);
try {
// 只有 opencode 侧 session 仍存在时,才复用本地映射。
await this.runtime.getSession(current.sessionId);
return { binding: current, requestContext, created: false };
} catch (error) {
logger.warn(
{
conversationId: requestContext.conversationId,
sessionId: current.sessionId,
err: error,
},
"existing opencode session lookup failed, creating a new session",
);
}
}
const session = await this.runtime.createSession(requestContext.conversationId);
const binding = this.registry.upsert(requestContext, session.id);
this.sessionContexts.set(binding.sessionId, requestContext);
return { binding, requestContext, created: true };
}
count(): number {
return this.registry.count();
}
getSessionContext(sessionId: string) {
return this.sessionContexts.get(sessionId) ?? null;
}
cleanupExpired(): void {
const expiredSessionIds = this.registry.evictExpired();
for (const sessionId of expiredSessionIds) {
this.sessionContexts.delete(sessionId);
// 这里用 abort 做轻量清理;即使失败,也不阻断本地过期回收。
void this.runtime.abortSession(sessionId).catch((error) => {
logger.debug({ sessionId, err: error }, "ignoring failed abort for expired session");
});
}
}
}
+25
View File
@@ -0,0 +1,25 @@
import { z } from "zod";
const envSchema = z.object({
NODE_ENV: z.string().default("development"),
PORT: z.coerce.number().int().positive().default(8788),
HOST: z.string().default("0.0.0.0"),
LOG_LEVEL: z.string().default("info"),
AGENT_INTERNAL_TOKEN: z.string().optional(),
OPENCODE_HOSTNAME: z.string().default("127.0.0.1"),
OPENCODE_PORT: z.coerce.number().int().positive().default(4096),
OPENCODE_TIMEOUT_MS: z.coerce.number().int().positive().default(5000),
OPENCODE_MODEL: z.string().default("anthropic/claude-sonnet-4-5"),
OPENCODE_BASE_URL: z.string().optional(),
OPENCODE_SERVER_PASSWORD: z.string().optional(),
OPENCODE_SERVER_USERNAME: z.string().default("opencode"),
SESSION_TTL_SECONDS: z.coerce.number().int().positive().default(1800),
TJWATER_API_BASE_URL: z.string().default("http://127.0.0.1:8000"),
TJWATER_API_TIMEOUT_MS: z.coerce.number().int().positive().default(30000),
MAX_INLINE_RESULT_BYTES: z.coerce.number().int().positive().default(12000),
MAX_PREVIEW_SAMPLE_ITEMS: z.coerce.number().int().positive().default(3),
});
export type AppConfig = z.infer<typeof envSchema>;
export const config: AppConfig = envSchema.parse(process.env);
+16
View File
@@ -0,0 +1,16 @@
import pino from "pino";
import { config } from "./config.js";
export const logger = pino({
level: config.LOG_LEVEL,
transport:
config.NODE_ENV === "development"
? {
target: "pino-pretty",
options: {
colorize: true,
},
}
: undefined,
});
+146
View File
@@ -0,0 +1,146 @@
import type { Part } from "@opencode-ai/sdk/v2";
import { Router } from "express";
import { z } from "zod";
import { logger } from "../logger.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { type ChatSessionBridge } from "../chat/sessionBridge.js";
const payloadSchema = z.object({
message: z.string().min(1).max(10000),
conversation_id: z.string().max(128).optional(),
});
export const buildChatRouter = (
sessionBridge: ChatSessionBridge,
runtime: OpencodeRuntimeAdapter,
) => {
const chatRouter = Router();
chatRouter.post("/stream", async (req, res) => {
const parsed = payloadSchema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({
message: "invalid request payload",
detail: parsed.error.flatten(),
});
return;
}
try {
const authHeader = req.header("authorization");
const accessToken = authHeader?.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;
const projectId = req.header("x-project-id") ?? undefined;
const traceId = req.header("x-trace-id") ?? undefined;
const { binding, requestContext, created } = await sessionBridge.resolve({
conversationId: parsed.data.conversation_id,
accessToken,
projectId,
traceId,
});
logger.info(
{
conversationId: requestContext.conversationId,
sessionId: binding.sessionId,
created,
traceId: requestContext.traceId,
projectId: requestContext.projectId,
},
"processing chat request",
);
// 当前先走“发送 prompt 后回读最近消息”的兼容实现。
// 后续切到真正的 opencode 事件流时,只需要替换这里的取数方式。
const messages = await runtime.sendPrompt(binding.sessionId, parsed.data.message);
const assistantMessage = messages.find(
(message) => message.info.role === "assistant",
);
res.status(200);
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
const conversationId = requestContext.conversationId;
const parts = assistantMessage?.parts ?? [];
const textContent = collectTextContent(parts);
if (textContent) {
res.write(
toSse("token", {
conversationId,
content: textContent,
}),
);
}
for (const toolCall of collectToolCalls(parts)) {
res.write(
toSse("tool_call", {
conversationId,
tool: toolCall.tool,
params: toolCall.params,
}),
);
}
if (assistantMessage?.info.role === "assistant" && assistantMessage.info.error) {
res.write(
toSse("error", {
conversationId,
message: getErrorMessage(assistantMessage.info.error),
detail: assistantMessage.info.error.name,
}),
);
} else if (!assistantMessage) {
res.write(
toSse("error", {
conversationId,
message: "assistant response unavailable",
detail: "no assistant message found after prompt",
}),
);
} else {
res.write(toSse("done", { conversationId }));
}
res.end();
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
logger.error({ err: error }, "chat stream failed");
res.status(500).json({
message: "chat stream failed",
detail,
});
}
});
return chatRouter;
};
const toSse = (event: string, data: Record<string, unknown>) =>
`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
// 先把 opencode 的 Part 结构压平成前端当前消费的 SSE 语义。
const collectTextContent = (parts: Part[]) =>
parts
.filter((part): part is Extract<Part, { type: "text" }> => part.type === "text")
.map((part) => part.text)
.join("");
const collectToolCalls = (parts: Part[]) =>
parts
.filter((part): part is Extract<Part, { type: "tool" }> => part.type === "tool")
.map((part) => ({
tool: part.tool,
params: part.state.input,
}));
const getErrorMessage = (error: {
name: string;
data?: { message?: string };
}) => error.data?.message ?? error.name;
+133
View File
@@ -0,0 +1,133 @@
import {
createOpencode,
createOpencodeClient,
type OpencodeClient,
} from "@opencode-ai/sdk/v2";
import { config } from "../config.js";
import { logger } from "../logger.js";
export type RuntimeHealth = {
healthy: boolean;
version: string;
};
export class OpencodeRuntimeAdapter {
private clientPromise: Promise<OpencodeClient> | null = null;
private closeServer: (() => void) | null = null;
async ensureClient(): Promise<OpencodeClient> {
if (!this.clientPromise) {
this.clientPromise = this.bootstrapClient();
}
return this.clientPromise;
}
async health(): Promise<RuntimeHealth> {
const client = await this.ensureClient();
const response = await client.global.health();
return requireData(response.data, "global.health");
}
async createSession(title?: string) {
const client = await this.ensureClient();
const response = await client.session.create({
title,
});
return requireData(response.data, "session.create");
}
async getSession(id: string) {
const client = await this.ensureClient();
const response = await client.session.get({
sessionID: id,
});
return requireData(response.data, "session.get");
}
async sendPrompt(sessionId: string, text: string) {
const client = await this.ensureClient();
await client.session.prompt({
sessionID: sessionId,
parts: [{ type: "text", text }],
});
// 当前 SDK 响应风格下,prompt() 本身不会直接返回完整 assistant parts
// 所以这里紧跟一次 messages() 回读,给上层路由统一消费。
const messages = await client.session.messages({
sessionID: sessionId,
limit: 20,
});
return requireData(messages.data, "session.messages");
}
async abortSession(sessionId: string) {
const client = await this.ensureClient();
const response = await client.session.abort({
sessionID: sessionId,
});
return requireData(response.data, "session.abort");
}
async subscribeEvents() {
const client = await this.ensureClient();
const response = await client.event.subscribe();
return response.stream;
}
async dispose(): Promise<void> {
this.closeServer?.();
this.closeServer = null;
this.clientPromise = null;
}
private async bootstrapClient(): Promise<OpencodeClient> {
if (config.OPENCODE_BASE_URL) {
logger.info(
{ baseUrl: config.OPENCODE_BASE_URL },
"connecting to external opencode server",
);
return createOpencodeClient({
baseUrl: config.OPENCODE_BASE_URL,
});
}
// embedded 模式下,把服务内工具桥地址注入到 opencode 进程环境里,
// 这样 .opencode/tools 下的自定义工具可以回调本服务。
process.env.TJWATER_AGENT_INTERNAL_BASE_URL = `http://127.0.0.1:${config.PORT}`;
process.env.TJWATER_AGENT_INTERNAL_TOKEN =
config.AGENT_INTERNAL_TOKEN ?? process.env.TJWATER_AGENT_INTERNAL_TOKEN ?? "";
logger.info(
{
hostname: config.OPENCODE_HOSTNAME,
port: config.OPENCODE_PORT,
model: config.OPENCODE_MODEL,
},
"starting embedded opencode server",
);
const runtime = await createOpencode({
hostname: config.OPENCODE_HOSTNAME,
port: config.OPENCODE_PORT,
timeout: config.OPENCODE_TIMEOUT_MS,
config: {
model: config.OPENCODE_MODEL,
},
});
this.closeServer = () => {
runtime.server.close();
};
return runtime.client;
}
}
export const opencodeRuntime = new OpencodeRuntimeAdapter();
function requireData<T>(data: T | undefined, operation: string): T {
if (data === undefined) {
throw new Error(`${operation} returned no data`);
}
return data;
}
+99
View File
@@ -0,0 +1,99 @@
import { randomUUID } from "node:crypto";
import cors from "cors";
import express from "express";
import { ChatSessionBridge } from "./chat/sessionBridge.js";
import { config } from "./config.js";
import { logger } from "./logger.js";
import { buildChatRouter } from "./routes/chat.js";
import { opencodeRuntime } from "./runtime/opencode.js";
import { SessionRegistry } from "./session/registry.js";
import { dynamicHttpExecutor } from "./tools/dynamicHttpExecutor.js";
const app = express();
const registry = new SessionRegistry(config.SESSION_TTL_SECONDS);
const sessionBridge = new ChatSessionBridge(registry, opencodeRuntime);
const internalToken = config.AGENT_INTERNAL_TOKEN ?? randomUUID();
process.env.TJWATER_AGENT_INTERNAL_TOKEN = internalToken;
app.use(cors());
app.use(express.json({ limit: "1mb" }));
app.get("/health", async (_req, res) => {
try {
const runtime = await opencodeRuntime.health();
res.json({
ok: true,
runtime,
sessions: sessionBridge.count(),
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
res.status(503).json({
ok: false,
message: "opencode runtime unavailable",
detail,
sessions: sessionBridge.count(),
});
}
});
app.post("/internal/tools/dynamic-http-call", async (req, res) => {
if (req.header("x-agent-internal-token") !== internalToken) {
res.status(403).json({ message: "forbidden" });
return;
}
const sessionId = typeof req.body?.sessionId === "string" ? req.body.sessionId : "";
const context = sessionBridge.getSessionContext(sessionId);
if (!context) {
res.status(404).json({
message: "session context not found",
detail: sessionId,
});
return;
}
try {
const result = await dynamicHttpExecutor.execute(
{
path: req.body?.path,
method: req.body?.method,
arguments: req.body?.arguments,
},
context,
);
res.json(result);
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
res.status(400).json({
message: "dynamic http execution failed",
detail,
});
}
});
app.use("/api/v1/copilot/chat", buildChatRouter(sessionBridge, opencodeRuntime));
const server = app.listen(config.PORT, config.HOST, () => {
logger.info(
{ host: config.HOST, port: config.PORT },
"TJWaterAgent listening",
);
});
const shutdown = async () => {
logger.info("shutting down TJWaterAgent");
server.close();
await opencodeRuntime.dispose();
};
process.on("SIGINT", () => {
void shutdown();
});
process.on("SIGTERM", () => {
void shutdown();
});
+78
View File
@@ -0,0 +1,78 @@
import crypto from "node:crypto";
export type SessionBinding = {
conversationId: string;
sessionId: string;
lastUsedAt: number;
};
export type SessionContext = {
conversationId: string;
accessToken?: string;
projectId?: string;
};
export class SessionRegistry {
private readonly ttlMs: number;
private readonly bindings = new Map<string, SessionBinding>();
constructor(ttlSeconds: number) {
this.ttlMs = ttlSeconds * 1000;
}
upsert(context: SessionContext, sessionId: string): SessionBinding {
const binding: SessionBinding = {
conversationId: context.conversationId,
sessionId,
lastUsedAt: Date.now(),
};
this.bindings.set(this.makeKey(context), binding);
return binding;
}
get(context: SessionContext): SessionBinding | null {
const key = this.makeKey(context);
const binding = this.bindings.get(key);
if (!binding) {
return null;
}
if (Date.now() - binding.lastUsedAt > this.ttlMs) {
this.bindings.delete(key);
return null;
}
binding.lastUsedAt = Date.now();
return binding;
}
count(): number {
this.evictExpired();
return this.bindings.size;
}
evictExpired(): string[] {
const expired: string[] = [];
const now = Date.now();
for (const [key, binding] of this.bindings.entries()) {
if (now - binding.lastUsedAt > this.ttlMs) {
expired.push(binding.sessionId);
this.bindings.delete(key);
}
}
return expired;
}
private makeKey(context: SessionContext): string {
const digest = crypto
.createHash("sha256")
.update(
[
context.conversationId,
context.accessToken ?? "",
context.projectId ?? "",
].join("|"),
)
.digest("hex");
return digest;
}
}
+187
View File
@@ -0,0 +1,187 @@
import { randomUUID } from "node:crypto";
import { config } from "../config.js";
export type DynamicHttpInput = {
path: string;
method?: string;
arguments?: Record<string, unknown>;
};
export type SessionToolContext = {
accessToken?: string;
projectId?: string;
traceId: string;
};
type StoredResult = {
rawResult: unknown;
traceId: string;
projectId?: string;
};
const allowedMethods = new Set(["GET", "POST", "PUT", "PATCH", "DELETE"]);
const resultStore = new Map<string, StoredResult>();
export class DynamicHttpExecutor {
async execute(input: DynamicHttpInput, context: SessionToolContext) {
const method = (input.method ?? "GET").trim().toUpperCase();
if (!allowedMethods.has(method)) {
throw new Error(`unsupported method: ${method}`);
}
const path = input.path.trim();
if (!path.startsWith("/")) {
throw new Error("path must start with '/'");
}
const query = buildQuery(input.arguments ?? {});
const url = new URL(path, config.TJWATER_API_BASE_URL);
for (const [key, value] of query) {
url.searchParams.append(key, value);
}
// 这里复用 chat session 绑定的用户上下文,保持后端鉴权与项目隔离语义不变。
const headers = new Headers({
Accept: "application/json",
"x-trace-id": context.traceId,
});
if (context.accessToken) {
headers.set("Authorization", `Bearer ${context.accessToken}`);
}
if (context.projectId) {
headers.set("x-project-id", context.projectId);
}
const response = await fetch(url, {
method,
headers,
signal: AbortSignal.timeout(config.TJWATER_API_TIMEOUT_MS),
});
const contentType = response.headers.get("content-type") ?? "";
const rawText = await response.text();
const data =
contentType.includes("application/json") && rawText
? JSON.parse(rawText)
: rawText;
if (!response.ok) {
return {
ok: false,
trace_id: context.traceId,
upstream: {
method,
path,
status_code: response.status,
},
error: {
message: "upstream API returned error",
detail: data,
},
};
}
return {
ok: true,
trace_id: context.traceId,
upstream: {
method,
path,
status_code: response.status,
},
...normalizeSuccessResult(data, context),
};
}
getResult(resultRef: string) {
return resultStore.get(resultRef);
}
}
export const dynamicHttpExecutor = new DynamicHttpExecutor();
const buildQuery = (argumentsObject: Record<string, unknown>) => {
const pairs: Array<[string, string]> = [];
for (const [key, value] of Object.entries(argumentsObject)) {
if (value === undefined || value === null) {
continue;
}
if (Array.isArray(value)) {
if (value.length === 0) {
continue;
}
pairs.push([key, value.map(String).join(",")]);
continue;
}
pairs.push([key, String(value)]);
}
return pairs;
};
const normalizeSuccessResult = (data: unknown, context: SessionToolContext) => {
const sizeBytes = estimateBytes(data);
if (sizeBytes <= config.MAX_INLINE_RESULT_BYTES) {
return {
result_mode: "inline",
result_size_bytes: sizeBytes,
data,
};
}
const resultRef = `res-${randomUUID().slice(0, 16)}`;
// 大结果先落本地引用,避免工具输出把模型上下文直接撑爆。
resultStore.set(resultRef, {
rawResult: data,
traceId: context.traceId,
projectId: context.projectId,
});
return {
result_mode: "referenced",
result_size_bytes: sizeBytes,
result_ref: resultRef,
preview: buildPreview(data),
};
};
const estimateBytes = (data: unknown) => Buffer.byteLength(JSON.stringify(data));
const buildPreview = (data: unknown) => {
if (Array.isArray(data)) {
const sample = data.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS);
const fields =
sample.length > 0 && isRecord(sample[0])
? Object.keys(sample[0]).slice(0, 30)
: [];
return {
count: data.length,
fields,
sample,
summary: `list[${data.length}]`,
};
}
if (isRecord(data)) {
const fields = Object.keys(data).slice(0, 30);
const sample = Object.fromEntries(
fields.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS).map((field) => [field, data[field]]),
);
return {
count: fields.length,
fields,
sample,
summary: `object<${fields.length} fields>`,
};
}
return {
count: 1,
fields: [],
sample: String(data).slice(0, 300),
summary: `scalar<${typeof data}>`,
};
};
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);