fix(agents): add embedded item lifecycle events

This commit is contained in:
Peter Steinberger
2026-04-05 11:16:01 +01:00
parent 1ad5695aa4
commit af81ee9fee
21 changed files with 370 additions and 19 deletions

View File

@@ -78,4 +78,23 @@ describe("runEmbeddedPiAgent incomplete-turn safety", () => {
expect(retryInstruction).toBeNull();
});
// An attempt whose item lifecycle reports a started item (startedCount: 1, still
// active) must not yield a planning-only retry instruction, even though the
// assistant text reads like a plan: the resolver is expected to return null.
it("does not retry planning-only detection after an item has started", () => {
const retryInstruction = resolvePlanningOnlyRetryInstruction({
provider: "openai",
modelId: "gpt-5.4",
aborted: false,
timedOut: false,
attempt: makeAttemptResult({
assistantTexts: ["I'll inspect the code, make the change, and run the checks."],
itemLifecycle: {
startedCount: 1,
completedCount: 0,
activeCount: 1,
},
}),
});
expect(retryInstruction).toBeNull();
});
});

View File

@@ -49,6 +49,11 @@ export function makeAttemptResult(
didSendViaMessagingTool,
successfulCronAdds,
}),
itemLifecycle: {
startedCount: 0,
completedCount: 0,
activeCount: 0,
},
didSendViaMessagingTool,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],

View File

@@ -76,6 +76,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
getLastToolError: () => undefined,
getUsageTotals: () => undefined,
getCompactionCount: () => 0,
getItemLifecycle: () => ({ startedCount: 0, completedCount: 0, activeCount: 0 }),
isCompacting: () => false,
isCompactionInFlight: () => false,
}) satisfies SubscriptionMock,
@@ -539,6 +540,7 @@ export function createSubscriptionMock(): SubscriptionMock {
getLastToolError: () => undefined,
getUsageTotals: () => undefined,
getCompactionCount: () => 0,
getItemLifecycle: () => ({ startedCount: 0, completedCount: 0, activeCount: 0 }),
isCompacting: () => false,
isCompactionInFlight: () => false,
};

View File

@@ -1343,6 +1343,7 @@ export async function runEmbeddedAttempt(
unsubscribe,
waitForCompactionRetry,
isCompactionInFlight,
getItemLifecycle,
getMessagingToolSentTexts,
getMessagingToolSentMediaUrls,
getMessagingToolSentTargets,
@@ -2022,6 +2023,7 @@ export async function runEmbeddedAttempt(
didSendViaMessagingTool: didSendViaMessagingTool(),
successfulCronAdds: getSuccessfulCronAdds(),
}),
itemLifecycle: getItemLifecycle(),
aborted,
timedOut,
timedOutDuringCompaction,

View File

@@ -25,6 +25,7 @@ type PlanningOnlyAttempt = Pick<
| "didSendViaMessagingTool"
| "lastToolError"
| "lastAssistant"
| "itemLifecycle"
| "replayMetadata"
| "toolMetas"
>;
@@ -106,7 +107,7 @@ export function resolvePlanningOnlyRetryInstruction(params: {
params.attempt.didSendDeterministicApprovalPrompt ||
params.attempt.didSendViaMessagingTool ||
params.attempt.lastToolError ||
params.attempt.toolMetas.length > 0 ||
params.attempt.itemLifecycle.startedCount > 0 ||
params.attempt.replayMetadata.hadPotentialSideEffects
) {
return null;

View File

@@ -67,4 +67,9 @@ export type EmbeddedRunAttemptResult = {
hadPotentialSideEffects: boolean;
replaySafe: boolean;
};
itemLifecycle: {
startedCount: number;
completedCount: number;
activeCount: number;
};
};

View File

@@ -33,6 +33,11 @@ function makeAttemptResult(
didSendViaMessagingTool,
successfulCronAdds,
}),
itemLifecycle: {
startedCount: 0,
completedCount: 0,
activeCount: 0,
},
didSendViaMessagingTool,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],

View File

@@ -21,6 +21,9 @@ function createMockContext(overrides?: {
toolMetaById: new Map(),
toolMetas: [],
toolSummaryById: new Set(),
itemActiveIds: new Set(),
itemStartedCount: 0,
itemCompletedCount: 0,
pendingMessagingTexts: new Map(),
pendingMessagingTargets: new Map(),
pendingMessagingMediaUrls: new Map(),

View File

@@ -37,6 +37,9 @@ function createTestContext(): {
toolMetaById: new Map<string, ToolCallSummary>(),
toolMetas: [],
toolSummaryById: new Set<string>(),
itemActiveIds: new Set<string>(),
itemStartedCount: 0,
itemCompletedCount: 0,
pendingMessagingTargets: new Map<string, MessagingToolSend>(),
pendingMessagingTexts: new Map<string, string>(),
pendingMessagingMediaUrls: new Map<string, string[]>(),
@@ -121,6 +124,8 @@ describe("handleToolExecutionStart read path checks", () => {
await pending;
expect(ctx.state.toolMetaById.has("tool-await-flush")).toBe(true);
expect(ctx.state.itemStartedCount).toBe(1);
expect(ctx.state.itemActiveIds.has("tool:tool-await-flush")).toBe(true);
});
});
@@ -175,6 +180,8 @@ describe("handleToolExecutionEnd cron.add commitment tracking", () => {
);
expect(ctx.state.successfulCronAdds).toBe(0);
expect(ctx.state.itemCompletedCount).toBe(1);
expect(ctx.state.itemActiveIds.size).toBe(0);
});
});

View File

@@ -1,5 +1,6 @@
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { emitAgentEvent } from "../infra/agent-events.js";
import type { AgentItemEventData } from "../infra/agent-events.js";
import { emitAgentEvent, emitAgentItemEvent } from "../infra/agent-events.js";
import {
buildExecApprovalPendingReplyPayload,
buildExecApprovalUnavailableReplyPayload,
@@ -58,6 +59,14 @@ function buildToolCallSummary(toolName: string, args: unknown, meta?: string): T
};
}
/**
 * Derives the lifecycle item id for a tool call by namespacing the tool call id
 * with a `tool:` prefix.
 *
 * @param toolCallId - Identifier of the tool call.
 * @returns The item id, i.e. `tool:` followed by `toolCallId`.
 */
function buildToolItemId(toolCallId: string): string {
  const itemIdPrefix = "tool:";
  return itemIdPrefix + toolCallId;
}
/**
 * Builds the human-readable title for a tool item.
 *
 * @param toolName - Name of the tool.
 * @param meta - Optional extra detail appended after the tool name.
 * @returns `toolName` alone when `meta` is missing or empty, otherwise
 *   `toolName` and `meta` separated by a single space.
 */
function buildToolItemTitle(toolName: string, meta?: string): string {
  if (!meta) {
    return toolName;
  }
  return [toolName, meta].join(" ");
}
function extendExecMeta(toolName: string, args: unknown, meta?: string): string | undefined {
const normalized = toolName.trim().toLowerCase();
if (normalized !== "exec" && normalized !== "bash") {
@@ -358,8 +367,9 @@ export function handleToolExecutionStart(
const args = evt.args;
const runId = ctx.params.runId;
// Track start time and args for after_tool_call hook
toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args });
// Track start time and args for after_tool_call hook.
const startedAt = Date.now();
toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: startedAt, args });
if (toolName === "read") {
const record = args && typeof args === "object" ? (args as Record<string, unknown>) : {};
@@ -395,11 +405,33 @@ export function handleToolExecutionStart(
args: args as Record<string, unknown>,
},
});
const itemData: AgentItemEventData = {
itemId: buildToolItemId(toolCallId),
phase: "start",
kind: "tool",
title: buildToolItemTitle(toolName, meta),
status: "running",
name: toolName,
meta,
toolCallId,
startedAt,
};
ctx.state.itemActiveIds.add(itemData.itemId);
ctx.state.itemStartedCount += 1;
emitAgentItemEvent({
runId: ctx.params.runId,
...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}),
data: itemData,
});
// Best-effort typing signal; do not block tool summaries on slow emitters.
void ctx.params.onAgentEvent?.({
stream: "tool",
data: { phase: "start", name: toolName, toolCallId },
});
void ctx.params.onAgentEvent?.({
stream: "item",
data: itemData,
});
if (
ctx.params.onToolResult &&
@@ -464,6 +496,21 @@ export function handleToolExecutionUpdate(
partialResult: sanitized,
},
});
const itemData: AgentItemEventData = {
itemId: buildToolItemId(toolCallId),
phase: "update",
kind: "tool",
title: buildToolItemTitle(toolName, ctx.state.toolMetaById.get(toolCallId)?.meta),
status: "running",
name: toolName,
meta: ctx.state.toolMetaById.get(toolCallId)?.meta,
toolCallId,
};
emitAgentItemEvent({
runId: ctx.params.runId,
...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}),
data: itemData,
});
void ctx.params.onAgentEvent?.({
stream: "tool",
data: {
@@ -472,6 +519,10 @@ export function handleToolExecutionUpdate(
toolCallId,
},
});
void ctx.params.onAgentEvent?.({
stream: "item",
data: itemData,
});
}
export async function handleToolExecutionEnd(
@@ -586,6 +637,30 @@ export async function handleToolExecutionEnd(
result: sanitizedResult,
},
});
const endedAt = Date.now();
const itemId = buildToolItemId(toolCallId);
ctx.state.itemActiveIds.delete(itemId);
ctx.state.itemCompletedCount += 1;
const itemData: AgentItemEventData = {
itemId,
phase: "end",
kind: "tool",
title: buildToolItemTitle(toolName, meta),
status: isToolError ? "failed" : "completed",
name: toolName,
meta,
toolCallId,
startedAt: startData?.startTime,
endedAt,
...(isToolError && extractToolErrorMessage(sanitizedResult)
? { error: extractToolErrorMessage(sanitizedResult) }
: {}),
};
emitAgentItemEvent({
runId: ctx.params.runId,
...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}),
data: itemData,
});
void ctx.params.onAgentEvent?.({
stream: "tool",
data: {
@@ -596,6 +671,10 @@ export async function handleToolExecutionEnd(
isError: isToolError,
},
});
void ctx.params.onAgentEvent?.({
stream: "item",
data: itemData,
});
ctx.log.debug(
`embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,

View File

@@ -29,6 +29,9 @@ export type EmbeddedPiSubscribeState = {
toolMetas: Array<{ toolName?: string; meta?: string }>;
toolMetaById: Map<string, ToolCallSummary>;
toolSummaryById: Set<string>;
itemActiveIds: Set<string>;
itemStartedCount: number;
itemCompletedCount: number;
lastToolError?: ToolErrorSummary;
blockReplyBreak: "text_end" | "message_end";
@@ -144,6 +147,9 @@ export type ToolHandlerState = Pick<
| "toolMetaById"
| "toolMetas"
| "toolSummaryById"
| "itemActiveIds"
| "itemStartedCount"
| "itemCompletedCount"
| "lastToolError"
| "pendingMessagingTargets"
| "pendingMessagingTexts"

View File

@@ -45,6 +45,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
toolMetas: [],
toolMetaById: new Map(),
toolSummaryById: new Set(),
itemActiveIds: new Set(),
itemStartedCount: 0,
itemCompletedCount: 0,
lastToolError: undefined,
blockReplyBreak: params.blockReplyBreak ?? "text_end",
reasoningMode,
@@ -645,6 +648,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
toolMetas.length = 0;
toolMetaById.clear();
toolSummaryById.clear();
state.itemActiveIds.clear();
state.itemStartedCount = 0;
state.itemCompletedCount = 0;
state.lastToolError = undefined;
messagingToolSentTexts.length = 0;
messagingToolSentTextsNormalized.length = 0;
@@ -752,6 +758,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined),
getUsageTotals,
getCompactionCount: () => compactionCount,
getItemLifecycle: () => ({
startedCount: state.itemStartedCount,
completedCount: state.itemCompletedCount,
activeCount: state.itemActiveIds.size,
}),
waitForCompactionRetry: () => {
// Reject after unsubscribe so callers treat it as cancellation, not success
if (state.unsubscribed) {

View File

@@ -86,6 +86,7 @@ async function loadFreshAfterToolCallModulesForTest() {
}));
vi.doMock("../infra/agent-events.js", () => ({
emitAgentEvent: vi.fn(),
emitAgentItemEvent: vi.fn(),
}));
vi.doMock("./pi-tools.before-tool-call.js", () => ({
consumeAdjustedParamsForToolCall: beforeToolCallMocks.consumeAdjustedParamsForToolCall,

View File

@@ -1,7 +1,11 @@
export function createBaseToolHandlerState() {
return {
toolMetaById: new Map<string, unknown>(),
toolMetas: [] as Array<{ toolName?: string; meta?: string }>,
toolSummaryById: new Set<string>(),
itemActiveIds: new Set<string>(),
itemStartedCount: 0,
itemCompletedCount: 0,
lastToolError: undefined,
pendingMessagingTexts: new Map<string, string>(),
pendingMessagingTargets: new Map<string, unknown>(),

View File

@@ -117,6 +117,14 @@ type FallbackRunnerParams = {
type EmbeddedAgentParams = {
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void> | void;
onItemEvent?: (payload: {
itemId?: string;
kind?: string;
title?: string;
name?: string;
phase?: string;
status?: string;
}) => Promise<void> | void;
onAgentEvent?: (payload: {
stream: string;
data: { phase?: string; completed?: boolean };
@@ -234,6 +242,65 @@ describe("runAgentTurnWithFallback", () => {
expect(onToolResult.mock.calls[0]?.[0]?.text).toBeUndefined();
});
// The mocked embedded agent emits a single "item" stream event through
// onAgentEvent; the test asserts that runAgentTurnWithFallback relays its
// string fields unchanged to opts.onItemEvent and still reports success.
it("forwards item lifecycle events to reply options", async () => {
const onItemEvent = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "item",
data: {
itemId: "tool:read-1",
kind: "tool",
title: "read",
name: "read",
phase: "start",
status: "running",
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const pendingToolTasks = new Set<Promise<void>>();
const typingSignals = createMockTypingSignaler();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "whatsapp",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: {
onItemEvent,
} satisfies GetReplyOptions,
typingSignals,
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks,
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
// Drain any tool tasks queued during the turn before asserting.
await Promise.all(pendingToolTasks);
expect(result.kind).toBe("success");
expect(onItemEvent).toHaveBeenCalledWith({
itemId: "tool:read-1",
kind: "tool",
title: "read",
name: "read",
phase: "start",
status: "running",
});
});
it("keeps compaction start notices silent by default", async () => {
const onBlockReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {

View File

@@ -718,6 +718,16 @@ export async function runAgentTurnWithFallback(params: {
await params.opts?.onToolStart?.({ name, phase });
}
}
// Relay "item" stream events to the optional onItemEvent reply callback.
// Each field is passed through only when it is a string; any other value is
// forwarded as undefined.
if (evt.stream === "item") {
await params.opts?.onItemEvent?.({
itemId: typeof evt.data.itemId === "string" ? evt.data.itemId : undefined,
kind: typeof evt.data.kind === "string" ? evt.data.kind : undefined,
title: typeof evt.data.title === "string" ? evt.data.title : undefined,
name: typeof evt.data.name === "string" ? evt.data.name : undefined,
phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined,
status: typeof evt.data.status === "string" ? evt.data.status : undefined,
});
}
// Track auto-compaction and notify higher layers.
if (evt.stream === "compaction") {
const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";

View File

@@ -929,6 +929,61 @@ describe("dispatchReplyFromConfig", () => {
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" });
});
// In a direct (non-group) chat the resolver reports three item events: a start
// and an end for "read", then a start for "grep". Only the two start events are
// expected to produce "Working: <name>" tool-result statuses; the label comes
// from the tool name ("read"), not the item title ("read config").
it("prefers item-start progress updates for direct sessions", async () => {
setNoAbort();
const cfg = emptyConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "telegram",
ChatType: "direct",
});
const replyResolver = async (
_ctx: MsgContext,
opts?: GetReplyOptions,
_cfg?: OpenClawConfig,
) => {
await opts?.onItemEvent?.({
itemId: "tool:read-1",
kind: "tool",
title: "read config",
name: "read",
phase: "start",
status: "running",
});
await opts?.onItemEvent?.({
itemId: "tool:read-1",
kind: "tool",
title: "read config",
name: "read",
phase: "end",
status: "completed",
});
await opts?.onItemEvent?.({
itemId: "tool:grep-1",
kind: "tool",
title: "grep",
name: "grep",
phase: "start",
status: "running",
});
return { text: "done" } satisfies ReplyPayload;
};
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ text: "Working: read" }),
);
expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ text: "Working: grep" }),
);
// The "end" event must not add a third status message.
expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(2);
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" });
});
it("delivers deterministic exec approval tool payloads for native commands", async () => {
setNoAbort();
const cfg = emptyConfig;

View File

@@ -603,6 +603,26 @@ export async function dispatchReplyFromConfig(params: {
const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true;
const toolStartStatusesSent = new Set<string>();
let toolStartStatusCount = 0;
// Emits a "Working: <label>" progress payload for the given label, at most
// twice per dispatch and never twice for the same trimmed label. Does nothing
// when tool-start statuses are disabled for this chat or the label is blank.
// Returns the result of sendPayloadAsync when routing to the originating
// channel; otherwise hands the payload to the dispatcher and returns undefined.
const maybeSendWorkingStatus = (label: string) => {
  const trimmedLabel = label.trim();
  const suppressed =
    !shouldSendToolStartStatuses ||
    trimmedLabel.length === 0 ||
    toolStartStatusCount >= 2 ||
    toolStartStatusesSent.has(trimmedLabel);
  if (suppressed) {
    return;
  }
  // Record the label before sending so repeated calls with it are ignored.
  toolStartStatusCount += 1;
  toolStartStatusesSent.add(trimmedLabel);
  const payload: ReplyPayload = { text: `Working: ${trimmedLabel}` };
  if (!shouldRouteToOriginating) {
    dispatcher.sendToolResult(payload);
    return;
  }
  return sendPayloadAsync(payload, undefined, false);
};
const acpDispatch = await dispatchAcpRuntime.tryDispatchAcpReply({
ctx,
cfg,
@@ -702,26 +722,24 @@ export async function dispatchReplyFromConfig(params: {
return run();
},
onToolStart: ({ name, phase }) => {
if (!shouldSendToolStartStatuses || phase !== "start") {
if (phase !== "start") {
return;
}
const normalizedName = typeof name === "string" ? name.trim() : "";
if (
!normalizedName ||
toolStartStatusCount >= 2 ||
toolStartStatusesSent.has(normalizedName)
) {
if (typeof name !== "string") {
return;
}
toolStartStatusesSent.add(normalizedName);
toolStartStatusCount += 1;
const payload: ReplyPayload = {
text: `Working: ${normalizedName}`,
};
if (shouldRouteToOriginating) {
return sendPayloadAsync(payload, undefined, false);
return maybeSendWorkingStatus(name);
},
onItemEvent: ({ phase, name, title, kind }) => {
if (phase !== "start") {
return;
}
if (kind === "tool" && typeof name === "string" && name.trim()) {
return maybeSendWorkingStatus(name);
}
if (typeof title === "string") {
return maybeSendWorkingStatus(title);
}
dispatcher.sendToolResult(payload);
},
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
const run = async () => {

View File

@@ -64,6 +64,15 @@ export type GetReplyOptions = {
onToolResult?: (payload: ReplyPayload) => Promise<void> | void;
/** Called when a tool phase starts/updates, before summary payloads are emitted. */
onToolStart?: (payload: { name?: string; phase?: string }) => Promise<void> | void;
/** Called when a concrete work item starts, updates, or completes. */
onItemEvent?: (payload: {
itemId?: string;
kind?: string;
title?: string;
name?: string;
phase?: string;
status?: string;
}) => Promise<void> | void;
/** Called when context auto-compaction starts (allows UX feedback during the pause). */
onCompactionStart?: () => Promise<void> | void;
/** Called when context auto-compaction completes. */

View File

@@ -729,6 +729,7 @@ export function createAgentEventHandler({
const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients;
const last = agentRunSeq.get(evt.runId) ?? 0;
const isToolEvent = evt.stream === "tool";
const isItemEvent = evt.stream === "item";
const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off";
// Build tool payload: strip result/partialResult unless verbose=full
const toolPayload =
@@ -792,6 +793,10 @@ export function createAgentEventHandler({
}
}
} else {
// For "item" stream events, read the phase (empty string when absent or not a string).
const itemPhase = isItemEvent && typeof evt.data?.phase === "string" ? evt.data.phase : "";
// On an item start for a visible, non-aborted run with a session key, flush any
// buffered chat delta before the item event is broadcast.
// NOTE(review): presumably so clients receive preceding assistant text ahead of
// the item — confirm against flushBufferedChatDeltaIfNeeded.
if (itemPhase === "start" && isControlUiVisible && sessionKey && !isAborted) {
flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq);
}
broadcast("agent", agentPayload);
}

View File

@@ -4,6 +4,30 @@ import { notifyListeners, registerListener } from "../shared/listeners.js";
export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {});
/** Phase carried by an item event: one of "start", "update" or "end". */
export type AgentItemEventPhase = "start" | "update" | "end";
/** Status reported alongside an item event. */
export type AgentItemEventStatus = "running" | "completed" | "failed" | "blocked";
/**
 * Category of work an item represents. The trailing `(string & {})` member
 * keeps the union open to arbitrary strings while preserving the listed
 * literals for editor completion.
 */
export type AgentItemEventKind =
| "tool"
| "command"
| "patch"
| "search"
| "analysis"
| (string & {});
/** Payload of an event on the "item" stream (see emitAgentItemEvent). */
export type AgentItemEventData = {
// Identifier of the item this event belongs to.
itemId: string;
// Which point in the item's lifecycle this event reports.
phase: AgentItemEventPhase;
// Category of work the item represents.
kind: AgentItemEventKind;
// Human-readable label for the item.
title: string;
// Status of the item as of this event.
status: AgentItemEventStatus;
// Optional short name (for tool items, presumably the tool name — confirm with emitters).
name?: string;
// Optional extra detail about the item.
meta?: string;
// Optional id of the tool call associated with the item.
toolCallId?: string;
// Optional start timestamp (presumably epoch milliseconds — confirm with emitters).
startedAt?: number;
// Optional end timestamp (presumably epoch milliseconds — confirm with emitters).
endedAt?: number;
// Optional error message for the item.
error?: string;
};
export type AgentEventPayload = {
runId: string;
seq: number;
@@ -91,6 +115,19 @@ export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
notifyListeners(state.listeners, enriched);
}
/**
 * Emits an item lifecycle event on the "item" stream via emitAgentEvent.
 *
 * @param params.runId - Run the event belongs to.
 * @param params.data - Item payload; forwarded as the event's `data`.
 * @param params.sessionKey - Optional session key; only attached to the
 *   emitted event when it is truthy.
 */
export function emitAgentItemEvent(params: {
  runId: string;
  data: AgentItemEventData;
  sessionKey?: string;
}) {
  const { runId, data, sessionKey } = params;
  // The typed item payload is handed to emitAgentEvent as a generic record.
  const recordData = data as unknown as Record<string, unknown>;
  emitAgentEvent({
    runId,
    stream: "item",
    data: recordData,
    ...(sessionKey ? { sessionKey } : {}),
  });
}
export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
const state = getAgentEventState();
return registerListener(state.listeners, listener);