Files
TJWaterAgent/src/server.ts
T

303 lines
8.3 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { spawn } from "node:child_process";
import cors from "cors";
import express from "express";
import { SessionTranscriptStore } from "./sessions/transcriptStore.js";
import { ChatSessionBridge } from "./chat/sessionBridge.js";
import { config } from "./config.js";
import { SessionUiStateStore } from "./sessions/uiStateStore.js";
import { SessionMetadataStore } from "./sessions/metadataStore.js";
import { logger } from "./logger.js";
import { LearningOrchestrator } from "./learning/orchestrator.js";
import { MemoryStore } from "./memory/store.js";
import { ResultReferenceResolver } from "./results/resolver.js";
import {
RESULT_REFERENCE_SOURCE,
ResultReferenceStore,
} from "./results/store.js";
import { buildChatRouter } from "./routes/chat.js";
import { opencodeRuntime } from "./runtime/opencode.js";
import { getRuntimeSessionContext } from "./runtime/sessionContext.js";
const app = express();
// 这里集中组装 Agent 服务的运行期依赖,路由层只通过接口调用,便于测试时替换实现。
const sessionBridge = new ChatSessionBridge(opencodeRuntime);
const sessionMetadataStore = new SessionMetadataStore();
const sessionUiStateStore = new SessionUiStateStore();
const memoryStore = new MemoryStore();
const sessionTranscriptStore = new SessionTranscriptStore();
const learningOrchestrator = new LearningOrchestrator(
opencodeRuntime,
memoryStore,
sessionTranscriptStore,
);
const resultReferenceStore = new ResultReferenceStore();
const resultReferenceResolver = new ResultReferenceResolver(resultReferenceStore);
const internalToken = config.AGENT_INTERNAL_TOKEN ?? randomUUID();
// 这个 token 只用于仍需服务端上下文的工具桥(store_render_ref)。
process.env.TJWATER_AGENT_INTERNAL_TOKEN = internalToken;
app.use(cors());
app.use(express.json({ limit: "1mb" }));
app.get("/health", async (_req, res) => {
try {
const runtime = await opencodeRuntime.health();
res.json({
ok: true,
runtime,
sessions: sessionBridge.count(),
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
res.status(503).json({
ok: false,
message: "opencode runtime unavailable",
detail,
sessions: sessionBridge.count(),
});
}
});
app.post("/internal/tools/tjwater-cli-call", async (req, res) => {
if (req.header("x-agent-internal-token") !== internalToken) {
res.status(403).json({ message: "forbidden" });
return;
}
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const context = sessionId ? getRuntimeSessionContext(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
detail: sessionId,
});
return;
}
const command = typeof req.body?.command === "string" ? req.body.command.trim() : "";
if (!command) {
res.status(400).json({ message: "command is required" });
return;
}
const timeoutSec =
typeof req.body?.timeout === "number" && req.body.timeout > 0 ? req.body.timeout : 120;
const authJson = JSON.stringify({
server: config.TJWATER_API_BASE_URL,
access_token: context.accessToken,
project_id: context.projectId,
network:"tjwater",
});
const cliArgs = ["--auth-stdin", ...command.split(/\s+/).filter(Boolean)];
const child = spawn(config.TJWATER_CLI_PATH, cliArgs, {
stdio: ["pipe", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
child.stdout.on("data", (data: Buffer) => {
stdout += data.toString("utf-8");
});
child.stderr.on("data", (data: Buffer) => {
stderr += data.toString("utf-8");
});
child.stdin.write(authJson);
child.stdin.end();
const exitCode = await new Promise<number | null>((resolve, reject) => {
const timer = setTimeout(() => {
child.kill("SIGTERM");
resolve(-1);
}, timeoutSec * 1000);
child.on("close", (code) => {
clearTimeout(timer);
resolve(code);
});
child.on("error", (err) => {
clearTimeout(timer);
reject(err);
});
});
if (exitCode === -1) {
res.status(504).json({
ok: false,
schema_version: "tjwater-cli/v1",
summary: "命令超时",
error: {
code: "TIMEOUT",
message: `command timed out after ${timeoutSec}s`,
retryable: true,
},
});
return;
}
if (exitCode !== 0) {
res.status(502).json({
ok: false,
exit_code: exitCode,
stderr: stderr.slice(0, 2000),
stdout: stdout.slice(0, 2000),
message: `CLI exited with code ${exitCode}`,
});
return;
}
try {
res.json(JSON.parse(stdout));
} catch {
res.json({
ok: true,
schema_version: "tjwater-cli/v1",
raw: stdout,
stderr: stderr || undefined,
});
}
});
app.post("/internal/tools/store-render-ref", async (req, res) => {
if (req.header("x-agent-internal-token") !== internalToken) {
res.status(403).json({ message: "forbidden" });
return;
}
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const filePath = typeof req.body?.file_path === "string" ? req.body.file_path.trim() : "";
const context = sessionId ? getRuntimeSessionContext(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
detail: sessionId,
});
return;
}
if (!filePath) {
res.status(400).json({ message: "file_path is required" });
return;
}
try {
const record = await resultReferenceResolver.registerRenderPayloadFile(filePath, {
actorKey: context.actorKey,
clientSessionId: context.clientSessionId,
projectId: context.projectId,
projectKey: context.projectKey,
sessionId: context.clientSessionId,
source: RESULT_REFERENCE_SOURCE.agentGenerated,
traceId: context.traceId,
});
res.json({
ok: true,
render_ref: record.resultRef,
stored_at: record.createdAt,
preview: record.preview,
kind: record.kind,
schema_version: record.schemaVersion,
source: record.source,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
res.status(400).json({
message: "store render ref failed",
detail,
});
}
});
app.post("/internal/tools/session-search", async (req, res) => {
if (req.header("x-agent-internal-token") !== internalToken) {
res.status(403).json({ message: "forbidden" });
return;
}
const sessionId =
typeof req.body?.session_id === "string" ? req.body.session_id.trim() : "";
const query = typeof req.body?.query === "string" ? req.body.query : "";
const context = sessionId ? getRuntimeSessionContext(sessionId) : null;
if (!context) {
res.status(404).json({
message: "session context not found",
detail: sessionId,
});
return;
}
if (!query.trim()) {
res.status(400).json({ message: "query is required" });
return;
}
const hits = await sessionTranscriptStore.search(
{
actorKey: context.actorKey,
projectKey: context.projectKey,
},
query,
typeof req.body?.max_results === "number" ? req.body.max_results : undefined,
);
res.json({
hits,
query,
});
});
app.use(
"/api/v1/agent/chat",
buildChatRouter(
sessionBridge,
opencodeRuntime,
sessionMetadataStore,
sessionUiStateStore,
memoryStore,
sessionTranscriptStore,
learningOrchestrator,
resultReferenceResolver,
),
);
const bootstrap = async () => {
await Promise.all([
sessionMetadataStore.initialize(),
sessionUiStateStore.initialize(),
learningOrchestrator.initialize(),
memoryStore.initialize(),
resultReferenceStore.initialize(),
sessionTranscriptStore.initialize(),
]);
resultReferenceStore.startCleanupLoop();
};
await bootstrap();
const server = app.listen(config.PORT, config.HOST, () => {
logger.info(
{ host: config.HOST, port: config.PORT },
"TJWaterAgent listening",
);
});
const shutdown = async () => {
logger.info("shutting down TJWaterAgent");
server.close();
resultReferenceStore.stopCleanupLoop();
// 同步关闭 embedded opencode server,避免本服务退出后留下孤儿进程。
await opencodeRuntime.dispose();
};
process.on("SIGINT", () => {
void shutdown();
});
process.on("SIGTERM", () => {
void shutdown();
});