Files
TJWaterAgent/src/learning/orchestrator.ts
T
jiang c823e3935e
Agent CI/CD / docker-image (push) Failing after 24s
Agent CI/CD / deploy-fallback-log (push) Successful in 1s
feat(chat): expose model options config
2026-06-10 19:50:40 +08:00

626 lines
21 KiB
TypeScript

import { z } from "zod";
import { writeLearningAuditLog } from "../audit/learningAudit.js";
import { type SupportedModel } from "../chat/models.js";
import { type ChatRequestContext } from "../chat/sessionBridge.js";
import { config } from "../config.js";
import { type SessionTurnRecord, SessionTranscriptStore } from "../sessions/transcriptStore.js";
import { logger } from "../logger.js";
import { SessionLearningStateStore } from "./sessionStateStore.js";
import { MemoryStore, type MemoryScope } from "../memory/store.js";
import { type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
import { SkillStore } from "../skills/store.js";
import {
removeRuntimeSessionContext,
setRuntimeSessionContext,
} from "../runtime/sessionContext.js";
import {
sanitizePersistentDocument,
sanitizePersistentLine,
} from "../utils/persistencePolicy.js";
const gateResultSchema = z.object({
confidence: z.number().min(0).max(1).default(0),
focus: z.enum(["memory", "skill", "both", "none"]).default("none"),
reason: z.string().default(""),
should_review: z.boolean().default(false),
});
const reviewResultSchema = z.object({
memories: z
.array(
z.object({
action: z.enum(["add", "replace", "remove"]),
confidence: z.number().min(0).max(1),
content: z.string().optional(),
evidence: z.string().default(""),
scope: z.enum(["user", "workspace"]),
target_id: z.string().optional(),
}),
)
.default([]),
skills: z
.array(
z.object({
action: z.enum([
"append_pattern",
"remove_pattern",
"write_reference",
"write_skill",
"remove_skill",
]),
confidence: z.number().min(0).max(1),
content: z.string().optional(),
evidence: z.string().default(""),
file_path: z.string().optional(),
pattern: z.string().optional(),
skill_path: z.string(),
target_id: z.string().optional(),
}),
)
.default([]),
summary: z.string().default(""),
});
type GateResult = z.infer<typeof gateResultSchema>;
type ReviewResult = z.infer<typeof reviewResultSchema>;
type TurnReviewInput = {
assistantMessage: string;
model?: SupportedModel;
requestContext: ChatRequestContext;
sessionId: string;
toolCallCount: number;
userMessage: string;
};
export class LearningOrchestrator {
private readonly activeReviews = new Set<string>();
private readonly sessionLearningStateStore = new SessionLearningStateStore();
private readonly skillStore = new SkillStore();
constructor(
private readonly runtime: OpencodeRuntimeAdapter,
private readonly memoryStore: MemoryStore,
private readonly transcriptStore: SessionTranscriptStore,
) {}
async initialize() {
await this.sessionLearningStateStore.initialize();
}
async onTurnCompleted(input: TurnReviewInput) {
const transcript = await this.transcriptStore.appendTurn(
{
actorKey: input.requestContext.actorKey,
clientSessionId: input.requestContext.clientSessionId,
projectKey: input.requestContext.projectKey,
sessionId: input.sessionId,
},
{
assistantMessage: input.assistantMessage,
toolCallCount: input.toolCallCount,
userMessage: input.userMessage,
},
);
const turnCount = transcript.turns.length;
if (this.activeReviews.has(input.sessionId)) {
return;
}
this.activeReviews.add(input.sessionId);
try {
const state = await this.sessionLearningStateStore.read(input.sessionId);
const turnsSinceGate = Math.max(0, turnCount - state.lastGatedTurn);
if (turnsSinceGate < config.LEARNING_GATE_TURN_COOLDOWN) {
this.activeReviews.delete(input.sessionId);
return;
}
} catch (error) {
this.activeReviews.delete(input.sessionId);
throw error;
}
queueMicrotask(() => {
void this.runGate({
input,
recentTurns: transcript.turns.slice(-config.LEARNING_REVIEW_MAX_RECENT_TURNS),
turnCount,
}).finally(() => {
this.activeReviews.delete(input.sessionId);
});
});
}
private async runGate({
input,
recentTurns,
turnCount,
}: {
input: TurnReviewInput;
recentTurns: SessionTurnRecord[];
turnCount: number;
}) {
let gateSessionId: string | null = null;
try {
const gateSession = await this.runtime.createSession(
`learning-gate-${input.requestContext.clientSessionId}`,
);
gateSessionId = gateSession.id;
setRuntimeSessionContext({
actorKey: input.requestContext.actorKey,
allowLearningWrite: false,
clientSessionId: `gate-${input.requestContext.clientSessionId}`,
learningMode: "review",
projectId: input.requestContext.projectId,
projectKey: input.requestContext.projectKey,
sessionId: gateSession.id,
traceId: input.requestContext.traceId,
});
await this.runtime.prompt(
gateSession.id,
buildGatePrompt({ recentTurns }),
GATE_MODEL,
);
const messages = await this.runtime.messages(gateSession.id, 20);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const gateText = collectTextContent(assistantMessage?.parts ?? []);
const gate = parseGateResult(gateText);
if (!gate) {
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
await writeLearningAuditLog({
action: "review-gate",
detail: "gate result was not valid JSON",
outcome: "error",
projectId: input.requestContext.projectId,
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return;
}
const shouldPromote =
gate.should_review &&
gate.confidence >= config.LEARNING_GATE_MIN_CONFIDENCE &&
gate.focus !== "none";
await writeLearningAuditLog({
action: "review-gate",
detail: sanitizeAuditDetail(gate.reason),
outcome: shouldPromote ? "accepted" : "skipped",
projectId: input.requestContext.projectId,
proposal: sanitizeGateForAudit(gate),
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
if (!shouldPromote) {
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
return;
}
await this.runReview({
focus: gate.focus,
input,
recentTurns,
turnCount,
});
} catch (error) {
logger.warn({ err: error, sessionId: input.sessionId }, "learning gate failed");
await writeLearningAuditLog({
action: "review-gate",
detail: sanitizeAuditDetail(error instanceof Error ? error.message : String(error)),
outcome: "error",
projectId: input.requestContext.projectId,
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
} finally {
if (gateSessionId) {
removeRuntimeSessionContext(gateSessionId);
await this.runtime.abortSession(gateSessionId).catch(() => undefined);
}
}
}
private async runReview({
focus,
input,
recentTurns,
turnCount,
}: {
focus: GateResult["focus"];
input: TurnReviewInput;
recentTurns: SessionTurnRecord[];
turnCount: number;
}) {
const reviewSession = await this.runtime.createSession(
`learning-review-${input.requestContext.clientSessionId}`,
);
setRuntimeSessionContext({
actorKey: input.requestContext.actorKey,
allowLearningWrite: false,
clientSessionId: `review-${input.requestContext.clientSessionId}`,
learningMode: "review",
projectId: input.requestContext.projectId,
projectKey: input.requestContext.projectKey,
sessionId: reviewSession.id,
traceId: input.requestContext.traceId,
});
try {
const existingMemory = await this.loadMemoryContext(input.requestContext);
await this.runtime.prompt(
reviewSession.id,
buildReviewPrompt({ existingMemory, focus, recentTurns }),
toRuntimeModel(input.model),
);
const messages = await this.runtime.messages(reviewSession.id, 20);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const reviewText = collectTextContent(assistantMessage?.parts ?? []);
const parsed = parseReviewResult(reviewText);
if (!parsed) {
await this.sessionLearningStateStore.completeGate(input.sessionId, turnCount);
await writeLearningAuditLog({
action: "review-parse",
detail: "review result was not valid JSON",
outcome: "error",
projectId: input.requestContext.projectId,
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return;
}
await this.applyReviewResult(input, parsed, turnCount);
await this.sessionLearningStateStore.completeReview(input.sessionId, turnCount);
} catch (error) {
logger.warn({ err: error, sessionId: input.sessionId }, "learning review failed");
await writeLearningAuditLog({
action: "review-run",
detail: sanitizeAuditDetail(error instanceof Error ? error.message : String(error)),
outcome: "error",
projectId: input.requestContext.projectId,
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
} finally {
removeRuntimeSessionContext(reviewSession.id);
await this.runtime.abortSession(reviewSession.id).catch(() => undefined);
}
}
private async applyReviewResult(
input: TurnReviewInput,
result: ReviewResult,
turnCount: number,
) {
const threshold = config.LEARNING_MIN_PROPOSAL_CONFIDENCE;
let accepted = 0;
for (const proposal of result.memories) {
const outcome = await this.applyMemoryProposal(input, proposal, threshold);
accepted += outcome ? 1 : 0;
}
for (const proposal of result.skills) {
const outcome = await this.applySkillProposal(input, proposal, threshold);
accepted += outcome ? 1 : 0;
}
await writeLearningAuditLog({
action: "review-summary",
detail: sanitizeAuditDetail(result.summary),
outcome: accepted > 0 ? "accepted" : "skipped",
projectId: input.requestContext.projectId,
proposal: {
accepted,
memories: result.memories.length,
skills: result.skills.length,
turnCount,
},
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
}
private async loadMemoryContext(context: ChatRequestContext) {
const [userMemory, workspaceMemory] = await Promise.all([
this.memoryStore.list("user", context.actorKey),
this.memoryStore.list("workspace", context.projectKey),
]);
return { userMemory, workspaceMemory };
}
private async applyMemoryProposal(
input: TurnReviewInput,
proposal: ReviewResult["memories"][number],
threshold: number,
) {
if (proposal.confidence < threshold) {
await writeLearningAuditLog({
action: `memory-${proposal.action}`,
detail: "proposal below confidence threshold",
outcome: "skipped",
projectId: input.requestContext.projectId,
proposal: sanitizeMemoryProposalForAudit(proposal),
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return false;
}
const scopeKey =
proposal.scope === "user"
? input.requestContext.actorKey
: input.requestContext.projectKey;
const draft = {
content: proposal.content ?? "",
sessionId: input.sessionId,
source: "review" as const,
traceId: input.requestContext.traceId,
};
let accepted = false;
let detail = "memory rejected";
if (proposal.action === "add") {
const result = await this.memoryStore.upsert(proposal.scope as MemoryScope, scopeKey, draft);
accepted = Boolean(result.entry);
detail = result.detail;
} else if (proposal.action === "replace") {
const result = await this.memoryStore.replace(
proposal.scope as MemoryScope,
scopeKey,
proposal.target_id ?? "",
draft,
);
accepted = Boolean(result.changed);
detail = result.detail;
} else {
const result = await this.memoryStore.remove(
proposal.scope as MemoryScope,
scopeKey,
proposal.target_id ?? "",
);
accepted = Boolean(result.changed);
detail = result.detail;
}
await writeLearningAuditLog({
action: `memory-${proposal.action}`,
detail: sanitizeAuditDetail(detail),
outcome: accepted ? "accepted" : "rejected",
projectId: input.requestContext.projectId,
proposal: sanitizeMemoryProposalForAudit(proposal),
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return accepted;
}
private async applySkillProposal(
input: TurnReviewInput,
proposal: ReviewResult["skills"][number],
threshold: number,
) {
if (proposal.confidence < threshold) {
await writeLearningAuditLog({
action: `skill-${proposal.action}`,
detail: "proposal below confidence threshold",
outcome: "skipped",
projectId: input.requestContext.projectId,
proposal: sanitizeSkillProposalForAudit(proposal),
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return false;
}
const result =
proposal.action === "append_pattern"
? await this.skillStore.appendPattern(proposal.skill_path, proposal.pattern ?? "")
: proposal.action === "remove_pattern"
? await this.skillStore.removePattern(
proposal.skill_path,
proposal.target_id ?? "",
)
: proposal.action === "write_reference"
? await this.skillStore.writeReference(
proposal.skill_path,
proposal.file_path ?? "",
proposal.content ?? "",
)
: proposal.action === "write_skill"
? await this.skillStore.writeSkill(proposal.skill_path, proposal.content ?? "")
: await this.skillStore.removeSkill(proposal.skill_path);
await writeLearningAuditLog({
action: `skill-${proposal.action}`,
detail: sanitizeAuditDetail(result.detail),
outcome: result.changed ? "accepted" : "rejected",
projectId: input.requestContext.projectId,
proposal: sanitizeSkillProposalForAudit(proposal),
sessionId: input.sessionId,
traceId: input.requestContext.traceId,
});
return result.changed;
}
}
const buildGatePrompt = ({ recentTurns }: { recentTurns: SessionTurnRecord[] }) => {
const transcript = recentTurns
.map(
(turn, index) =>
`Turn ${index + 1}\nUser: ${turn.userMessage}\nAssistant: ${turn.assistantMessage}\nTool calls: ${turn.toolCallCount}`,
)
.join("\n\n");
return [
"You are the learning gate for TJWaterAgent.",
"Do NOT call any tools. Return JSON only. Do NOT wrap in markdown fences.",
"Decide whether this recent conversation is worth a deeper learning review.",
"A review is warranted only when there is likely durable memory or reusable skill signal.",
"Ignore one-off cases, temporary outcomes, and task-local noise.",
"",
'Return JSON schema: {"should_review":true|false,"reason":"string","confidence":0.0,"focus":"memory|skill|both|none"}',
"",
"Conversation transcript:",
transcript || "(empty)",
].join("\n");
};
const buildReviewPrompt = ({
existingMemory,
focus,
recentTurns,
}: {
existingMemory: {
userMemory: Array<{ content: string; id: string }>;
workspaceMemory: Array<{ content: string; id: string }>;
};
focus: GateResult["focus"];
recentTurns: SessionTurnRecord[];
}) => {
const transcript = recentTurns
.map(
(turn, index) =>
`Turn ${index + 1}\nUser: ${turn.userMessage}\nAssistant: ${turn.assistantMessage}\nTool calls: ${turn.toolCallCount}`,
)
.join("\n\n");
const userMemoryBlock =
existingMemory.userMemory.length > 0
? existingMemory.userMemory.map((entry) => `- [${entry.id}] ${entry.content}`).join("\n")
: "(empty)";
const workspaceMemoryBlock =
existingMemory.workspaceMemory.length > 0
? existingMemory.workspaceMemory
.map((entry) => `- [${entry.id}] ${entry.content}`)
.join("\n")
: "(empty)";
return [
"You are doing an internal self-improvement review for TJWaterAgent.",
"Do NOT call any tools. Return JSON only. Do NOT wrap in markdown fences.",
`Focus: ${focus}`,
"Decide what durable lessons to keep from the conversation below.",
"",
"Memory rules:",
"- Keep only stable user preferences, durable constraints, or stable workspace facts.",
"- Use scope='user' for user preferences and constraints.",
"- Use scope='workspace' for project or environment facts.",
"- Read the existing memories first before proposing changes.",
"- If a new lesson overlaps, refines, or supersedes an existing memory, prefer replace/remove using target_id instead of add.",
"- Use add only when the lesson is genuinely new after reviewing the existing memory list.",
"- Do not store one-off task outcomes, temporary facts, or speculative conclusions.",
"",
"Current persisted memories:",
"[User memory]",
userMemoryBlock,
"[Workspace memory]",
workspaceMemoryBlock,
"",
"Skill rules:",
"- Save only reusable workflows, methods, or pitfalls that will help in future similar tasks.",
"- Prefer append_pattern for concise reusable lessons.",
"- Use write_reference only for compact durable supporting notes under references/*.md.",
"- Use write_skill only when the conversation establishes a complete reusable SKILL.md with frontmatter name and description; it creates or overwrites the main SKILL.md.",
"- Use remove_skill only when the conversation clearly establishes the whole skill is obsolete or invalid.",
"",
"Output JSON schema:",
`{"summary":"string","memories":[{"action":"add|replace|remove","scope":"user|workspace","content":"string?","target_id":"string?","confidence":0.0,"evidence":"string"}],"skills":[{"action":"append_pattern|remove_pattern|write_reference|write_skill|remove_skill","skill_path":"string","pattern":"string?","target_id":"string?","file_path":"references/example.md?","content":"string?","confidence":0.0,"evidence":"string"}]}`,
"",
"If nothing should be saved, return empty arrays.",
"",
"Conversation transcript:",
transcript || "(empty)",
].join("\n");
};
const parseGateResult = (text: string): GateResult | null => {
const trimmed = text.trim();
if (!trimmed) {
return null;
}
const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
const candidate = fenced?.[1]?.trim() ?? trimmed;
try {
return gateResultSchema.parse(JSON.parse(candidate));
} catch {
return null;
}
};
const parseReviewResult = (text: string): ReviewResult | null => {
const trimmed = text.trim();
if (!trimmed) {
return null;
}
const fenced = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
const candidate = fenced?.[1]?.trim() ?? trimmed;
try {
return reviewResultSchema.parse(JSON.parse(candidate));
} catch {
return null;
}
};
const collectTextContent = (
parts: Array<{ type: string; text?: string }>,
) =>
parts
.filter((part): part is { type: "text"; text: string } => part.type === "text")
.map((part) => part.text)
.join("");
const toRuntimeModel = (model?: SupportedModel) => {
if (!model) {
return undefined;
}
const [providerID, modelID] = model.split("/");
if (!providerID || !modelID) {
return undefined;
}
return {
modelID,
providerID,
};
};
const GATE_MODEL = {
modelID: "deepseek-v4-flash",
providerID: "deepseek",
} as const;
const REDACTED_AUDIT_FIELD = "[redacted by persistence policy]";
const sanitizeAuditDetail = (detail?: string) => {
if (!detail) {
return undefined;
}
return sanitizePersistentDocument(detail, 1000) || REDACTED_AUDIT_FIELD;
};
const sanitizeAuditLine = (value?: string, maxLength = 320) => {
if (value === undefined) {
return undefined;
}
return sanitizePersistentLine(value, maxLength) || REDACTED_AUDIT_FIELD;
};
const sanitizeGateForAudit = (gate: GateResult): Record<string, unknown> => ({
confidence: gate.confidence,
focus: gate.focus,
reason: sanitizeAuditLine(gate.reason),
should_review: gate.should_review,
});
const sanitizeMemoryProposalForAudit = (
proposal: ReviewResult["memories"][number],
): Record<string, unknown> => ({
action: proposal.action,
confidence: proposal.confidence,
content: sanitizeAuditLine(proposal.content),
evidence: sanitizeAuditLine(proposal.evidence),
scope: proposal.scope,
target_id: sanitizeAuditLine(proposal.target_id, 120),
});
const sanitizeSkillProposalForAudit = (
proposal: ReviewResult["skills"][number],
): Record<string, unknown> => ({
action: proposal.action,
confidence: proposal.confidence,
content: sanitizeAuditDetail(proposal.content),
evidence: sanitizeAuditLine(proposal.evidence),
file_path: sanitizeAuditLine(proposal.file_path, 200),
pattern: sanitizeAuditLine(proposal.pattern),
skill_path: sanitizeAuditLine(proposal.skill_path, 200),
target_id: sanitizeAuditLine(proposal.target_id, 120),
});