Files
TJWaterAgent/src/results/store.ts
T
jiang ab12d79d91
Agent CI/CD / docker-image (push) Successful in 17s
Agent CI/CD / deploy-fallback-log (push) Has been skipped
fix(results): support legacy render refs
2026-05-21 18:18:16 +08:00

419 lines
11 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { join } from "node:path";
import { config } from "../config.js";
import { logger } from "../logger.js";
import {
atomicWriteJson,
ensureDirectory,
getFileStat,
listJsonFiles,
readJsonFile,
removeFileIfExists,
toProjectKey,
} from "../utils/fileStore.js";
export const RESULT_REF_PATTERN = /^res-[a-f0-9-]{8,64}$/;
const RESULT_REF_FILE_PATTERN = /^(res-[a-f0-9-]{8,64})(?:\.json)?$/;
export const RESULT_REFERENCE_KIND = {
dynamicHttpResult: "dynamic-http-result",
renderJunctionsPayload: "render-junctions-payload",
} as const;
export const RESULT_REFERENCE_SOURCE = {
dynamicHttp: "dynamic_http",
agentGenerated: "agent_generated",
legacy: "legacy",
migration: "migration",
} as const;
export type ResultReferenceKind =
(typeof RESULT_REFERENCE_KIND)[keyof typeof RESULT_REFERENCE_KIND];
export type ResultReferenceSource =
(typeof RESULT_REFERENCE_SOURCE)[keyof typeof RESULT_REFERENCE_SOURCE];
export type ResultPreview = {
count: number;
fields: string[];
sample: unknown;
summary: string;
};
export type ResultReferenceRecord = {
resultRef: string;
actorKey: string;
clientSessionId: string;
createdAt: string;
data: unknown;
kind: ResultReferenceKind;
preview: ResultPreview;
projectId?: string;
projectKey: string;
schemaVersion: number;
sessionId: string;
sizeBytes: number;
source: ResultReferenceSource;
traceId: string;
};
export type StoreResultInput = {
actorKey: string;
clientSessionId: string;
data: unknown;
kind: ResultReferenceKind;
projectId?: string;
projectKey: string;
schemaVersion: number;
sessionId: string;
source: ResultReferenceSource;
traceId: string;
};
export type RetrievalContext = {
actorKey: string;
clientSessionId?: string;
projectId?: string;
};
export type ResultReferencePeek = {
resultRef: string;
kind: ResultReferenceKind;
preview: ResultPreview;
storedAt: string;
};
type PartialRecord = Partial<ResultReferenceRecord> & { data?: unknown };
export class ResultReferenceStore {
private cleanupTimer: NodeJS.Timeout | null = null;
constructor(
private readonly baseDir = config.RESULT_REF_STORAGE_DIR,
private readonly ttlMs = config.RESULT_REF_TTL_HOURS * 60 * 60 * 1000,
) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
startCleanupLoop() {
if (this.cleanupTimer) {
return;
}
this.cleanupTimer = setInterval(() => {
void this.cleanupExpired().catch((error) => {
logger.warn({ err: error }, "result ref cleanup failed");
});
}, config.RESULT_REF_CLEANUP_INTERVAL_MS);
this.cleanupTimer.unref?.();
}
stopCleanupLoop() {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
}
async store(input: StoreResultInput) {
const resultRef = `res-${randomUUID().slice(0, 16)}`;
const record: ResultReferenceRecord = {
resultRef,
actorKey: input.actorKey,
clientSessionId: input.clientSessionId,
createdAt: new Date().toISOString(),
data: input.data,
kind: input.kind,
preview: buildPreview(input.data),
projectId: input.projectId,
projectKey: input.projectKey,
schemaVersion: input.schemaVersion,
sessionId: input.sessionId,
sizeBytes: estimateBytes(input.data),
source: input.source,
traceId: input.traceId,
};
await atomicWriteJson(this.filePath(resultRef), record);
return record;
}
async getAuthorizedRecord(resultRef: string, context: RetrievalContext) {
const normalizedResultRef = normalizeResultRef(resultRef);
if (!normalizedResultRef) {
return null;
}
const rawRecord = await readJsonFile<unknown>(this.filePath(normalizedResultRef));
const record =
normalizeResultReferenceRecord(rawRecord) ??
normalizeLegacyRenderReferenceRecord(rawRecord, normalizedResultRef, context);
if (!record) {
return null;
}
if (record.actorKey !== context.actorKey) {
return null;
}
if ((record.projectId ?? "") !== (context.projectId ?? "")) {
return null;
}
if (
context.clientSessionId &&
record.clientSessionId !== context.clientSessionId
) {
return null;
}
return record;
}
async peekAuthorized(
resultRef: string,
context: RetrievalContext,
): Promise<ResultReferencePeek | null> {
const record = await this.getAuthorizedRecord(resultRef, context);
if (!record) {
return null;
}
return {
resultRef: record.resultRef,
kind: record.kind,
preview: record.preview,
storedAt: record.createdAt,
};
}
async listBySession(sessionId: string) {
const files = await listJsonFiles(this.baseDir);
const records = await Promise.all(
files.map(async (filePath) =>
normalizeResultReferenceRecord(await readJsonFile<unknown>(filePath)),
),
);
return records
.filter((record): record is ResultReferenceRecord => Boolean(record))
.filter((record) => record.sessionId === sessionId)
.sort((left, right) => right.createdAt.localeCompare(left.createdAt));
}
async cleanupExpired() {
const files = await listJsonFiles(this.baseDir);
const now = Date.now();
for (const filePath of files) {
const stats = await getFileStat(filePath);
if (!stats) {
continue;
}
if (now - stats.mtimeMs > this.ttlMs) {
await removeFileIfExists(filePath);
}
}
}
private filePath(resultRef: string) {
return join(this.baseDir, `${resultRef}.json`);
}
}
export const normalizeResultReferenceRecord = (
value: unknown,
): ResultReferenceRecord | null => {
if (!isRecord(value)) {
return null;
}
const partial = value as PartialRecord;
if (
!isValidResultRef(partial.resultRef) ||
typeof partial.actorKey !== "string" ||
typeof partial.clientSessionId !== "string" ||
typeof partial.createdAt !== "string" ||
!("data" in partial) ||
!isResultPreview(partial.preview) ||
typeof partial.projectKey !== "string" ||
typeof partial.sessionId !== "string" ||
typeof partial.sizeBytes !== "number" ||
!Number.isFinite(partial.sizeBytes) ||
typeof partial.traceId !== "string"
) {
return null;
}
const kind = normalizeResultReferenceKind(partial.kind);
const source = normalizeResultReferenceSource(partial.source);
const schemaVersion =
typeof partial.schemaVersion === "number" &&
Number.isInteger(partial.schemaVersion) &&
partial.schemaVersion > 0
? partial.schemaVersion
: 1;
if (!kind || !source) {
return null;
}
if (
partial.projectId !== undefined &&
typeof partial.projectId !== "string"
) {
return null;
}
return {
resultRef: partial.resultRef,
actorKey: partial.actorKey,
clientSessionId: partial.clientSessionId,
createdAt: partial.createdAt,
data: partial.data,
kind,
preview: partial.preview,
projectId: partial.projectId,
projectKey: partial.projectKey,
schemaVersion,
sessionId: partial.sessionId,
sizeBytes: partial.sizeBytes,
source,
traceId: partial.traceId,
};
};
const normalizeResultReferenceKind = (
value: unknown,
): ResultReferenceKind | null => {
if (value === undefined) {
return RESULT_REFERENCE_KIND.dynamicHttpResult;
}
return Object.values(RESULT_REFERENCE_KIND).includes(
value as ResultReferenceKind,
)
? (value as ResultReferenceKind)
: null;
};
const normalizeResultReferenceSource = (
value: unknown,
): ResultReferenceSource | null => {
if (value === undefined) {
return RESULT_REFERENCE_SOURCE.legacy;
}
return Object.values(RESULT_REFERENCE_SOURCE).includes(
value as ResultReferenceSource,
)
? (value as ResultReferenceSource)
: null;
};
const isValidResultRef = (value: unknown): value is string =>
typeof value === "string" && RESULT_REF_PATTERN.test(value);
const normalizeResultRef = (value: string) => {
const match = value.trim().match(RESULT_REF_FILE_PATTERN);
return match?.[1] ?? null;
};
const normalizeLegacyRenderReferenceRecord = (
value: unknown,
resultRef: string,
context: RetrievalContext,
): ResultReferenceRecord | null => {
const data = extractLegacyRenderPayload(value);
if (!data) {
return null;
}
const root = isRecord(value) ? value : {};
const metadata = isRecord(root.metadata) ? root.metadata : {};
const projectId = firstNonEmptyString(root.projectId, metadata.projectId);
const createdAt =
firstNonEmptyString(root.createdAt, metadata.createdAt) ?? new Date().toISOString();
return {
resultRef,
actorKey: context.actorKey,
clientSessionId: context.clientSessionId ?? "",
createdAt,
data,
kind: RESULT_REFERENCE_KIND.renderJunctionsPayload,
preview: buildPreview(data),
projectId,
projectKey: toProjectKey(projectId),
schemaVersion: 1,
sessionId: context.clientSessionId ?? resultRef,
sizeBytes: estimateBytes(data),
source: RESULT_REFERENCE_SOURCE.legacy,
traceId: "legacy-render-ref",
};
};
const extractLegacyRenderPayload = (value: unknown) => {
if (!isRecord(value)) {
return null;
}
const candidate = isRecord(value.data) ? value.data : value;
if (!isRecord(candidate.node_area_map)) {
return null;
}
return candidate;
};
const firstNonEmptyString = (...values: unknown[]) => {
for (const value of values) {
if (typeof value === "string" && value.trim().length > 0) {
return value.trim();
}
}
return undefined;
};
const isResultPreview = (value: unknown): value is ResultPreview =>
isRecord(value) &&
typeof value.count === "number" &&
Number.isFinite(value.count) &&
Array.isArray(value.fields) &&
value.fields.every((field) => typeof field === "string") &&
typeof value.summary === "string" &&
"sample" in value;
const estimateBytes = (data: unknown) => Buffer.byteLength(JSON.stringify(data));
const buildPreview = (data: unknown): ResultPreview => {
if (Array.isArray(data)) {
const sample = data.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS);
const fields =
sample.length > 0 && isRecord(sample[0])
? Object.keys(sample[0]).slice(0, 30)
: [];
return {
count: data.length,
fields,
sample,
summary: `list[${data.length}]`,
};
}
if (isRecord(data)) {
const fields = Object.keys(data).slice(0, 30);
const sample = Object.fromEntries(
fields
.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS)
.map((field) => [field, data[field]]),
);
return {
count: fields.length,
fields,
sample,
summary: `object<${fields.length} fields>`,
};
}
return {
count: 1,
fields: [],
sample: String(data).slice(0, 300),
summary: `scalar<${typeof data}>`,
};
};
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);