feat(agents): add structured execution item events

This commit is contained in:
Peter Steinberger
2026-04-05 12:34:51 +01:00
parent 3b7e6152d1
commit 996dccb19c
11 changed files with 1130 additions and 48 deletions

View File

@@ -124,18 +124,14 @@ type EmbeddedAgentParams = {
name?: string;
phase?: string;
status?: string;
summary?: string;
progressText?: string;
approvalId?: string;
approvalSlug?: string;
}) => Promise<void> | void;
onAgentEvent?: (payload: {
stream: string;
data: {
itemId?: string;
kind?: string;
title?: string;
name?: string;
phase?: string;
status?: string;
completed?: boolean;
};
data: Record<string, unknown>;
}) => Promise<void> | void;
};
@@ -309,6 +305,134 @@ describe("runAgentTurnWithFallback", () => {
});
});
it("forwards plan, approval, command output, and patch events", async () => {
const onPlanUpdate = vi.fn();
const onApprovalEvent = vi.fn();
const onCommandOutput = vi.fn();
const onPatchSummary = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "plan",
data: {
phase: "update",
title: "Assistant proposed a plan",
explanation: "Inspect code, patch it, run tests.",
steps: ["Inspect code", "Patch code", "Run tests"],
},
});
await params.onAgentEvent?.({
stream: "approval",
data: {
phase: "requested",
kind: "exec",
status: "pending",
title: "Command approval requested",
approvalId: "approval-1",
},
});
await params.onAgentEvent?.({
stream: "command_output",
data: {
itemId: "command:exec-1",
phase: "delta",
title: "command ls",
toolCallId: "exec-1",
output: "README.md",
},
});
await params.onAgentEvent?.({
stream: "patch",
data: {
itemId: "patch:patch-1",
phase: "end",
title: "apply patch",
toolCallId: "patch-1",
added: ["a.ts"],
modified: ["b.ts"],
deleted: [],
summary: "1 added, 1 modified",
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void>>();
await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: {
onPlanUpdate,
onApprovalEvent,
onCommandOutput,
onPatchSummary,
} satisfies GetReplyOptions,
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(onPlanUpdate).toHaveBeenCalledWith({
phase: "update",
title: "Assistant proposed a plan",
explanation: "Inspect code, patch it, run tests.",
steps: ["Inspect code", "Patch code", "Run tests"],
source: undefined,
});
expect(onApprovalEvent).toHaveBeenCalledWith({
phase: "requested",
kind: "exec",
status: "pending",
title: "Command approval requested",
itemId: undefined,
toolCallId: undefined,
approvalId: "approval-1",
approvalSlug: undefined,
command: undefined,
host: undefined,
reason: undefined,
message: undefined,
});
expect(onCommandOutput).toHaveBeenCalledWith({
itemId: "command:exec-1",
phase: "delta",
title: "command ls",
toolCallId: "exec-1",
name: undefined,
output: "README.md",
status: undefined,
exitCode: undefined,
durationMs: undefined,
cwd: undefined,
});
expect(onPatchSummary).toHaveBeenCalledWith({
itemId: "patch:patch-1",
phase: "end",
title: "apply patch",
toolCallId: "patch-1",
name: undefined,
added: ["a.ts"],
modified: ["b.ts"],
deleted: [],
summary: "1 added, 1 modified",
});
});
it("keeps compaction start notices silent by default", async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {

View File

@@ -726,6 +726,95 @@ export async function runAgentTurnWithFallback(params: {
name: typeof evt.data.name === "string" ? evt.data.name : undefined,
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
status: typeof evt.data.status === "string" ? evt.data.status : undefined,
summary: typeof evt.data.summary === "string" ? evt.data.summary : undefined,
progressText:
typeof evt.data.progressText === "string"
? evt.data.progressText
: undefined,
approvalId:
typeof evt.data.approvalId === "string" ? evt.data.approvalId : undefined,
approvalSlug:
typeof evt.data.approvalSlug === "string"
? evt.data.approvalSlug
: undefined,
});
}
if (evt.stream === "plan") {
await params.opts?.onPlanUpdate?.({
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
title: typeof evt.data.title === "string" ? evt.data.title : undefined,
explanation:
typeof evt.data.explanation === "string" ? evt.data.explanation : undefined,
steps: Array.isArray(evt.data.steps)
? evt.data.steps.filter((step): step is string => typeof step === "string")
: undefined,
source: typeof evt.data.source === "string" ? evt.data.source : undefined,
});
}
if (evt.stream === "approval") {
await params.opts?.onApprovalEvent?.({
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
kind: typeof evt.data.kind === "string" ? evt.data.kind : undefined,
status: typeof evt.data.status === "string" ? evt.data.status : undefined,
title: typeof evt.data.title === "string" ? evt.data.title : undefined,
itemId: typeof evt.data.itemId === "string" ? evt.data.itemId : undefined,
toolCallId:
typeof evt.data.toolCallId === "string" ? evt.data.toolCallId : undefined,
approvalId:
typeof evt.data.approvalId === "string" ? evt.data.approvalId : undefined,
approvalSlug:
typeof evt.data.approvalSlug === "string"
? evt.data.approvalSlug
: undefined,
command: typeof evt.data.command === "string" ? evt.data.command : undefined,
host: typeof evt.data.host === "string" ? evt.data.host : undefined,
reason: typeof evt.data.reason === "string" ? evt.data.reason : undefined,
message: typeof evt.data.message === "string" ? evt.data.message : undefined,
});
}
if (evt.stream === "command_output") {
await params.opts?.onCommandOutput?.({
itemId: typeof evt.data.itemId === "string" ? evt.data.itemId : undefined,
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
title: typeof evt.data.title === "string" ? evt.data.title : undefined,
toolCallId:
typeof evt.data.toolCallId === "string" ? evt.data.toolCallId : undefined,
name: typeof evt.data.name === "string" ? evt.data.name : undefined,
output: typeof evt.data.output === "string" ? evt.data.output : undefined,
status: typeof evt.data.status === "string" ? evt.data.status : undefined,
exitCode:
typeof evt.data.exitCode === "number" || evt.data.exitCode === null
? evt.data.exitCode
: undefined,
durationMs:
typeof evt.data.durationMs === "number" ? evt.data.durationMs : undefined,
cwd: typeof evt.data.cwd === "string" ? evt.data.cwd : undefined,
});
}
if (evt.stream === "patch") {
await params.opts?.onPatchSummary?.({
itemId: typeof evt.data.itemId === "string" ? evt.data.itemId : undefined,
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
title: typeof evt.data.title === "string" ? evt.data.title : undefined,
toolCallId:
typeof evt.data.toolCallId === "string" ? evt.data.toolCallId : undefined,
name: typeof evt.data.name === "string" ? evt.data.name : undefined,
added: Array.isArray(evt.data.added)
? evt.data.added.filter(
(entry): entry is string => typeof entry === "string",
)
: undefined,
modified: Array.isArray(evt.data.modified)
? evt.data.modified.filter(
(entry): entry is string => typeof entry === "string",
)
: undefined,
deleted: Array.isArray(evt.data.deleted)
? evt.data.deleted.filter(
(entry): entry is string => typeof entry === "string",
)
: undefined,
summary: typeof evt.data.summary === "string" ? evt.data.summary : undefined,
});
}
// Track auto-compaction and notify higher layers.

View File

@@ -984,6 +984,79 @@ describe("dispatchReplyFromConfig", () => {
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" });
});
it("renders concise plan and approval progress updates for direct sessions", async () => {
setNoAbort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "telegram",
ChatType: "direct",
});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
_cfg?: OpenClawConfig,
) => {
await opts?.onPlanUpdate?.({
phase: "update",
explanation: "Inspect code, patch it, run tests.",
steps: ["Inspect code", "Patch code", "Run tests"],
});
await opts?.onApprovalEvent?.({
phase: "requested",
status: "pending",
command: "pnpm test",
});
return { text: "done" } satisfies ReplyPayload;
};
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ text: "Working: Inspect code" }),
);
expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ text: "Working: awaiting approval: pnpm test" }),
);
expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(2);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" });
});
it("renders concise patch summaries for direct sessions", async () => {
setNoAbort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "telegram",
ChatType: "direct",
});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
_cfg?: OpenClawConfig,
) => {
await opts?.onPatchSummary?.({
phase: "end",
title: "apply patch",
summary: "1 added, 2 modified",
});
return { text: "done" } satisfies ReplyPayload;
};
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ text: "Working: 1 added, 2 modified" }),
);
expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(1);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" });
});
it("delivers deterministic exec approval tool payloads for native commands", async () => {
setNoAbort();
const cfg = emptyConfig;

View File

@@ -603,8 +603,25 @@ export async function dispatchReplyFromConfig(params: {
const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true;
const toolStartStatusesSent = new Set<string>();
let toolStartStatusCount = 0;
const normalizeWorkingLabel = (label: string) => {
const collapsed = label.replace(/\s+/g, " ").trim();
if (collapsed.length <= 80) {
return collapsed;
}
return `${collapsed.slice(0, 77).trimEnd()}...`;
};
const summarizePlanLabel = (payload: { explanation?: string; steps?: string[] }) => {
const firstStep = payload.steps?.find((step) => typeof step === "string" && step.trim());
if (firstStep) {
return normalizeWorkingLabel(firstStep);
}
if (payload.explanation?.trim()) {
return normalizeWorkingLabel(payload.explanation);
}
return "planning next steps";
};
const maybeSendWorkingStatus = (label: string) => {
const normalizedLabel = label.trim();
const normalizedLabel = normalizeWorkingLabel(label);
if (
!shouldSendToolStartStatuses ||
!normalizedLabel ||
@@ -623,6 +640,34 @@ export async function dispatchReplyFromConfig(params: {
}
dispatcher.sendToolResult(payload);
};
const summarizeApprovalLabel = (payload: {
status?: string;
command?: string;
message?: string;
}) => {
if (payload.status === "pending") {
if (payload.command?.trim()) {
return normalizeWorkingLabel(`awaiting approval: ${payload.command}`);
}
return "awaiting approval";
}
if (payload.status === "unavailable") {
if (payload.message?.trim()) {
return normalizeWorkingLabel(payload.message);
}
return "approval unavailable";
}
return "";
};
const summarizePatchLabel = (payload: { summary?: string; title?: string }) => {
if (payload.summary?.trim()) {
return normalizeWorkingLabel(payload.summary);
}
if (payload.title?.trim()) {
return normalizeWorkingLabel(payload.title);
}
return "";
};
const acpDispatch = await dispatchAcpRuntime.tryDispatchAcpReply({
ctx,
cfg,
@@ -741,6 +786,32 @@ export async function dispatchReplyFromConfig(params: {
return maybeSendWorkingStatus(title);
}
},
onPlanUpdate: ({ phase, explanation, steps }) => {
if (phase !== "update") {
return;
}
return maybeSendWorkingStatus(summarizePlanLabel({ explanation, steps }));
},
onApprovalEvent: ({ phase, status, command, message }) => {
if (phase !== "requested") {
return;
}
const label = summarizeApprovalLabel({ status, command, message });
if (!label) {
return;
}
return maybeSendWorkingStatus(label);
},
onPatchSummary: ({ phase, summary, title }) => {
if (phase !== "end") {
return;
}
const label = summarizePatchLabel({ summary, title });
if (!label) {
return;
}
return maybeSendWorkingStatus(label);
},
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
const run = async () => {
// Suppress reasoning payloads — channels using this generic dispatch

View File

@@ -72,6 +72,58 @@ export type GetReplyOptions = {
name?: string;
phase?: string;
status?: string;
summary?: string;
progressText?: string;
approvalId?: string;
approvalSlug?: string;
}) => Promise<void> | void;
/** Called when the agent emits a structured plan update. */
onPlanUpdate?: (payload: {
phase?: string;
title?: string;
explanation?: string;
steps?: string[];
source?: string;
}) => Promise<void> | void;
/** Called when an approval becomes pending or resolves. */
onApprovalEvent?: (payload: {
phase?: string;
kind?: string;
status?: string;
title?: string;
itemId?: string;
toolCallId?: string;
approvalId?: string;
approvalSlug?: string;
command?: string;
host?: string;
reason?: string;
message?: string;
}) => Promise<void> | void;
/** Called when command output streams or completes. */
onCommandOutput?: (payload: {
itemId?: string;
phase?: string;
title?: string;
toolCallId?: string;
name?: string;
output?: string;
status?: string;
exitCode?: number | null;
durationMs?: number;
cwd?: string;
}) => Promise<void> | void;
/** Called when a patch completes with a file summary. */
onPatchSummary?: (payload: {
itemId?: string;
phase?: string;
title?: string;
toolCallId?: string;
name?: string;
added?: string[];
modified?: string[];
deleted?: string[];
summary?: string;
}) => Promise<void> | void;
/** Called when context auto-compaction starts (allows UX feedback during the pause). */
onCompactionStart?: () => Promise<void> | void;