From af81ee9fee8bc11c8f1253fcc5d4594811d813d4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 5 Apr 2026 11:16:01 +0100 Subject: [PATCH] fix(agents): add embedded item lifecycle events --- .../run.incomplete-turn.test.ts | 19 +++++ .../run.overflow-compaction.fixture.ts | 5 ++ .../attempt.spawn-workspace.test-support.ts | 2 + src/agents/pi-embedded-runner/run/attempt.ts | 2 + .../pi-embedded-runner/run/incomplete-turn.ts | 3 +- src/agents/pi-embedded-runner/run/types.ts | 5 ++ .../usage-reporting.test.ts | 5 ++ ...ded-subscribe.handlers.tools.media.test.ts | 3 + ...-embedded-subscribe.handlers.tools.test.ts | 7 ++ .../pi-embedded-subscribe.handlers.tools.ts | 85 ++++++++++++++++++- .../pi-embedded-subscribe.handlers.types.ts | 6 ++ src/agents/pi-embedded-subscribe.ts | 11 +++ ...adapter.after-tool-call.fires-once.test.ts | 1 + .../pi-tool-handler-state.test-helpers.ts | 4 + .../reply/agent-runner-execution.test.ts | 67 +++++++++++++++ .../reply/agent-runner-execution.ts | 10 +++ .../reply/dispatch-from-config.test.ts | 55 ++++++++++++ src/auto-reply/reply/dispatch-from-config.ts | 48 +++++++---- src/auto-reply/types.ts | 9 ++ src/gateway/server-chat.ts | 5 ++ src/infra/agent-events.ts | 37 ++++++++ 21 files changed, 370 insertions(+), 19 deletions(-) diff --git a/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts b/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts index 3313326984f..cb580f5759c 100644 --- a/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts +++ b/src/agents/pi-embedded-runner/run.incomplete-turn.test.ts @@ -78,4 +78,23 @@ describe("runEmbeddedPiAgent incomplete-turn safety", () => { expect(retryInstruction).toBeNull(); }); + + it("does not retry planning-only detection after an item has started", () => { + const retryInstruction = resolvePlanningOnlyRetryInstruction({ + provider: "openai", + modelId: "gpt-5.4", + aborted: false, + timedOut: false, + attempt: makeAttemptResult({ + assistantTexts: ["I'll inspect the code, make the change, and run the checks."], + itemLifecycle: { + startedCount: 1, + completedCount: 0, + activeCount: 1, + }, + }), + }); + + expect(retryInstruction).toBeNull(); + }); }); diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts index b2487777e4e..ce8e80e030a 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.fixture.ts @@ -49,6 +49,11 @@ export function makeAttemptResult( didSendViaMessagingTool, successfulCronAdds, }), + itemLifecycle: { + startedCount: 0, + completedCount: 0, + activeCount: 0, + }, didSendViaMessagingTool, messagingToolSentTexts: [], messagingToolSentMediaUrls: [], diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index 9303c0e2452..86a418b5f0b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -76,6 +76,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { getLastToolError: () => undefined, getUsageTotals: () => undefined, getCompactionCount: () => 0, + getItemLifecycle: () => ({ startedCount: 0, completedCount: 0, activeCount: 0 }), isCompacting: () => false, isCompactionInFlight: () => false, }) satisfies SubscriptionMock, @@ -539,6 +540,7 @@ export function createSubscriptionMock(): SubscriptionMock { getLastToolError: () => undefined, getUsageTotals: () => undefined, getCompactionCount: () => 0, + getItemLifecycle: () => ({ startedCount: 0, completedCount: 0, activeCount: 0 }), isCompacting: () => false, isCompactionInFlight: () => false, }; diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 9839069988e..114802e8f81 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -1343,6 +1343,7 @@ export async function runEmbeddedAttempt( unsubscribe, waitForCompactionRetry, isCompactionInFlight, + getItemLifecycle, getMessagingToolSentTexts, getMessagingToolSentMediaUrls, getMessagingToolSentTargets, @@ -2022,6 +2023,7 @@ export async function runEmbeddedAttempt( didSendViaMessagingTool: didSendViaMessagingTool(), successfulCronAdds: getSuccessfulCronAdds(), }), + itemLifecycle: getItemLifecycle(), aborted, timedOut, timedOutDuringCompaction, diff --git a/src/agents/pi-embedded-runner/run/incomplete-turn.ts b/src/agents/pi-embedded-runner/run/incomplete-turn.ts index 970a41b2554..5f79b78f3df 100644 --- a/src/agents/pi-embedded-runner/run/incomplete-turn.ts +++ b/src/agents/pi-embedded-runner/run/incomplete-turn.ts @@ -25,6 +25,7 @@ type PlanningOnlyAttempt = Pick< | "didSendViaMessagingTool" | "lastToolError" | "lastAssistant" + | "itemLifecycle" | "replayMetadata" | "toolMetas" >; @@ -106,7 +107,7 @@ export function resolvePlanningOnlyRetryInstruction(params: { params.attempt.didSendDeterministicApprovalPrompt || params.attempt.didSendViaMessagingTool || params.attempt.lastToolError || - params.attempt.toolMetas.length > 0 || + params.attempt.itemLifecycle.startedCount > 0 || params.attempt.replayMetadata.hadPotentialSideEffects ) { return null; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 7a5f3398240..f3696f79f01 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -67,4 +67,9 @@ export type EmbeddedRunAttemptResult = { hadPotentialSideEffects: boolean; replaySafe: boolean; }; + itemLifecycle: { + startedCount: number; + completedCount: number; + activeCount: number; + }; }; diff --git a/src/agents/pi-embedded-runner/usage-reporting.test.ts b/src/agents/pi-embedded-runner/usage-reporting.test.ts index b997e75eee3..42c30fc69bf 100644 --- a/src/agents/pi-embedded-runner/usage-reporting.test.ts +++ b/src/agents/pi-embedded-runner/usage-reporting.test.ts @@ -33,6 +33,11 @@ function makeAttemptResult( didSendViaMessagingTool, successfulCronAdds, }), + itemLifecycle: { + startedCount: 0, + completedCount: 0, + activeCount: 0, + }, didSendViaMessagingTool, messagingToolSentTexts: [], messagingToolSentMediaUrls: [], diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.media.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.media.test.ts index e3641a4317b..ba7671b0aa9 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.media.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.media.test.ts @@ -21,6 +21,9 @@ function createMockContext(overrides?: { toolMetaById: new Map(), toolMetas: [], toolSummaryById: new Set(), + itemActiveIds: new Set(), + itemStartedCount: 0, + itemCompletedCount: 0, pendingMessagingTexts: new Map(), pendingMessagingTargets: new Map(), pendingMessagingMediaUrls: new Map(), diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts index 126d31b7d5b..3d9611b87ca 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts @@ -37,6 +37,9 @@ function createTestContext(): { toolMetaById: new Map(), toolMetas: [], toolSummaryById: new Set(), + itemActiveIds: new Set(), + itemStartedCount: 0, + itemCompletedCount: 0, pendingMessagingTargets: new Map(), pendingMessagingTexts: new Map(), pendingMessagingMediaUrls: new Map(), @@ -121,6 +124,8 @@ describe("handleToolExecutionStart read path checks", () => { await pending; expect(ctx.state.toolMetaById.has("tool-await-flush")).toBe(true); + expect(ctx.state.itemStartedCount).toBe(1); + expect(ctx.state.itemActiveIds.has("tool:tool-await-flush")).toBe(true); }); }); @@ -175,6 +180,8 @@ describe("handleToolExecutionEnd cron.add commitment tracking", () => { ); expect(ctx.state.successfulCronAdds).toBe(0); + expect(ctx.state.itemCompletedCount).toBe(1); + expect(ctx.state.itemActiveIds.size).toBe(0); }); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 693c694a91a..18809deab3a 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -1,5 +1,6 @@ import type { AgentEvent } from "@mariozechner/pi-agent-core"; -import { emitAgentEvent } from "../infra/agent-events.js"; +import type { AgentItemEventData } from "../infra/agent-events.js"; +import { emitAgentEvent, emitAgentItemEvent } from "../infra/agent-events.js"; import { buildExecApprovalPendingReplyPayload, buildExecApprovalUnavailableReplyPayload, @@ -58,6 +59,14 @@ function buildToolCallSummary(toolName: string, args: unknown, meta?: string): T }; } +function buildToolItemId(toolCallId: string): string { + return `tool:${toolCallId}`; +} + +function buildToolItemTitle(toolName: string, meta?: string): string { + return meta ? `${toolName} ${meta}` : toolName; +} + function extendExecMeta(toolName: string, args: unknown, meta?: string): string | undefined { const normalized = toolName.trim().toLowerCase(); if (normalized !== "exec" && normalized !== "bash") { @@ -358,8 +367,9 @@ export function handleToolExecutionStart( const args = evt.args; const runId = ctx.params.runId; - // Track start time and args for after_tool_call hook - toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args }); + // Track start time and args for after_tool_call hook. + const startedAt = Date.now(); + toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: startedAt, args }); if (toolName === "read") { const record = args && typeof args === "object" ? (args as Record) : {}; @@ -395,11 +405,33 @@ export function handleToolExecutionStart( args: args as Record, }, }); + const itemData: AgentItemEventData = { + itemId: buildToolItemId(toolCallId), + phase: "start", + kind: "tool", + title: buildToolItemTitle(toolName, meta), + status: "running", + name: toolName, + meta, + toolCallId, + startedAt, + }; + ctx.state.itemActiveIds.add(itemData.itemId); + ctx.state.itemStartedCount += 1; + emitAgentItemEvent({ + runId: ctx.params.runId, + ...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}), + data: itemData, + }); // Best-effort typing signal; do not block tool summaries on slow emitters. void ctx.params.onAgentEvent?.({ stream: "tool", data: { phase: "start", name: toolName, toolCallId }, }); + void ctx.params.onAgentEvent?.({ + stream: "item", + data: itemData, + }); if ( ctx.params.onToolResult && @@ -464,6 +496,21 @@ export function handleToolExecutionUpdate( partialResult: sanitized, }, }); + const itemData: AgentItemEventData = { + itemId: buildToolItemId(toolCallId), + phase: "update", + kind: "tool", + title: buildToolItemTitle(toolName, ctx.state.toolMetaById.get(toolCallId)?.meta), + status: "running", + name: toolName, + meta: ctx.state.toolMetaById.get(toolCallId)?.meta, + toolCallId, + }; + emitAgentItemEvent({ + runId: ctx.params.runId, + ...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}), + data: itemData, + }); void ctx.params.onAgentEvent?.({ stream: "tool", data: { @@ -472,6 +519,10 @@ export function handleToolExecutionUpdate( toolCallId, }, }); + void ctx.params.onAgentEvent?.({ + stream: "item", + data: itemData, + }); } export async function handleToolExecutionEnd( @@ -586,6 +637,30 @@ export async function handleToolExecutionEnd( result: sanitizedResult, }, }); + const endedAt = Date.now(); + const itemId = buildToolItemId(toolCallId); + ctx.state.itemActiveIds.delete(itemId); + ctx.state.itemCompletedCount += 1; + const itemData: AgentItemEventData = { + itemId, + phase: "end", + kind: "tool", + title: buildToolItemTitle(toolName, meta), + status: isToolError ? "failed" : "completed", + name: toolName, + meta, + toolCallId, + startedAt: startData?.startTime, + endedAt, + ...(isToolError && extractToolErrorMessage(sanitizedResult) + ? { error: extractToolErrorMessage(sanitizedResult) } + : {}), + }; + emitAgentItemEvent({ + runId: ctx.params.runId, + ...(ctx.params.sessionKey ? { sessionKey: ctx.params.sessionKey } : {}), + data: itemData, + }); void ctx.params.onAgentEvent?.({ stream: "tool", data: { @@ -596,6 +671,10 @@ export async function handleToolExecutionEnd( isError: isToolError, }, }); + void ctx.params.onAgentEvent?.({ + stream: "item", + data: itemData, + }); ctx.log.debug( `embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`, diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 366752ffde0..2790fa80a61 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -29,6 +29,9 @@ export type EmbeddedPiSubscribeState = { toolMetas: Array<{ toolName?: string; meta?: string }>; toolMetaById: Map; toolSummaryById: Set; + itemActiveIds: Set; + itemStartedCount: number; + itemCompletedCount: number; lastToolError?: ToolErrorSummary; blockReplyBreak: "text_end" | "message_end"; @@ -144,6 +147,9 @@ export type ToolHandlerState = Pick< | "toolMetaById" | "toolMetas" | "toolSummaryById" + | "itemActiveIds" + | "itemStartedCount" + | "itemCompletedCount" | "lastToolError" | "pendingMessagingTargets" | "pendingMessagingTexts" diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 7f1fdb424bc..dee600e005d 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -45,6 +45,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar toolMetas: [], toolMetaById: new Map(), toolSummaryById: new Set(), + itemActiveIds: new Set(), + itemStartedCount: 0, + itemCompletedCount: 0, lastToolError: undefined, blockReplyBreak: params.blockReplyBreak ?? "text_end", reasoningMode, @@ -645,6 +648,9 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar toolMetas.length = 0; toolMetaById.clear(); toolSummaryById.clear(); + state.itemActiveIds.clear(); + state.itemStartedCount = 0; + state.itemCompletedCount = 0; state.lastToolError = undefined; messagingToolSentTexts.length = 0; messagingToolSentTextsNormalized.length = 0; @@ -752,6 +758,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar getLastToolError: () => (state.lastToolError ? { ...state.lastToolError } : undefined), getUsageTotals, getCompactionCount: () => compactionCount, + getItemLifecycle: () => ({ + startedCount: state.itemStartedCount, + completedCount: state.itemCompletedCount, + activeCount: state.itemActiveIds.size, + }), waitForCompactionRetry: () => { // Reject after unsubscribe so callers treat it as cancellation, not success if (state.unsubscribed) { diff --git a/src/agents/pi-tool-definition-adapter.after-tool-call.fires-once.test.ts b/src/agents/pi-tool-definition-adapter.after-tool-call.fires-once.test.ts index b65d2240bfe..e69991e0ba4 100644 --- a/src/agents/pi-tool-definition-adapter.after-tool-call.fires-once.test.ts +++ b/src/agents/pi-tool-definition-adapter.after-tool-call.fires-once.test.ts @@ -86,6 +86,7 @@ async function loadFreshAfterToolCallModulesForTest() { })); vi.doMock("../infra/agent-events.js", () => ({ emitAgentEvent: vi.fn(), + emitAgentItemEvent: vi.fn(), })); vi.doMock("./pi-tools.before-tool-call.js", () => ({ consumeAdjustedParamsForToolCall: beforeToolCallMocks.consumeAdjustedParamsForToolCall, diff --git a/src/agents/pi-tool-handler-state.test-helpers.ts b/src/agents/pi-tool-handler-state.test-helpers.ts index 7cccdbdf3bf..8a5405833e4 100644 --- a/src/agents/pi-tool-handler-state.test-helpers.ts +++ b/src/agents/pi-tool-handler-state.test-helpers.ts @@ -1,7 +1,11 @@ export function createBaseToolHandlerState() { return { + toolMetaById: new Map(), toolMetas: [] as Array<{ toolName?: string; meta?: string }>, toolSummaryById: new Set(), + itemActiveIds: new Set(), + itemStartedCount: 0, + itemCompletedCount: 0, lastToolError: undefined, pendingMessagingTexts: new Map(), pendingMessagingTargets: new Map(), diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index e539ad176ef..79f7b5a8eda 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -117,6 +117,14 @@ type FallbackRunnerParams = { type EmbeddedAgentParams = { onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => Promise | void; + onItemEvent?: (payload: { + itemId?: string; + kind?: string; + title?: string; + name?: string; + phase?: string; + status?: string; + }) => Promise | void; onAgentEvent?: (payload: { stream: string; data: { phase?: string; completed?: boolean }; @@ -234,6 +242,65 @@ describe("runAgentTurnWithFallback", () => { expect(onToolResult.mock.calls[0]?.[0]?.text).toBeUndefined(); }); + it("forwards item lifecycle events to reply options", async () => { + const onItemEvent = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ + stream: "item", + data: { + itemId: "tool:read-1", + kind: "tool", + title: "read", + name: "read", + phase: "start", + status: "running", + }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const pendingToolTasks = new Set>(); + const typingSignals = createMockTypingSignaler(); + const result = await runAgentTurnWithFallback({ + commandBody: "hello", + followupRun: createFollowupRun(), + sessionCtx: { + Provider: "whatsapp", + MessageSid: "msg", + } as unknown as TemplateContext, + opts: { + onItemEvent, + } satisfies GetReplyOptions, + typingSignals, + blockReplyPipeline: null, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + applyReplyToMode: (payload) => payload, + shouldEmitToolResult: () => true, + shouldEmitToolOutput: () => false, + pendingToolTasks, + resetSessionAfterCompactionFailure: async () => false, + resetSessionAfterRoleOrderingConflict: async () => false, + isHeartbeat: false, + sessionKey: "main", + getActiveSessionEntry: () => undefined, + resolvedVerboseLevel: "off", + }); + + await Promise.all(pendingToolTasks); + + expect(result.kind).toBe("success"); + expect(onItemEvent).toHaveBeenCalledWith({ + itemId: "tool:read-1", + kind: "tool", + title: "read", + name: "read", + phase: "start", + status: "running", + }); + }); + it("keeps compaction start notices silent by default", async () => { const onBlockReply = vi.fn(); state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index ebe93a98624..7fedfdb46e4 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -718,6 +718,16 @@ export async function runAgentTurnWithFallback(params: { await params.opts?.onToolStart?.({ name, phase }); } } + if (evt.stream === "item") { + await params.opts?.onItemEvent?.({ + itemId: typeof evt.data.itemId === "string" ? evt.data.itemId : undefined, + kind: typeof evt.data.kind === "string" ? evt.data.kind : undefined, + title: typeof evt.data.title === "string" ? evt.data.title : undefined, + name: typeof evt.data.name === "string" ? evt.data.name : undefined, + phase: typeof evt.data.phase === "string" ? evt.data.phase : undefined, + status: typeof evt.data.status === "string" ? evt.data.status : undefined, + }); + } // Track auto-compaction and notify higher layers. if (evt.stream === "compaction") { const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 5ef2b50f0e7..5a511f7d717 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -929,6 +929,61 @@ describe("dispatchReplyFromConfig", () => { expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" }); }); + it("prefers item-start progress updates for direct sessions", async () => { + setNoAbort(); + const cfg = emptyConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "telegram", + ChatType: "direct", + }); + + const replyResolver = async ( + _ctx: MsgContext, + opts?: GetReplyOptions, + _cfg?: OpenClawConfig, + ) => { + await opts?.onItemEvent?.({ + itemId: "tool:read-1", + kind: "tool", + title: "read config", + name: "read", + phase: "start", + status: "running", + }); + await opts?.onItemEvent?.({ + itemId: "tool:read-1", + kind: "tool", + title: "read config", + name: "read", + phase: "end", + status: "completed", + }); + await opts?.onItemEvent?.({ + itemId: "tool:grep-1", + kind: "tool", + title: "grep", + name: "grep", + phase: "start", + status: "running", + }); + return { text: "done" } satisfies ReplyPayload; + }; + + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ text: "Working: read" }), + ); + expect(dispatcher.sendToolResult).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ text: "Working: grep" }), + ); + expect(dispatcher.sendToolResult).toHaveBeenCalledTimes(2); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "done" }); + }); + it("delivers deterministic exec approval tool payloads for native commands", async () => { setNoAbort(); const cfg = emptyConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 3662dbcb16a..9cacbe6e9d1 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -603,6 +603,26 @@ export async function dispatchReplyFromConfig(params: { const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true; const toolStartStatusesSent = new Set(); let toolStartStatusCount = 0; + const maybeSendWorkingStatus = (label: string) => { + const normalizedLabel = label.trim(); + if ( + !shouldSendToolStartStatuses || + !normalizedLabel || + toolStartStatusCount >= 2 || + toolStartStatusesSent.has(normalizedLabel) + ) { + return; + } + toolStartStatusesSent.add(normalizedLabel); + toolStartStatusCount += 1; + const payload: ReplyPayload = { + text: `Working: ${normalizedLabel}`, + }; + if (shouldRouteToOriginating) { + return sendPayloadAsync(payload, undefined, false); + } + dispatcher.sendToolResult(payload); + }; const acpDispatch = await dispatchAcpRuntime.tryDispatchAcpReply({ ctx, cfg, @@ -702,26 +722,24 @@ export async function dispatchReplyFromConfig(params: { return run(); }, onToolStart: ({ name, phase }) => { - if (!shouldSendToolStartStatuses || phase !== "start") { + if (phase !== "start") { return; } - const normalizedName = typeof name === "string" ? name.trim() : ""; - if ( - !normalizedName || - toolStartStatusCount >= 2 || - toolStartStatusesSent.has(normalizedName) - ) { + if (typeof name !== "string") { return; } - toolStartStatusesSent.add(normalizedName); - toolStartStatusCount += 1; - const payload: ReplyPayload = { - text: `Working: ${normalizedName}`, - }; - if (shouldRouteToOriginating) { - return sendPayloadAsync(payload, undefined, false); + return maybeSendWorkingStatus(name); + }, + onItemEvent: ({ phase, name, title, kind }) => { + if (phase !== "start") { + return; + } + if (kind === "tool" && typeof name === "string" && name.trim()) { + return maybeSendWorkingStatus(name); + } + if (typeof title === "string") { + return maybeSendWorkingStatus(title); } - dispatcher.sendToolResult(payload); }, onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => { const run = async () => { diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 3fba9e8bb9b..cc080480687 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -64,6 +64,15 @@ export type GetReplyOptions = { onToolResult?: (payload: ReplyPayload) => Promise | void; /** Called when a tool phase starts/updates, before summary payloads are emitted. */ onToolStart?: (payload: { name?: string; phase?: string }) => Promise | void; + /** Called when a concrete work item starts, updates, or completes. */ + onItemEvent?: (payload: { + itemId?: string; + kind?: string; + title?: string; + name?: string; + phase?: string; + status?: string; + }) => Promise | void; /** Called when context auto-compaction starts (allows UX feedback during the pause). */ onCompactionStart?: () => Promise | void; /** Called when context auto-compaction completes. */ diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 2b830490baa..d96a07c72fe 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -729,6 +729,7 @@ export function createAgentEventHandler({ const agentPayload = sessionKey ? { ...eventForClients, sessionKey } : eventForClients; const last = agentRunSeq.get(evt.runId) ?? 0; const isToolEvent = evt.stream === "tool"; + const isItemEvent = evt.stream === "item"; const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; // Build tool payload: strip result/partialResult unless verbose=full const toolPayload = @@ -792,6 +793,10 @@ export function createAgentEventHandler({ } } } else { + const itemPhase = isItemEvent && typeof evt.data?.phase === "string" ? evt.data.phase : ""; + if (itemPhase === "start" && isControlUiVisible && sessionKey && !isAborted) { + flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + } broadcast("agent", agentPayload); } diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index 1884774a3c2..252a5b6d899 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -4,6 +4,30 @@ import { notifyListeners, registerListener } from "../shared/listeners.js"; export type AgentEventStream = "lifecycle" | "tool" | "assistant" | "error" | (string & {}); +export type AgentItemEventPhase = "start" | "update" | "end"; +export type AgentItemEventStatus = "running" | "completed" | "failed" | "blocked"; +export type AgentItemEventKind = + | "tool" + | "command" + | "patch" + | "search" + | "analysis" + | (string & {}); + +export type AgentItemEventData = { + itemId: string; + phase: AgentItemEventPhase; + kind: AgentItemEventKind; + title: string; + status: AgentItemEventStatus; + name?: string; + meta?: string; + toolCallId?: string; + startedAt?: number; + endedAt?: number; + error?: string; +}; + export type AgentEventPayload = { runId: string; seq: number; @@ -91,6 +115,19 @@ export function emitAgentEvent(event: Omit) { notifyListeners(state.listeners, enriched); } +export function emitAgentItemEvent(params: { + runId: string; + data: AgentItemEventData; + sessionKey?: string; +}) { + emitAgentEvent({ + runId: params.runId, + stream: "item", + data: params.data as unknown as Record, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + }); +} + export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { const state = getAgentEventState(); return registerListener(state.listeners, listener);