diff --git a/CHANGELOG.md b/CHANGELOG.md index 8efb13f7eea..f5dc57ea2b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,7 @@ Docs: https://docs.openclaw.ai - Dependencies: override transitive `ip-address` to `10.2.0` so the runtime lockfile no longer includes the vulnerable `10.1.0` build flagged by Dependabot alert 109. Thanks @vincentkoc. - Feishu: hydrate missing native topic starter thread IDs before session routing so first turns and follow-ups stay in the same topic session. Fixes #78262. Thanks @joeyzenghuan. - LINE: reject `dmPolicy: "open"` configs without wildcard `allowFrom` so webhook DMs fail validation instead of being acknowledged and silently blocked before inbound processing. Fixes #78316. +- Telegram/Codex: keep message-tool-only progress drafts visible and render native Codex tool progress once per tool instead of duplicating item/tool draft lines. Fixes #75641. (#77949) - Providers/xAI: stop sending OpenAI-style reasoning effort controls to native Grok Responses models, so `xai/grok-4.3` no longer fails live Docker/Gateway runs with `Invalid reasoning effort`. - Providers/xAI: clamp the bundled xAI thinking profile to `off` so live Gateway runs cannot send unsupported reasoning levels to native Grok Responses models. - Matrix/approvals: retry approval delivery up to 3 times with a short backoff so transient Matrix send failures do not strand pending approval prompts. (#78179) Thanks @Patrick-Erichsen. diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index 0ad6ff06c53..1462c5fbc7b 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -660,6 +660,164 @@ describe("CodexAppServerEventProjector", () => { expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 }); }); + it("synthesizes normalized tool progress for Codex-native tool items", async () => { + const onAgentEvent = vi.fn(); + const projector = await createProjector({ ...(await createParams()), onAgentEvent }); + + await projector.handleNotification( + forCurrentTurn("item/started", { + item: { + type: "commandExecution", + id: "cmd-1", + command: "pnpm test extensions/codex", + cwd: "/workspace", + processId: null, + source: "agent", + status: "inProgress", + commandActions: [], + aggregatedOutput: null, + exitCode: null, + durationMs: null, + }, + }), + ); + await projector.handleNotification( + forCurrentTurn("item/completed", { + item: { + type: "commandExecution", + id: "cmd-1", + command: "pnpm test extensions/codex", + cwd: "/workspace", + processId: null, + source: "agent", + status: "completed", + commandActions: [], + aggregatedOutput: "ok", + exitCode: 0, + durationMs: 42, + }, + }), + ); + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "item", + data: expect.objectContaining({ + phase: "start", + kind: "command", + name: "bash", + itemId: "cmd-1", + suppressChannelProgress: true, + }), + }); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "tool", + data: expect.objectContaining({ + phase: "start", + name: "bash", + itemId: "cmd-1", + toolCallId: "cmd-1", + args: { command: "pnpm test extensions/codex", cwd: "/workspace" }, + }), + }); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "tool", + data: expect.objectContaining({ + phase: "result", + name: "bash", + itemId: "cmd-1", + toolCallId: "cmd-1", + status: "completed", + isError: false, + result: expect.objectContaining({ exitCode: 0, durationMs: 42 }), + }), + }); + }); + + it("marks declined Codex-native tool results as non-success", async () => { + const onAgentEvent = vi.fn(); + const projector = await createProjector({ ...(await createParams()), onAgentEvent }); + + await projector.handleNotification( + forCurrentTurn("item/completed", { + item: { + type: "commandExecution", + id: "cmd-declined", + command: "pnpm test extensions/codex", + cwd: "/workspace", + processId: null, + source: "agent", + status: "declined", + commandActions: [], + aggregatedOutput: null, + exitCode: null, + durationMs: null, + }, + }), + ); + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "item", + data: expect.objectContaining({ + phase: "end", + kind: "command", + name: "bash", + itemId: "cmd-declined", + status: "blocked", + suppressChannelProgress: true, + }), + }); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "tool", + data: expect.objectContaining({ + phase: "result", + name: "bash", + itemId: "cmd-declined", + toolCallId: "cmd-declined", + status: "blocked", + isError: true, + }), + }); + }); + + it("leaves Codex dynamic tool item progress to item/tool/call normalization", async () => { + const onAgentEvent = vi.fn(); + const projector = await createProjector({ ...(await createParams()), onAgentEvent }); + + await projector.handleNotification( + forCurrentTurn("item/started", { + item: { + type: "dynamicToolCall", + id: "call-1", + namespace: null, + tool: "message", + arguments: { action: "send" }, + status: "inProgress", + contentItems: null, + success: null, + durationMs: null, + }, + }), + ); + + expect(onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "item", + data: expect.objectContaining({ + phase: "start", + kind: "tool", + name: "message", + suppressChannelProgress: true, + }), + }), + ); + expect(onAgentEvent).not.toHaveBeenCalledWith( + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ phase: "start", name: "message" }), + }), + ); + }); + it("emits verbose tool summaries through onToolResult", async () => { const onToolResult = vi.fn(); const projector = await createProjector({ diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index 2f9752c6788..e21efe51604 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -30,6 +30,11 @@ import { import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js"; import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; +import { + resolveCodexToolProgressDetailMode, + sanitizeCodexAgentEventRecord, + sanitizeCodexToolArguments, +} from "./tool-progress-normalization.js"; import { attachCodexMirrorIdentity } from "./transcript-mirror.js"; export type CodexAppServerToolTelemetry = { @@ -396,6 +401,7 @@ export class CodexAppServerEventProjector { }); } this.emitStandardItemEvent({ phase: "start", item }); + this.emitNormalizedToolItemEvent({ phase: "start", item }); this.emitToolResultSummary(item); this.emitAgentEvent({ stream: "codex_app_server.item", @@ -449,6 +455,7 @@ export class CodexAppServerEventProjector { } this.recordToolMeta(item); this.emitStandardItemEvent({ phase: "end", item }); + this.emitNormalizedToolItemEvent({ phase: "result", item }); this.emitToolResultSummary(item); this.emitToolResultOutput(item); this.emitAgentEvent({ @@ -656,6 +663,7 @@ export class CodexAppServerEventProjector { return; } const meta = itemMeta(item, this.toolProgressDetailMode()); + const suppressChannelProgress = shouldSuppressChannelProgressForItem(item); this.emitAgentEvent({ stream: "item", data: { @@ -666,6 +674,42 @@ export class CodexAppServerEventProjector { status: params.phase === "start" ? "running" : itemStatus(item), ...(itemName(item) ? { name: itemName(item) } : {}), ...(meta ? { meta } : {}), + ...(suppressChannelProgress ? { suppressChannelProgress: true } : {}), + }, + }); + } + + private emitNormalizedToolItemEvent(params: { + phase: "start" | "result"; + item: CodexThreadItem | undefined; + }): void { + const { item } = params; + if (!item || !shouldSynthesizeToolProgressForItem(item)) { + return; + } + const name = itemName(item); + if (!name) { + return; + } + const meta = itemMeta(item, this.toolProgressDetailMode()); + const args = params.phase === "start" ? itemToolArgs(item) : undefined; + const status = params.phase === "result" ? itemStatus(item) : "running"; + this.emitAgentEvent({ + stream: "tool", + data: { + phase: params.phase, + name, + itemId: item.id, + toolCallId: item.id, + ...(meta ? { meta } : {}), + ...(args ? { args } : {}), + ...(params.phase === "result" + ? { + status, + isError: isNonSuccessItemStatus(status), + ...itemToolResult(item), + } + : {}), }, }); } @@ -743,7 +787,7 @@ export class CodexAppServerEventProjector { } private toolProgressDetailMode(): ToolProgressDetailMode { - return this.params.toolProgressDetail === "raw" ? "raw" : "explain"; + return resolveCodexToolProgressDetailMode(this.params.toolProgressDetail); } private recordToolMeta(item: CodexThreadItem | undefined): void { @@ -1074,17 +1118,24 @@ function itemTitle(item: CodexThreadItem): string { } } -function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" { +function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" | "blocked" { const status = readItemString(item, "status"); if (status === "failed") { return "failed"; } + if (status === "declined") { + return "blocked"; + } if (status === "inProgress" || status === "running") { return "running"; } return "completed"; } +function isNonSuccessItemStatus(status: ReturnType): boolean { + return status === "failed" || status === "blocked"; +} + function itemName(item: CodexThreadItem): string | undefined { if (item.type === "dynamicToolCall" && typeof item.tool === "string") { return item.tool; @@ -1105,6 +1156,78 @@ function itemName(item: CodexThreadItem): string | undefined { return undefined; } +function shouldSynthesizeToolProgressForItem(item: CodexThreadItem): boolean { + switch (item.type) { + case "commandExecution": + case "fileChange": + case "webSearch": + case "mcpToolCall": + return true; + default: + return false; + } +} + +function shouldSuppressChannelProgressForItem(item: CodexThreadItem): boolean { + if (shouldSynthesizeToolProgressForItem(item)) { + return true; + } + // Dynamic OpenClaw tool requests are emitted at the item/tool/call request + // boundary in run-attempt.ts. Re-emitting item notifications to channels can + // duplicate start/result progress when the app-server sends both signals. + return item.type === "dynamicToolCall"; +} + +function itemToolArgs(item: CodexThreadItem): Record | undefined { + if (item.type === "commandExecution") { + return sanitizeCodexAgentEventRecord({ + command: item.command, + ...(typeof item.cwd === "string" ? { cwd: item.cwd } : {}), + }); + } + if (item.type === "webSearch" && typeof item.query === "string") { + return sanitizeCodexAgentEventRecord({ query: item.query }); + } + if (item.type === "mcpToolCall") { + return sanitizeCodexToolArguments(item.arguments); + } + return undefined; +} + +function itemToolResult(item: CodexThreadItem): { result?: Record } { + if (item.type === "commandExecution") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + exitCode: item.exitCode, + durationMs: item.durationMs, + }), + }; + } + if (item.type === "fileChange") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + changes: item.changes.map((change) => ({ path: change.path, kind: change.kind })), + }), + }; + } + if (item.type === "mcpToolCall") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + durationMs: item.durationMs, + ...(item.error ? { error: item.error } : {}), + ...(item.result ? { result: item.result } : {}), + }), + }; + } + if (item.type === "webSearch") { + return { result: sanitizeCodexAgentEventRecord({ status: "completed" }) }; + } + return {}; +} + function itemMeta( item: CodexThreadItem, detailMode: ToolProgressDetailMode = "explain", diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index f67f3991854..cc9e6da2fdc 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -79,7 +79,7 @@ function useLightweightCodexRuntimePlan(params: EmbeddedRunAttemptParams): void resolvedRef: `${params.provider}/${params.modelId}`, harnessId: "codex", }, - } as NonNullable; + } as unknown as NonNullable; } function threadStartResult(threadId = "thread-1") { @@ -183,6 +183,7 @@ function createAppServerHarness( ) { const requests: Array<{ method: string; params: unknown }> = []; let notify: (notification: CodexServerNotification) => Promise = async () => undefined; + let handleServerRequest: AppServerRequestHandler | undefined; const request = vi.fn(async (method: string, params?: unknown) => { requests.push({ method, params }); return requestImpl(method, params); @@ -197,11 +198,22 @@ function createAppServerHarness( notify = handler; return () => undefined; }, - addRequestHandler: () => () => undefined, + addRequestHandler: (handler: AppServerRequestHandler) => { + handleServerRequest = handler; + return () => undefined; + }, } as never; }, ); + const waitForServerRequestHandler = async () => { + await vi.waitFor(() => expect(handleServerRequest).toBeTypeOf("function"), { + interval: 1, + timeout: 30_000, + }); + return handleServerRequest!; + }; + return { request, requests, @@ -223,6 +235,11 @@ function createAppServerHarness( async notify(notification: CodexServerNotification) { await notify(notification); }, + waitForServerRequestHandler, + async handleServerRequest(request: Parameters[0]) { + const handler = await waitForServerRequestHandler(); + return handler(request); + }, async completeTurn(params: { threadId: string; turnId: string }) { await notify({ method: "turn/completed", @@ -349,6 +366,12 @@ function createNamedDynamicTool( }; } +type AppServerRequestHandler = (request: { + id: string | number; + method: string; + params?: unknown; +}) => Promise; + function extractRelayIdFromThreadRequest(params: unknown): string { const command = ( params as { @@ -656,6 +679,93 @@ describe("runCodexAppServerAttempt", () => { }); }); + it("emits normalized tool progress around app-server dynamic tool requests", async () => { + const harness = createStartedThreadHarness(); + const onRunAgentEvent = vi.fn(); + const globalAgentEvents: AgentEventPayload[] = []; + onAgentEvent((event) => globalAgentEvents.push(event)); + const params = createParams( + path.join(tempDir, "session.jsonl"), + path.join(tempDir, "workspace"), + ); + params.onAgentEvent = onRunAgentEvent; + + const run = runCodexAppServerAttempt(params); + await harness.waitForMethod("turn/start"); + + await expect( + harness.handleServerRequest({ + id: "request-tool-1", + method: "item/tool/call", + params: { + threadId: "thread-1", + turnId: "turn-1", + callId: "call-1", + namespace: null, + tool: "message", + arguments: { + action: "send", + token: "plain-secret-value-12345", + text: "hello", + }, + }, + }), + ).resolves.toMatchObject({ + success: false, + contentItems: [ + { + type: "inputText", + text: expect.stringMatching( + /^(Unknown OpenClaw tool: message|Action send requires a target\.)$/u, + ), + }, + ], + }); + + await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" }); + await run; + + const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event); + expect(agentEvents).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ + phase: "start", + name: "message", + toolCallId: "call-1", + args: expect.objectContaining({ + action: "send", + token: "plain-…2345", + text: "hello", + }), + }), + }), + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ + phase: "result", + name: "message", + toolCallId: "call-1", + isError: true, + result: expect.objectContaining({ success: false }), + }), + }), + ]), + ); + expect(JSON.stringify(agentEvents)).not.toContain("plain-secret-value-12345"); + expect(globalAgentEvents).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + runId: "run-1", + sessionKey: "agent:main:session-1", + stream: "tool", + data: expect.objectContaining({ phase: "start", name: "message" }), + }), + ]), + ); + }); + it("releases the session when Codex never completes after a dynamic tool response", async () => { let handleRequest: | ((request: { id: string; method: string; params?: unknown }) => Promise) diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 8556a823f9a..200d6070610 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -98,6 +98,12 @@ import { codexDynamicToolsFingerprint, startOrResumeThread, } from "./thread-lifecycle.js"; +import { + inferCodexDynamicToolMeta, + resolveCodexToolProgressDetailMode, + sanitizeCodexToolArguments, + sanitizeCodexToolResponse, +} from "./tool-progress-normalization.js"; import { createCodexTrajectoryRecorder, normalizeCodexTrajectoryError, @@ -973,6 +979,19 @@ export async function runCodexAppServerAttempt( name: call.tool, arguments: call.arguments, }); + const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail); + const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode); + const toolArgs = sanitizeCodexToolArguments(call.arguments); + emitCodexAppServerEvent(params, { + stream: "tool", + data: { + phase: "start", + name: call.tool, + toolCallId: call.callId, + ...(toolMeta ? { meta: toolMeta } : {}), + ...(toolArgs ? { args: toolArgs } : {}), + }, + }); const response = await handleDynamicToolCallWithTimeout({ call, toolBridge, @@ -996,6 +1015,17 @@ export async function runCodexAppServerAttempt( success: response.success, contentItems: response.contentItems, }); + emitCodexAppServerEvent(params, { + stream: "tool", + data: { + phase: "result", + name: call.tool, + toolCallId: call.callId, + ...(toolMeta ? { meta: toolMeta } : {}), + isError: !response.success, + result: sanitizeCodexToolResponse(response), + }, + }); return response as JsonValue; } finally { activeAppServerTurnRequests = Math.max(0, activeAppServerTurnRequests - 1); diff --git a/extensions/codex/src/app-server/tool-progress-normalization.ts b/extensions/codex/src/app-server/tool-progress-normalization.ts new file mode 100644 index 00000000000..be09c380a0c --- /dev/null +++ b/extensions/codex/src/app-server/tool-progress-normalization.ts @@ -0,0 +1,77 @@ +import { + inferToolMetaFromArgs, + type EmbeddedRunAttemptParams, + type ToolProgressDetailMode, +} from "openclaw/plugin-sdk/agent-harness-runtime"; +import { redactSensitiveFieldValue, redactToolPayloadText } from "openclaw/plugin-sdk/text-runtime"; +import { + isJsonObject, + type CodexDynamicToolCallParams, + type CodexDynamicToolCallResponse, + type JsonValue, +} from "./protocol.js"; + +export function resolveCodexToolProgressDetailMode( + value: EmbeddedRunAttemptParams["toolProgressDetail"], +): ToolProgressDetailMode { + return value === "raw" ? "raw" : "explain"; +} + +export function sanitizeCodexAgentEventValue( + value: unknown, + seen = new WeakSet(), +): unknown { + if (typeof value === "string") { + return redactToolPayloadText(value); + } + if (Array.isArray(value)) { + if (seen.has(value)) { + return "[Circular]"; + } + seen.add(value); + return value.map((entry) => sanitizeCodexAgentEventValue(entry, seen)); + } + if (value && typeof value === "object") { + if (seen.has(value)) { + return "[Circular]"; + } + seen.add(value); + const out: Record = {}; + for (const [key, child] of Object.entries(value as Record)) { + out[key] = + typeof child === "string" + ? redactSensitiveFieldValue(key, child) + : sanitizeCodexAgentEventValue(child, seen); + } + return out; + } + return value; +} + +export function sanitizeCodexAgentEventRecord( + value: Record, +): Record { + return sanitizeCodexAgentEventValue(value) as Record; +} + +export function sanitizeCodexToolArguments( + value: JsonValue | undefined, +): Record | undefined { + if (!isJsonObject(value)) { + return undefined; + } + return sanitizeCodexAgentEventRecord(value); +} + +export function sanitizeCodexToolResponse( + response: CodexDynamicToolCallResponse, +): Record { + return sanitizeCodexAgentEventRecord(response as unknown as Record); +} + +export function inferCodexDynamicToolMeta( + call: Pick, + detailMode: ToolProgressDetailMode, +): string | undefined { + return inferToolMetaFromArgs(call.tool, call.arguments, { detailMode }); +} diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 7f4baa217aa..b634ec9cb8f 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -2,7 +2,10 @@ import type { Bot } from "grammy"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { resolveAutoTopicLabelConfig as resolveAutoTopicLabelConfigRuntime } from "./auto-topic-label-config.js"; import type { TelegramBotDeps } from "./bot-deps.js"; -import { createTestDraftStream } from "./draft-stream.test-helpers.js"; +import { + createSequencedTestDraftStream, + createTestDraftStream, +} from "./draft-stream.test-helpers.js"; type DispatchReplyWithBufferedBlockDispatcherArgs = Parameters< TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"] @@ -259,6 +262,8 @@ describe("dispatchTelegramMessage draft streaming", () => { }); const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId }); + const createSequencedDraftStream = (startMessageId = 1001) => + createSequencedTestDraftStream(startMessageId); function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) { const answerDraftStream = createDraftStream(params?.answerMessageId); @@ -998,6 +1003,94 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("shows Telegram progress drafts immediately for explicit tool starts", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + return { queuedFinal: false }; + }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith(expect.stringMatching(/^Shelling\n`šŸ› ļø Exec`$/)); + expect(draftStream.flush).toHaveBeenCalled(); + }); + + it("renders Telegram progress drafts before slow status reactions resolve", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + let releaseSetTool: (() => void) | undefined; + const statusReactionController = createStatusReactionController(); + statusReactionController.setTool.mockImplementation( + () => + new Promise((resolve) => { + releaseSetTool = resolve; + }), + ); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await Promise.resolve(); + await Promise.resolve(); + const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0]; + releaseSetTool?.(); + await pendingToolStart; + expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`šŸ› ļø Exec`$/); + return { queuedFinal: false }; + }); + + await dispatchWithContext({ + context: createContext({ + statusReactionController: statusReactionController as never, + }), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(statusReactionController.setTool).toHaveBeenCalledWith("exec"); + }); + + it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" }); + await replyOptions?.onItemEvent?.({ progressText: "tests passed" }); + await replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith( + expect.stringMatching(/^Shelling\n`šŸ”Ž Web Search: docs lookup`\n• `tests passed`$/), + ); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.materialize).not.toHaveBeenCalled(); + expect(draftStream.clear).toHaveBeenCalledTimes(1); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Final after tool" })], + }), + ); + expect(editMessageTelegram).not.toHaveBeenCalled(); + }); + it("falls back to normal send for error payloads and clears the pending stream", async () => { const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index f227bb19fed..ff9e1051a9e 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -490,7 +490,10 @@ export const dispatchTelegramMessage = async ({ const progressDraftGate = createChannelProgressDraftGate({ onStart: () => renderProgressDraft({ flush: true }), }); - const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => { + const pushStreamToolProgress = async ( + line?: string, + options?: { toolName?: string; startImmediately?: boolean }, + ) => { if (!answerLane.stream) { return; } @@ -529,6 +532,19 @@ export const dispatchTelegramMessage = async ({ ); } } + if ( + options?.startImmediately && + streamToolProgressEnabled && + !streamToolProgressSuppressed && + normalized + ) { + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.startNow(); + if (alreadyStarted && progressDraftGate.hasStarted) { + await renderProgressDraft(); + } + return; + } const alreadyStarted = progressDraftGate.hasStarted; await progressDraftGate.noteWork(); if (alreadyStarted && progressDraftGate.hasStarted) { @@ -1189,12 +1205,10 @@ export const dispatchTelegramMessage = async ({ : undefined, suppressDefaultToolProgressMessages: !streamDeliveryEnabled || Boolean(answerLane.stream), + allowProgressCallbacksWhenSourceDeliverySuppressed: Boolean(answerLane.stream), onToolStart: async (payload) => { const toolName = payload.name?.trim(); - if (statusReactionController && toolName) { - await statusReactionController.setTool(toolName); - } - await pushStreamToolProgress( + const progressPromise = pushStreamToolProgress( formatChannelProgressDraftLineForEntry( telegramCfg, { @@ -1205,8 +1219,12 @@ export const dispatchTelegramMessage = async ({ }, payload.detailMode ? { detailMode: payload.detailMode } : undefined, ), - { toolName }, + { toolName, startImmediately: true }, ); + if (statusReactionController && toolName) { + await statusReactionController.setTool(toolName); + } + await progressPromise; }, onItemEvent: async (payload) => { await pushStreamToolProgress( diff --git a/src/auto-reply/get-reply-options.types.ts b/src/auto-reply/get-reply-options.types.ts index c620a11943d..e9219bbc1f4 100644 --- a/src/auto-reply/get-reply-options.types.ts +++ b/src/auto-reply/get-reply-options.types.ts @@ -163,6 +163,8 @@ export type GetReplyOptions = { * output private; visible channel output must come from the message tool. */ sourceReplyDeliveryMode?: SourceReplyDeliveryMode; + /** Allow channel-owned progress UI while final/source reply delivery remains message-tool-only. */ + allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean; disableBlockStreaming?: boolean; /** Timeout for block reply delivery (ms). */ blockReplyTimeoutMs?: number; diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index 7ecb22af4d4..fbbac755612 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -1145,6 +1145,103 @@ describe("runAgentTurnWithFallback", () => { }); }); + it("skips channel item progress when a matching tool event carries the progress", async () => { + const onItemEvent = vi.fn(); + const onToolStart = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ + stream: "item", + data: { + itemId: "cmd-1", + kind: "command", + title: "Command", + name: "bash", + phase: "start", + status: "running", + suppressChannelProgress: true, + }, + }); + await params.onAgentEvent?.({ + stream: "tool", + data: { + itemId: "cmd-1", + toolCallId: "cmd-1", + name: "bash", + phase: "start", + args: { command: "pnpm test" }, + }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + ...createMinimalRunAgentTurnParams({ + opts: { + onItemEvent, + onToolStart, + } satisfies GetReplyOptions, + }), + }); + + expect(result.kind).toBe("success"); + expect(onItemEvent).not.toHaveBeenCalled(); + expect(onToolStart).toHaveBeenCalledWith({ + name: "bash", + phase: "start", + args: { command: "pnpm test" }, + detailMode: undefined, + }); + }); + + it("preserves suppressed item progress when no tool-start callback is registered", async () => { + const onItemEvent = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + await params.onAgentEvent?.({ + stream: "item", + data: { + itemId: "cmd-1", + kind: "command", + title: "Command", + name: "bash", + phase: "start", + status: "running", + suppressChannelProgress: true, + }, + }); + await params.onAgentEvent?.({ + stream: "tool", + data: { + itemId: "cmd-1", + toolCallId: "cmd-1", + name: "bash", + phase: "start", + args: { command: "pnpm test" }, + }, + }); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + ...createMinimalRunAgentTurnParams({ + opts: { + onItemEvent, + } satisfies GetReplyOptions, + }), + }); + + expect(result.kind).toBe("success"); + expect(onItemEvent).toHaveBeenCalledWith({ + itemId: "cmd-1", + kind: "command", + title: "Command", + name: "bash", + phase: "start", + status: "running", + }); + }); + it("forwards raw tool progress detail mode to tool-start reply options", async () => { const onToolStart = vi.fn(); state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { @@ -1178,6 +1275,54 @@ describe("runAgentTurnWithFallback", () => { }); }); + it("fires tool-start progress before slow typing signals resolve for best-effort Pi events", async () => { + const onToolStart = vi.fn(async () => {}); + let releaseTyping: (() => void) | undefined; + const typingSignals = createMockTypingSignaler(); + vi.mocked(typingSignals.signalToolStart).mockImplementation( + () => + new Promise((resolve) => { + releaseTyping = resolve; + }), + ); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => { + void params.onAgentEvent?.({ + stream: "tool", + data: { + name: "exec", + phase: "start", + args: { command: "echo hi" }, + }, + }); + await Promise.resolve(); + await Promise.resolve(); + return { payloads: [{ text: "final" }], meta: {} }; + }); + + const runAgentTurnWithFallback = await getRunAgentTurnWithFallback(); + const result = await runAgentTurnWithFallback({ + ...createMinimalRunAgentTurnParams({ + opts: { + onToolStart, + } satisfies GetReplyOptions, + }), + typingSignals, + }); + + try { + expect(result.kind).toBe("success"); + expect(onToolStart).toHaveBeenCalledWith({ + name: "exec", + phase: "start", + args: { command: "echo hi" }, + detailMode: undefined, + }); + } finally { + releaseTyping?.(); + await Promise.resolve(); + } + }); + it("leaves Codex app-server telemetry publication to the harness", async () => { const agentEvents = await import("../../infra/agent-events.js"); const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent); diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index a962203c19d..99b7e961e96 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -1660,8 +1660,7 @@ export async function runAgentTurnWithFallback(params: { const phase = readStringValue(evt.data.phase) ?? ""; const name = readStringValue(evt.data.name); if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - await params.opts?.onToolStart?.({ + const toolStartProgressPromise = params.opts?.onToolStart?.({ name, phase, args: @@ -1670,9 +1669,17 @@ export async function runAgentTurnWithFallback(params: { : undefined, detailMode: params.toolProgressDetail, }); + await Promise.all([ + params.typingSignals.signalToolStart(), + toolStartProgressPromise, + ]); } } - if (evt.stream === "item") { + const suppressItemChannelProgress = + evt.stream === "item" && + evt.data.suppressChannelProgress === true && + Boolean(params.opts?.onToolStart); + if (evt.stream === "item" && !suppressItemChannelProgress) { await params.opts?.onItemEvent?.({ itemId: readStringValue(evt.data.itemId), kind: readStringValue(evt.data.kind), diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index a63125e3270..d9079ea8f94 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -3099,7 +3099,7 @@ describe("dispatchReplyFromConfig", () => { }); }); - it("keeps diagnostic progress when source progress callbacks are suppressed", async () => { + it("forwards non-answer progress callbacks when source replies are suppressed", async () => { setNoAbort(); const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; const dispatcher = createDispatcher(); @@ -3131,6 +3131,7 @@ describe("dispatchReplyFromConfig", () => { dispatcher, replyOptions: { sourceReplyDeliveryMode: "message_tool_only", + allowProgressCallbacksWhenSourceDeliverySuppressed: true, onToolStart: callbacks.toolStart, onItemEvent: callbacks.itemEvent, onCommandOutput: callbacks.commandOutput, @@ -3138,9 +3139,9 @@ describe("dispatchReplyFromConfig", () => { replyResolver, }); - expect(callbacks.toolStart).not.toHaveBeenCalled(); - expect(callbacks.itemEvent).not.toHaveBeenCalled(); - expect(callbacks.commandOutput).not.toHaveBeenCalled(); + expect(callbacks.toolStart).toHaveBeenCalledTimes(1); + expect(callbacks.itemEvent).toHaveBeenCalledTimes(1); + expect(callbacks.commandOutput).toHaveBeenCalledTimes(1); expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(3); expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({ sessionKey: "agent:main:discord:channel:C1", diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 45ac7e35d47..cd6831cb13c 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1243,15 +1243,24 @@ export async function dispatchReplyFromConfig( const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate; const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent; const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary; + const allowSuppressedSourceProgressCallbacks = + params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true; + const shouldForwardProgressCallback = (options?: { + forwardWhenSourceDeliverySuppressed?: boolean; + }) => + !suppressAutomaticSourceDelivery || + (allowSuppressedSourceProgressCallbacks && + options?.forwardWhenSourceDeliverySuppressed === true); const wrapProgressCallback = ( callback: ((...args: Args) => Promise | void) | undefined, + options?: { forwardWhenSourceDeliverySuppressed?: boolean }, ): ((...args: Args) => Promise) | undefined => { if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) { return undefined; } return async (...args: Args) => { markProgress(); - if (!suppressAutomaticSourceDelivery) { + if (shouldForwardProgressCallback(options)) { await callback?.(...args); } }; @@ -1274,11 +1283,21 @@ export async function dispatchReplyFromConfig( onReasoningEnd: wrapProgressCallback(params.replyOptions?.onReasoningEnd), onAssistantMessageStart: wrapProgressCallback(params.replyOptions?.onAssistantMessageStart), onBlockReplyQueued: wrapProgressCallback(params.replyOptions?.onBlockReplyQueued), - onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart), - onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent), - onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput), - onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart), - onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd), + onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, { + forwardWhenSourceDeliverySuppressed: true, + }), + onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, { + forwardWhenSourceDeliverySuppressed: true, + }), + onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, { + forwardWhenSourceDeliverySuppressed: true, + }), + onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, { + forwardWhenSourceDeliverySuppressed: true, + }), + onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, { + forwardWhenSourceDeliverySuppressed: true, + }), onToolResult: (payload: ReplyPayload) => { markProgress(); const run = async () => { @@ -1330,7 +1349,7 @@ export async function dispatchReplyFromConfig( onPlanUpdate: async (payload) => { markProgress(); markInboundDedupeReplayUnsafe(); - if (!suppressAutomaticSourceDelivery) { + if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) { await onPlanUpdateFromReplyOptions?.(payload); } if (payload.phase !== "update" || shouldSuppressDefaultToolProgressMessages()) { @@ -1341,7 +1360,7 @@ export async function dispatchReplyFromConfig( onApprovalEvent: async (payload) => { markProgress(); markInboundDedupeReplayUnsafe(); - if (!suppressAutomaticSourceDelivery) { + if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) { await onApprovalEventFromReplyOptions?.(payload); } if (payload.phase !== "requested" || shouldSuppressDefaultToolProgressMessages()) { @@ -1360,7 +1379,7 @@ export async function dispatchReplyFromConfig( onPatchSummary: async (payload) => { markProgress(); markInboundDedupeReplayUnsafe(); - if (!suppressAutomaticSourceDelivery) { + if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) { await onPatchSummaryFromReplyOptions?.(payload); } if (payload.phase !== "end" || shouldSuppressDefaultToolProgressMessages()) { diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts index e1fc3a54340..80d421f1460 100644 --- a/src/infra/agent-events.ts +++ b/src/infra/agent-events.ts @@ -40,6 +40,8 @@ export type AgentItemEventData = { error?: string; summary?: string; progressText?: string; + /** Preserve item telemetry while letting channel progress render a sibling tool event instead. */ + suppressChannelProgress?: boolean; approvalId?: string; approvalSlug?: string; };