Files
TJWaterAgent/src/results/store.ts
T

230 lines
5.8 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { join } from "node:path";
import { config } from "../config.js";
import { logger } from "../logger.js";
import {
atomicWriteJson,
ensureDirectory,
getFileStat,
listJsonFiles,
readJsonFile,
removeFileIfExists,
} from "../utils/fileStore.js";
export type ResultReferenceRecord = {
resultRef: string;
actorKey: string;
clientSessionId: string;
createdAt: string;
data: unknown;
preview: ResultPreview;
projectId?: string;
projectKey: string;
sessionId: string;
sizeBytes: number;
traceId: string;
};
export type ResultPreview = {
count: number;
fields: string[];
sample: unknown;
summary: string;
};
export type StoreResultInput = {
actorKey: string;
clientSessionId: string;
data: unknown;
projectId?: string;
projectKey: string;
sessionId: string;
traceId: string;
};
export type RetrievalContext = {
actorKey: string;
maxItems?: number;
projectId?: string;
};
export type ResultReferencePeek = {
resultRef: string;
preview: ResultPreview;
storedAt: string;
};
export class ResultReferenceStore {
private cleanupTimer: NodeJS.Timeout | null = null;
constructor(
private readonly baseDir = config.RESULT_REF_STORAGE_DIR,
private readonly ttlMs = config.RESULT_REF_TTL_HOURS * 60 * 60 * 1000,
) {}
async initialize() {
await ensureDirectory(this.baseDir);
}
startCleanupLoop() {
if (this.cleanupTimer) {
return;
}
this.cleanupTimer = setInterval(() => {
void this.cleanupExpired().catch((error) => {
logger.warn({ err: error }, "result ref cleanup failed");
});
}, config.RESULT_REF_CLEANUP_INTERVAL_MS);
this.cleanupTimer.unref?.();
}
stopCleanupLoop() {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
}
async store(input: StoreResultInput) {
const resultRef = `res-${randomUUID().slice(0, 16)}`;
const record: ResultReferenceRecord = {
resultRef,
actorKey: input.actorKey,
clientSessionId: input.clientSessionId,
createdAt: new Date().toISOString(),
data: input.data,
preview: buildPreview(input.data),
projectId: input.projectId,
projectKey: input.projectKey,
sessionId: input.sessionId,
sizeBytes: estimateBytes(input.data),
traceId: input.traceId,
};
await atomicWriteJson(this.filePath(resultRef), record);
return record;
}
async getAuthorized(resultRef: string, context: RetrievalContext) {
const record = await this.readAuthorizedRecord(resultRef, context);
if (!record) {
return null;
}
const data = projectData(record.data, context.maxItems ?? config.RESULT_REF_MAX_RETRIEVAL_ITEMS);
return {
ok: true,
result_ref: record.resultRef,
result_size_bytes: record.sizeBytes,
stored_at: record.createdAt,
data,
preview: record.preview,
};
}
async peekAuthorized(resultRef: string, context: RetrievalContext): Promise<ResultReferencePeek | null> {
const record = await this.readAuthorizedRecord(resultRef, context);
if (!record) {
return null;
}
return {
resultRef: record.resultRef,
preview: record.preview,
storedAt: record.createdAt,
};
}
async listBySession(sessionId: string) {
const files = await listJsonFiles(this.baseDir);
const records = await Promise.all(
files.map(async (filePath) => readJsonFile<ResultReferenceRecord>(filePath)),
);
return records
.filter((record): record is ResultReferenceRecord => Boolean(record))
.filter((record) => record.sessionId === sessionId)
.sort((left, right) => right.createdAt.localeCompare(left.createdAt));
}
async cleanupExpired() {
const files = await listJsonFiles(this.baseDir);
const now = Date.now();
for (const filePath of files) {
const stats = await getFileStat(filePath);
if (!stats) {
continue;
}
if (now - stats.mtimeMs > this.ttlMs) {
await removeFileIfExists(filePath);
}
}
}
private filePath(resultRef: string) {
return join(this.baseDir, `${resultRef}.json`);
}
private async readAuthorizedRecord(resultRef: string, context: RetrievalContext) {
const record = await readJsonFile<ResultReferenceRecord>(this.filePath(resultRef));
if (!record) {
return null;
}
if (record.actorKey !== context.actorKey) {
return null;
}
if ((record.projectId ?? "") !== (context.projectId ?? "")) {
return null;
}
return record;
}
}
const estimateBytes = (data: unknown) => Buffer.byteLength(JSON.stringify(data));
const buildPreview = (data: unknown): ResultPreview => {
if (Array.isArray(data)) {
const sample = data.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS);
const fields =
sample.length > 0 && isRecord(sample[0])
? Object.keys(sample[0]).slice(0, 30)
: [];
return {
count: data.length,
fields,
sample,
summary: `list[${data.length}]`,
};
}
if (isRecord(data)) {
const fields = Object.keys(data).slice(0, 30);
const sample = Object.fromEntries(
fields.slice(0, config.MAX_PREVIEW_SAMPLE_ITEMS).map((field) => [field, data[field]]),
);
return {
count: fields.length,
fields,
sample,
summary: `object<${fields.length} fields>`,
};
}
return {
count: 1,
fields: [],
sample: String(data).slice(0, 300),
summary: `scalar<${typeof data}>`,
};
};
const projectData = (data: unknown, maxItems: number) => {
if (Array.isArray(data)) {
return data.slice(0, maxItems);
}
if (isRecord(data)) {
return Object.fromEntries(Object.entries(data).slice(0, maxItems));
}
return data;
};
const isRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);