Files
TJWaterAgent/src/routes/chatStream.ts
T

1072 lines
35 KiB
TypeScript

import type { Event as OpencodeEvent, Part } from "@opencode-ai/sdk/v2";
import { writeLlmRequestAuditLog } from "../audit/llmRequestAudit.js";
import { logger } from "../logger.js";
import { type PermissionReply, type OpencodeRuntimeAdapter } from "../runtime/opencode.js";
export const supportedModels = [
"deepseek/deepseek-v4-flash",
"deepseek/deepseek-v4-pro",
] as const;
export type SupportedModel = (typeof supportedModels)[number];
export type ApprovalMode = "request" | "always";
type StreamPromptOptions = {
runtime: OpencodeRuntimeAdapter;
sessionId: string;
clientSessionId: string;
message: string;
model?: SupportedModel;
approvalMode?: ApprovalMode;
traceId?: string;
projectId?: string;
signal?: AbortSignal;
write: (event: string, data: Record<string, unknown>) => void;
};
type ProgressStatus = "running" | "completed" | "error";
type ProgressPayload = {
id: string;
phase: string;
status: ProgressStatus;
title: string;
detail?: string;
};
export type PermissionRequestPayload = {
session_id: string;
request_id: string;
permission: string;
patterns: string[];
metadata: Record<string, unknown>;
always: string[];
tool?: {
messageID: string;
callID: string;
};
created_at: number;
};
const isDevelopmentDebugLoggingEnabled = process.env.NODE_ENV === "development";
const toolLabels: Record<string, string> = {
memory_manager: "记忆写入",
session_search: "历史会话检索",
skill_manager: "流程沉淀",
locate_features: "地图定位",
view_history: "历史数据面板",
view_scada: "SCADA 面板",
show_chart: "图表渲染",
render_junctions: "节点渲染",
};
const logDevelopmentDebug = (
message: string,
metadata: Record<string, unknown>,
) => {
if (!isDevelopmentDebugLoggingEnabled) {
return;
}
logger.info(metadata, message);
};
const getErrorMessage = (error: {
name: string;
data?: { message?: string };
}) => error.data?.message ?? error.name;
const getUnknownErrorMessage = (error: unknown) => {
if (
typeof error === "object" &&
error !== null &&
"name" in error &&
typeof error.name === "string"
) {
const maybeData = "data" in error ? error.data : undefined;
return getErrorMessage({
name: error.name,
data:
typeof maybeData === "object" && maybeData !== null && "message" in maybeData
? { message: typeof maybeData.message === "string" ? maybeData.message : undefined }
: undefined,
});
}
return error instanceof Error ? error.message : String(error);
};
const isObjectRecord = (value: unknown): value is Record<string, unknown> =>
typeof value === "object" && value !== null && !Array.isArray(value);
const normalizeToolParams = (value: unknown): Record<string, unknown> => {
if (isObjectRecord(value)) {
return value;
}
if (typeof value === "string") {
try {
const parsed = JSON.parse(value) as unknown;
return isObjectRecord(parsed) ? parsed : {};
} catch {
return {};
}
}
return {};
};
const extractRequestReason = (params: Record<string, unknown>) => {
const candidates = ["reason", "request_reason", "why", "purpose", "rationale"];
for (const key of candidates) {
const value = params[key];
if (typeof value === "string") {
const normalized = value.trim();
if (normalized) {
return normalized;
}
}
}
return "";
};
const isSkillEvent = (event: OpencodeEvent) => event.type.toLowerCase().includes("skill");
const extractSkillAuditInfo = (event: OpencodeEvent) => {
const payload = isObjectRecord(event.properties)
? (event.properties as Record<string, unknown>)
: {};
const candidateName =
typeof payload.skill === "string"
? payload.skill
: typeof payload.skillName === "string"
? payload.skillName
: typeof payload.name === "string"
? payload.name
: event.type;
const reason = extractRequestReason(payload);
return {
name: candidateName,
reason,
payload,
};
};
const hasToolParams = (params: Record<string, unknown>) =>
Object.keys(params).length > 0;
const toRuntimeModel = (model?: SupportedModel) => {
if (!model) {
return undefined;
}
const [providerID, modelID] = model.split("/");
if (!providerID || !modelID) {
return undefined;
}
return {
providerID,
modelID,
};
};
const isSessionEvent = (event: OpencodeEvent, sessionId: string) =>
"properties" in event &&
typeof event.properties === "object" &&
event.properties !== null &&
"sessionID" in event.properties &&
event.properties.sessionID === sessionId;
const isPermissionAskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.asked" }> =>
event.type === "permission.asked";
const isPermissionV2AskedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.asked" }> =>
event.type === "permission.v2.asked";
const isPermissionRepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.replied" }> =>
event.type === "permission.replied";
const isPermissionV2RepliedEvent = (
event: OpencodeEvent,
): event is Extract<OpencodeEvent, { type: "permission.v2.replied" }> =>
event.type === "permission.v2.replied";
const buildPermissionDetail = (event: Extract<OpencodeEvent, { type: "permission.asked" }>) => {
const patterns = event.properties.patterns.length
? event.properties.patterns.join(", ")
: event.properties.permission;
return `需要用户确认权限:${event.properties.permission};匹配规则:${patterns}`;
};
const buildPermissionV2Detail = (
event: Extract<OpencodeEvent, { type: "permission.v2.asked" }>,
) => {
const resources = event.properties.resources.length
? event.properties.resources.join(", ")
: event.properties.action;
return `需要用户确认权限:${event.properties.action};资源:${resources}`;
};
export const collectTextContent = (parts: Part[]) =>
parts
.filter((part): part is Extract<Part, { type: "text" }> => part.type === "text")
.map((part) => part.text)
.join("");
const emitFallbackMessage = async (
runtime: OpencodeRuntimeAdapter,
sessionId: string,
clientSessionId: string,
write: (event: string, data: Record<string, unknown>) => void,
) => {
const messages = await runtime.messages(sessionId);
const assistantMessage = [...messages]
.reverse()
.find((message) => message.info.role === "assistant");
const parts = assistantMessage?.parts ?? [];
const text = collectTextContent(parts);
if (text) {
write("token", {
session_id: clientSessionId,
content: text,
});
}
};
const normalizeToolStatus = (status: string) => {
if (status === "completed") return "completed";
if (status === "error") return "error";
return "running";
};
const formatProgressValue = (value: unknown): string => {
if (typeof value === "string") {
return value.length > 120 ? `${value.slice(0, 117)}...` : value;
}
if (
typeof value === "number" ||
typeof value === "boolean" ||
value === null ||
value === undefined
) {
return String(value);
}
try {
const serialized = JSON.stringify(value);
return serialized.length > 120 ? `${serialized.slice(0, 117)}...` : serialized;
} catch {
return "[unserializable]";
}
};
const normalizeProgressText = (chunks: string[]) => chunks.join("").replace(/\s+/g, " ").trim();
const truncateProgressText = (text: string, maxLength: number) =>
text.length > maxLength ? `${text.slice(0, maxLength - 3)}...` : text;
const summarizeToolParams = (params: Record<string, unknown>) => {
const ignoredKeys = new Set(["reason", "request_reason", "why", "purpose", "rationale"]);
const summary = Object.entries(params)
.filter(([key]) => !ignoredKeys.has(key))
.slice(0, 4)
.map(([key, value]) => `${key}=${formatProgressValue(value)}`)
.join(", ");
return summary || "无附加参数";
};
const buildSessionStatusDetail = (status: { type: string; message?: string }) => {
if (status.type === "retry") {
return status.message
? `模型请求需要重试,原因:${status.message}`
: "模型请求正在重试,等待下一次响应。";
}
if (status.type === "busy") {
return status.message
? `Agent 正在处理中:${status.message}`
: "Agent 正在执行推理、工具调用或结果整理。";
}
if (status.type === "idle") {
return status.message
? `Agent 已空闲:${status.message}`
: "当前会话暂时没有待处理任务。";
}
return status.message ? `会话状态更新:${status.message}` : `会话状态更新:${status.type}`;
};
const buildReasoningProgressDetail = (chunks: string[], ended?: string | number | Date | null) => {
const reasoningText = truncateProgressText(normalizeProgressText(chunks), 800);
if (ended) {
return reasoningText
? `推理过程:${reasoningText}`
: "当前推理阶段已完成,Agent 将继续输出答案或进入工具执行。";
}
return reasoningText
? `正在推理:${reasoningText}`
: "Agent 正在拆解问题、梳理执行步骤并判断是否需要调用工具。";
};
const buildToolProgressDetail = (
tool: string,
status: string,
params: Record<string, unknown>,
reason: string,
error?: string,
) => {
const toolName = toolLabels[tool] ?? tool;
const reasonText = reason ? `;调用原因:${reason}` : "";
const paramsText = `;关键参数:${summarizeToolParams(params)}`;
if (status === "error") {
const errorText = error ? `;错误:${error}` : "";
return `${toolName} 调用失败${reasonText}${paramsText}${errorText}`;
}
if (status === "completed") {
return `${toolName} 已执行完成${reasonText}${paramsText}`;
}
if (status === "pending") {
return `${toolName} 已进入待执行状态${reasonText}${paramsText}`;
}
return `${toolName} 正在执行${reasonText}${paramsText}`;
};
const getToolProgressTitle = (tool: string, status: string) => {
const toolName = toolLabels[tool] ?? tool;
if (status === "completed") return `${toolName} 已完成`;
if (status === "error") return `${toolName} 执行失败`;
if (status === "pending") return `准备调用 ${toolName}`;
return `正在调用 ${toolName}`;
};
export const streamPromptResponse = async ({
runtime,
sessionId,
clientSessionId,
message,
model,
approvalMode = "request",
traceId,
projectId,
signal,
write,
}: StreamPromptOptions): Promise<{
aborted: boolean;
failed: boolean;
toolCallCount: number;
}> => {
const eventStream = await runtime.subscribeEvents();
const iterator = eventStream[Symbol.asyncIterator]();
const requestStartedAt = Date.now();
const promptStartedAt = Date.now();
const progressStartedAtMap = new Map<string, number>();
const finalizedProgressIds = new Set<string>();
const emittedToolParts = new Set<string>();
const partTypes = new Map<string, Part["type"]>();
const pendingPartTextDeltas = new Map<string, string[]>();
const reasoningDeltas = new Map<string, string[]>();
const reasoningStatuses = new Map<string, "running" | "completed">();
const toolStatuses = new Map<string, string>();
let firstSessionEventLogged = false;
let firstNonStatusEventLogged = false;
let firstTokenLogged = false;
let firstReasoningLogged = false;
let firstToolEventLogged = false;
let lastSessionStatus: string | null = null;
let lastSessionStatusMessage: string | null = null;
let sawResponseActivity = false;
let emittedText = false;
let toolCallCount = 0;
let done = false;
let promptSettled = false;
let aborted = signal?.aborted ?? false;
let failed = false;
const debugContext = {
sessionId,
clientSessionId,
traceId,
projectId,
model: model ?? null,
};
logDevelopmentDebug("chat stream started", {
...debugContext,
messageChars: message.length,
});
const abortPromise = signal
? new Promise<{ type: "abort" }>((resolve) => {
if (signal.aborted) {
resolve({ type: "abort" });
return;
}
signal.addEventListener("abort", () => resolve({ type: "abort" }), {
once: true,
});
})
: null;
const emitProgress = ({ id, phase, status, title, detail }: ProgressPayload) => {
if (status === "running" && finalizedProgressIds.has(id)) {
return;
}
const now = Date.now();
const startedAt = progressStartedAtMap.get(id) ?? now;
if (!progressStartedAtMap.has(id)) {
progressStartedAtMap.set(id, startedAt);
}
if (status === "running") {
write("progress", {
session_id: clientSessionId,
id,
phase,
status,
title,
detail,
started_at: startedAt,
elapsed_ms: Math.max(0, now - startedAt),
});
return;
}
const durationMs = Math.max(0, now - startedAt);
finalizedProgressIds.add(id);
progressStartedAtMap.delete(id);
write("progress", {
session_id: clientSessionId,
id,
phase,
status,
title,
detail,
started_at: startedAt,
ended_at: now,
duration_ms: durationMs,
});
};
emitProgress({
id: "request-received",
phase: "start",
status: "running",
title: "已收到请求,正在启动 Agent 分析",
detail: "已接收用户消息,正在建立会话并准备进入分析、规划和工具调用阶段。",
});
const promptPromise = runtime
.prompt(sessionId, message, toRuntimeModel(model))
.then(() => {
promptSettled = true;
logDevelopmentDebug("runtime.prompt resolved", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - promptStartedAt),
});
})
.catch((error: unknown) => {
promptSettled = true;
logDevelopmentDebug("runtime.prompt failed", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - promptStartedAt),
error: getUnknownErrorMessage(error),
});
throw error;
});
logDevelopmentDebug("runtime.prompt dispatched", {
...debugContext,
});
try {
while (!done) {
if (signal?.aborted) {
aborted = true;
logDevelopmentDebug("chat stream noticed abort signal", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
break;
}
const nextEvent = iterator
.next()
.then((result) => ({ type: "event" as const, result }));
const nextPrompt = promptSettled
? null
: promptPromise.then(
() => ({ type: "prompt" as const }),
(error: unknown) => ({ type: "prompt-error" as const, error }),
);
const next = await Promise.race(
[
...(nextPrompt ? [nextEvent, nextPrompt] : [nextEvent]),
...(abortPromise ? [abortPromise] : []),
],
);
if (next.type === "abort") {
aborted = true;
break;
}
if (next.type === "prompt-error") {
throw next.error;
}
if (next.type === "prompt") {
continue;
}
if (next.result.done) {
break;
}
const event = next.result.value as OpencodeEvent;
if (!isSessionEvent(event, sessionId)) {
continue;
}
if (!firstSessionEventLogged) {
firstSessionEventLogged = true;
logDevelopmentDebug("first session event received", {
...debugContext,
eventType: event.type,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
sincePromptDispatchMs: Math.max(0, Date.now() - promptStartedAt),
});
}
if (event.type === "session.status") {
const nextStatus = event.properties.status.type;
const nextStatusMessage =
"message" in event.properties.status &&
typeof event.properties.status.message === "string"
? event.properties.status.message
: null;
if (
nextStatus !== lastSessionStatus ||
nextStatusMessage !== lastSessionStatusMessage
) {
lastSessionStatus = nextStatus;
lastSessionStatusMessage = nextStatusMessage;
logDevelopmentDebug("session status updated", {
...debugContext,
status: nextStatus,
statusMessage: nextStatusMessage,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
}
emitProgress({
id: "session-status",
phase: "session",
status: event.properties.status.type === "idle" ? "completed" : "running",
title:
event.properties.status.type === "retry"
? `模型请求重试中:${event.properties.status.message}`
: event.properties.status.type === "busy"
? "Agent 正在处理请求"
: "Agent 已空闲",
detail: buildSessionStatusDetail(event.properties.status),
});
continue;
}
if (!firstNonStatusEventLogged) {
firstNonStatusEventLogged = true;
logDevelopmentDebug("first non-status session event received", {
...debugContext,
eventType: event.type,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
sincePromptDispatchMs: Math.max(0, Date.now() - promptStartedAt),
});
}
if (isPermissionAskedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("permission request received", {
...debugContext,
requestId: event.properties.id,
permission: event.properties.permission,
patterns: event.properties.patterns,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `permission-${event.properties.id}`,
phase: "permission",
status: approvalMode === "always" ? "completed" : "running",
title: approvalMode === "always" ? "已自动允许权限请求" : "等待权限确认",
detail:
approvalMode === "always"
? "当前批准模式为始终允许,已自动允许本次权限请求。"
: buildPermissionDetail(event),
});
if (approvalMode === "always") {
await runtime.replyPermission({
requestId: event.properties.id,
sessionId,
reply: "always",
});
write("permission_response", {
session_id: clientSessionId,
request_id: event.properties.id,
reply: "always" satisfies PermissionReply,
});
continue;
}
write("permission_request", {
session_id: clientSessionId,
request_id: event.properties.id,
permission: event.properties.permission,
patterns: event.properties.patterns,
metadata: event.properties.metadata,
always: event.properties.always,
tool: event.properties.tool,
created_at: Date.now(),
} satisfies PermissionRequestPayload);
continue;
}
if (isPermissionV2AskedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("permission v2 request received", {
...debugContext,
requestId: event.properties.id,
action: event.properties.action,
resources: event.properties.resources,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `permission-${event.properties.id}`,
phase: "permission",
status: approvalMode === "always" ? "completed" : "running",
title: approvalMode === "always" ? "已自动允许权限请求" : "等待权限确认",
detail:
approvalMode === "always"
? "当前批准模式为始终允许,已自动允许本次权限请求。"
: buildPermissionV2Detail(event),
});
if (approvalMode === "always") {
await runtime.replyPermission({
requestId: event.properties.id,
sessionId,
reply: "always",
});
write("permission_response", {
session_id: clientSessionId,
request_id: event.properties.id,
reply: "always" satisfies PermissionReply,
});
continue;
}
write("permission_request", {
session_id: clientSessionId,
request_id: event.properties.id,
permission: event.properties.action,
patterns: event.properties.resources,
metadata: event.properties.metadata ?? {},
always: event.properties.save ?? [],
tool: undefined,
created_at: Date.now(),
} satisfies PermissionRequestPayload);
continue;
}
if (isPermissionRepliedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("permission request replied", {
...debugContext,
requestId: event.properties.requestID,
reply: event.properties.reply,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `permission-${event.properties.requestID}`,
phase: "permission",
status: event.properties.reply === "reject" ? "error" : "completed",
title:
event.properties.reply === "reject"
? "权限请求已拒绝"
: "权限请求已允许",
detail:
event.properties.reply === "always"
? "已允许本次请求,并记住同类权限。"
: event.properties.reply === "once"
? "已允许本次请求。"
: "已拒绝本次请求。",
});
write("permission_response", {
session_id: clientSessionId,
request_id: event.properties.requestID,
reply: event.properties.reply satisfies PermissionReply,
});
continue;
}
if (isPermissionV2RepliedEvent(event)) {
sawResponseActivity = true;
logDevelopmentDebug("permission v2 request replied", {
...debugContext,
requestId: event.properties.requestID,
reply: event.properties.reply,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: `permission-${event.properties.requestID}`,
phase: "permission",
status: event.properties.reply === "reject" ? "error" : "completed",
title:
event.properties.reply === "reject"
? "权限请求已拒绝"
: "权限请求已允许",
detail:
event.properties.reply === "always"
? "已允许本次请求,并记住同类权限。"
: event.properties.reply === "once"
? "已允许本次请求。"
: "已拒绝本次请求。",
});
write("permission_response", {
session_id: clientSessionId,
request_id: event.properties.requestID,
reply: event.properties.reply satisfies PermissionReply,
});
continue;
}
if (isSkillEvent(event)) {
sawResponseActivity = true;
const { name, reason, payload } = extractSkillAuditInfo(event);
logDevelopmentDebug("skill event received", {
...debugContext,
skill: name,
reason: reason || null,
payloadKeys: Object.keys(payload).slice(0, 8),
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
void writeLlmRequestAuditLog({
kind: "skill",
sessionId: sessionId,
clientSessionId,
traceId,
projectId,
target: name,
reason,
reasonProvided: Boolean(reason),
payload,
}).catch((error) => {
logger.warn({ err: error }, "failed to write skill audit log");
});
}
if (event.type === "message.updated") {
if (event.properties.info.role === "assistant") {
sawResponseActivity = true;
}
continue;
}
if (event.type === "message.part.delta" && event.properties.field === "text") {
sawResponseActivity = true;
const partType = partTypes.get(event.properties.partID);
if (partType === "text") {
if (!firstTokenLogged) {
firstTokenLogged = true;
logDevelopmentDebug("first response token emitted", {
...debugContext,
partId: event.properties.partID,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
sincePromptDispatchMs: Math.max(0, Date.now() - promptStartedAt),
});
}
emittedText = true;
write("token", {
session_id: clientSessionId,
content: event.properties.delta,
});
} else if (partType === "reasoning") {
if (!firstReasoningLogged) {
firstReasoningLogged = true;
logDevelopmentDebug("first reasoning delta received", {
...debugContext,
partId: event.properties.partID,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
sincePromptDispatchMs: Math.max(0, Date.now() - promptStartedAt),
});
}
const pending = reasoningDeltas.get(event.properties.partID) ?? [];
pending.push(event.properties.delta);
reasoningDeltas.set(event.properties.partID, pending);
} else if (!partType) {
const pending = pendingPartTextDeltas.get(event.properties.partID) ?? [];
pending.push(event.properties.delta);
pendingPartTextDeltas.set(event.properties.partID, pending);
}
continue;
}
if (event.type === "message.part.updated") {
sawResponseActivity = true;
const part = event.properties.part;
partTypes.set(part.id, part.type);
if (part.type === "text") {
const pending = pendingPartTextDeltas.get(part.id) ?? [];
pendingPartTextDeltas.delete(part.id);
for (const content of pending) {
emittedText = true;
write("token", {
session_id: clientSessionId,
content,
});
}
} else if (part.type === "reasoning") {
const pending = pendingPartTextDeltas.get(part.id) ?? [];
if (pending.length > 0) {
const existing = reasoningDeltas.get(part.id) ?? [];
reasoningDeltas.set(part.id, existing.concat(pending));
}
pendingPartTextDeltas.delete(part.id);
const reasoningStatus = part.time.end ? "completed" : "running";
if (reasoningStatuses.get(part.id) !== reasoningStatus) {
reasoningStatuses.set(part.id, reasoningStatus);
logDevelopmentDebug("reasoning part status changed", {
...debugContext,
partId: part.id,
status: reasoningStatus,
chunkCount: (reasoningDeltas.get(part.id) ?? []).length,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
}
const reasoningDetail = buildReasoningProgressDetail(
reasoningDeltas.get(part.id) ?? [],
part.time.end,
);
emitProgress({
id: part.id,
phase: "planning",
status: part.time.end ? "completed" : "running",
title: part.time.end ? "分析规划完成" : "正在规划分析步骤",
detail: reasoningDetail,
});
}
if (part.type === "tool") {
if (!firstToolEventLogged) {
firstToolEventLogged = true;
logDevelopmentDebug("first tool event received", {
...debugContext,
partId: part.id,
tool: part.tool,
status: part.state.status,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
sincePromptDispatchMs: Math.max(0, Date.now() - promptStartedAt),
});
}
const toolParams = normalizeToolParams(part.state.input);
const reason = extractRequestReason(toolParams);
const isToolFinalState =
part.state.status === "completed" || part.state.status === "error";
const nextToolStatus = String(part.state.status);
if (toolStatuses.get(part.id) !== nextToolStatus) {
toolStatuses.set(part.id, nextToolStatus);
logDevelopmentDebug("tool part status changed", {
...debugContext,
partId: part.id,
tool: part.tool,
status: nextToolStatus,
reason: reason || null,
inputKeys: Object.keys(toolParams).slice(0, 8),
error:
part.state.status === "error" ? (part.state.error ?? "unknown") : null,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
}
emitProgress({
id: part.id,
phase: "tool",
status: normalizeToolStatus(part.state.status),
title: getToolProgressTitle(part.tool, part.state.status),
detail: buildToolProgressDetail(
part.tool,
part.state.status,
toolParams,
reason,
part.state.status === "error" ? part.state.error : undefined,
),
});
if (
!emittedToolParts.has(part.id) &&
(hasToolParams(toolParams) || isToolFinalState)
) {
emittedToolParts.add(part.id);
toolCallCount += 1;
if (!reason) {
logger.warn(
{
tool: part.tool,
sessionId: sessionId,
clientSessionId,
},
"llm tool request missing reason",
);
}
void writeLlmRequestAuditLog({
kind: "tool",
sessionId: sessionId,
clientSessionId,
traceId,
projectId,
target: part.tool,
reason,
reasonProvided: Boolean(reason),
payload: toolParams,
}).catch((error) => {
logger.warn({ err: error }, "failed to write tool audit log");
});
write("tool_call", {
session_id: clientSessionId,
tool: part.tool,
params: toolParams,
reason,
});
}
}
continue;
}
if (event.type === "todo.updated") {
sawResponseActivity = true;
const completed = event.properties.todos.filter(
(todo) => todo.status === "completed",
).length;
emitProgress({
id: "todo-progress",
phase: "planning",
status: completed === event.properties.todos.length ? "completed" : "running",
title: `计划进度 ${completed}/${event.properties.todos.length}`,
detail: event.properties.todos
.map((todo) => `${todo.status}: ${todo.content}`)
.join("\n"),
});
continue;
}
if (event.type === "session.error") {
sawResponseActivity = true;
logDevelopmentDebug("session error received", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
error: event.properties.error
? getErrorMessage(event.properties.error)
: "opencode session error",
});
write("error", {
session_id: clientSessionId,
message: event.properties.error
? getErrorMessage(event.properties.error)
: "opencode session error",
detail: event.properties.error?.name,
total_duration_ms: Math.max(0, Date.now() - requestStartedAt),
});
failed = true;
done = true;
continue;
}
if (event.type === "session.idle") {
if (!sawResponseActivity) {
logDevelopmentDebug("ignoring session idle before response activity", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
continue;
}
logDevelopmentDebug("session idle received", {
...debugContext,
emittedText,
toolCallCount,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
emitProgress({
id: "session-status",
phase: "session",
status: "completed",
title: "Agent 已完成处理",
detail: "当前会话已无待执行任务,正在收尾并准备返回最终结果。",
});
done = true;
}
}
if (aborted) {
logDevelopmentDebug("chat stream aborting session", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
await runtime.abortSession(sessionId).catch((error) => {
logger.warn({ sessionId: sessionId, err: error }, "failed to abort opencode session");
});
await runtime.waitForSessionIdle(sessionId).catch((error) => {
logger.warn(
{ sessionId: sessionId, err: error },
"failed while waiting for aborted opencode session to become idle",
);
});
return { aborted: true, failed: false, toolCallCount };
}
if (failed) {
return { aborted: false, failed: true, toolCallCount };
}
await promptPromise;
if (!emittedText) {
logDevelopmentDebug("no streamed text emitted, falling back to messages()", {
...debugContext,
elapsedMs: Math.max(0, Date.now() - requestStartedAt),
});
await emitFallbackMessage(runtime, sessionId, clientSessionId, write);
}
emitProgress({
id: "request-received",
phase: "start",
status: "completed",
title: "请求处理完成",
detail: "本次请求的分析、工具执行和结果整理流程已经完成。",
});
emitProgress({
id: "request-completed",
phase: "complete",
status: "completed",
title: "分析完成",
detail: emittedText
? "最终回答已生成并推送到前端。"
: "已完成分析,并通过兜底消息补发最终回答内容。",
});
write("done", {
session_id: clientSessionId,
total_duration_ms: Math.max(0, Date.now() - requestStartedAt),
});
logDevelopmentDebug("chat stream completed", {
...debugContext,
emittedText,
toolCallCount,
totalDurationMs: Math.max(0, Date.now() - requestStartedAt),
});
return { aborted: false, failed: false, toolCallCount };
} finally {
await iterator.return?.(undefined);
if (!promptSettled && !aborted) {
await promptPromise.catch(() => undefined);
} else if (!promptSettled) {
void promptPromise.catch(() => undefined);
}
logDevelopmentDebug("chat stream cleanup finished", {
...debugContext,
promptSettled,
totalDurationMs: Math.max(0, Date.now() - requestStartedAt),
});
}
};