fix: show Codex tool progress in channel drafts (#77949)

Summary:
- Normalize Codex app-server dynamic and native tool activity into channel-visible tool progress.
- Keep Telegram message-tool-only progress drafts visible without duplicate dynamic item/tool lines.
- Preserve suppressed item progress while avoiding duplicate tool callbacks.

Verification:
- OPENCLAW_VITEST_MAX_WORKERS=1 pnpm test extensions/codex/src/app-server/event-projector.test.ts extensions/codex/src/app-server/run-attempt.test.ts extensions/telegram/src/bot-message-dispatch.test.ts src/auto-reply/reply/agent-runner-execution.test.ts src/auto-reply/reply/dispatch-from-config.test.ts --pool=forks --maxWorkers=1
- pnpm tsgo:extensions:test
- pnpm exec oxfmt --check --threads=1 CHANGELOG.md extensions/codex/src/app-server/event-projector.ts extensions/codex/src/app-server/event-projector.test.ts extensions/codex/src/app-server/run-attempt.ts extensions/codex/src/app-server/run-attempt.test.ts extensions/codex/src/app-server/tool-progress-normalization.ts extensions/telegram/src/bot-message-dispatch.ts extensions/telegram/src/bot-message-dispatch.test.ts src/auto-reply/get-reply-options.types.ts src/auto-reply/reply/agent-runner-execution.ts src/auto-reply/reply/agent-runner-execution.test.ts src/auto-reply/reply/dispatch-from-config.ts src/auto-reply/reply/dispatch-from-config.test.ts src/infra/agent-events.ts
- pnpm lint:extensions
- pnpm build
- CI on 6ff6a1f868: 88 success, 20 skipped, 1 neutral, no failures or pending checks

Fixes #75641.
This commit is contained in:
keshavbotagent
2026-05-06 12:48:20 +05:30
committed by GitHub
parent 900e416688
commit 3f210b10ce
14 changed files with 813 additions and 27 deletions

View File

@@ -110,6 +110,7 @@ Docs: https://docs.openclaw.ai
- Dependencies: override transitive `ip-address` to `10.2.0` so the runtime lockfile no longer includes the vulnerable `10.1.0` build flagged by Dependabot alert 109. Thanks @vincentkoc.
- Feishu: hydrate missing native topic starter thread IDs before session routing so first turns and follow-ups stay in the same topic session. Fixes #78262. Thanks @joeyzenghuan.
- LINE: reject `dmPolicy: "open"` configs without wildcard `allowFrom` so webhook DMs fail validation instead of being acknowledged and silently blocked before inbound processing. Fixes #78316.
- Telegram/Codex: keep message-tool-only progress drafts visible and render native Codex tool progress once per tool instead of duplicating item/tool draft lines. Fixes #75641. (#77949)
- Providers/xAI: stop sending OpenAI-style reasoning effort controls to native Grok Responses models, so `xai/grok-4.3` no longer fails live Docker/Gateway runs with `Invalid reasoning effort`.
- Providers/xAI: clamp the bundled xAI thinking profile to `off` so live Gateway runs cannot send unsupported reasoning levels to native Grok Responses models.
- Matrix/approvals: retry approval delivery up to 3 times with a short backoff so transient Matrix send failures do not strand pending approval prompts. (#78179) Thanks @Patrick-Erichsen.

View File

@@ -660,6 +660,164 @@ describe("CodexAppServerEventProjector", () => {
expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 });
});
it("synthesizes normalized tool progress for Codex-native tool items", async () => {
const onAgentEvent = vi.fn();
const projector = await createProjector({ ...(await createParams()), onAgentEvent });
await projector.handleNotification(
forCurrentTurn("item/started", {
item: {
type: "commandExecution",
id: "cmd-1",
command: "pnpm test extensions/codex",
cwd: "/workspace",
processId: null,
source: "agent",
status: "inProgress",
commandActions: [],
aggregatedOutput: null,
exitCode: null,
durationMs: null,
},
}),
);
await projector.handleNotification(
forCurrentTurn("item/completed", {
item: {
type: "commandExecution",
id: "cmd-1",
command: "pnpm test extensions/codex",
cwd: "/workspace",
processId: null,
source: "agent",
status: "completed",
commandActions: [],
aggregatedOutput: "ok",
exitCode: 0,
durationMs: 42,
},
}),
);
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "item",
data: expect.objectContaining({
phase: "start",
kind: "command",
name: "bash",
itemId: "cmd-1",
suppressChannelProgress: true,
}),
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
data: expect.objectContaining({
phase: "start",
name: "bash",
itemId: "cmd-1",
toolCallId: "cmd-1",
args: { command: "pnpm test extensions/codex", cwd: "/workspace" },
}),
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
data: expect.objectContaining({
phase: "result",
name: "bash",
itemId: "cmd-1",
toolCallId: "cmd-1",
status: "completed",
isError: false,
result: expect.objectContaining({ exitCode: 0, durationMs: 42 }),
}),
});
});
it("marks declined Codex-native tool results as non-success", async () => {
const onAgentEvent = vi.fn();
const projector = await createProjector({ ...(await createParams()), onAgentEvent });
await projector.handleNotification(
forCurrentTurn("item/completed", {
item: {
type: "commandExecution",
id: "cmd-declined",
command: "pnpm test extensions/codex",
cwd: "/workspace",
processId: null,
source: "agent",
status: "declined",
commandActions: [],
aggregatedOutput: null,
exitCode: null,
durationMs: null,
},
}),
);
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "item",
data: expect.objectContaining({
phase: "end",
kind: "command",
name: "bash",
itemId: "cmd-declined",
status: "blocked",
suppressChannelProgress: true,
}),
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
data: expect.objectContaining({
phase: "result",
name: "bash",
itemId: "cmd-declined",
toolCallId: "cmd-declined",
status: "blocked",
isError: true,
}),
});
});
it("leaves Codex dynamic tool item progress to item/tool/call normalization", async () => {
const onAgentEvent = vi.fn();
const projector = await createProjector({ ...(await createParams()), onAgentEvent });
await projector.handleNotification(
forCurrentTurn("item/started", {
item: {
type: "dynamicToolCall",
id: "call-1",
namespace: null,
tool: "message",
arguments: { action: "send" },
status: "inProgress",
contentItems: null,
success: null,
durationMs: null,
},
}),
);
expect(onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "item",
data: expect.objectContaining({
phase: "start",
kind: "tool",
name: "message",
suppressChannelProgress: true,
}),
}),
);
expect(onAgentEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({ phase: "start", name: "message" }),
}),
);
});
it("emits verbose tool summaries through onToolResult", async () => {
const onToolResult = vi.fn();
const projector = await createProjector({

View File

@@ -30,6 +30,11 @@ import {
import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js";
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
import {
resolveCodexToolProgressDetailMode,
sanitizeCodexAgentEventRecord,
sanitizeCodexToolArguments,
} from "./tool-progress-normalization.js";
import { attachCodexMirrorIdentity } from "./transcript-mirror.js";
export type CodexAppServerToolTelemetry = {
@@ -396,6 +401,7 @@ export class CodexAppServerEventProjector {
});
}
this.emitStandardItemEvent({ phase: "start", item });
this.emitNormalizedToolItemEvent({ phase: "start", item });
this.emitToolResultSummary(item);
this.emitAgentEvent({
stream: "codex_app_server.item",
@@ -449,6 +455,7 @@ export class CodexAppServerEventProjector {
}
this.recordToolMeta(item);
this.emitStandardItemEvent({ phase: "end", item });
this.emitNormalizedToolItemEvent({ phase: "result", item });
this.emitToolResultSummary(item);
this.emitToolResultOutput(item);
this.emitAgentEvent({
@@ -656,6 +663,7 @@ export class CodexAppServerEventProjector {
return;
}
const meta = itemMeta(item, this.toolProgressDetailMode());
const suppressChannelProgress = shouldSuppressChannelProgressForItem(item);
this.emitAgentEvent({
stream: "item",
data: {
@@ -666,6 +674,42 @@ export class CodexAppServerEventProjector {
status: params.phase === "start" ? "running" : itemStatus(item),
...(itemName(item) ? { name: itemName(item) } : {}),
...(meta ? { meta } : {}),
...(suppressChannelProgress ? { suppressChannelProgress: true } : {}),
},
});
}
private emitNormalizedToolItemEvent(params: {
phase: "start" | "result";
item: CodexThreadItem | undefined;
}): void {
const { item } = params;
if (!item || !shouldSynthesizeToolProgressForItem(item)) {
return;
}
const name = itemName(item);
if (!name) {
return;
}
const meta = itemMeta(item, this.toolProgressDetailMode());
const args = params.phase === "start" ? itemToolArgs(item) : undefined;
const status = params.phase === "result" ? itemStatus(item) : "running";
this.emitAgentEvent({
stream: "tool",
data: {
phase: params.phase,
name,
itemId: item.id,
toolCallId: item.id,
...(meta ? { meta } : {}),
...(args ? { args } : {}),
...(params.phase === "result"
? {
status,
isError: isNonSuccessItemStatus(status),
...itemToolResult(item),
}
: {}),
},
});
}
@@ -743,7 +787,7 @@ export class CodexAppServerEventProjector {
}
private toolProgressDetailMode(): ToolProgressDetailMode {
return this.params.toolProgressDetail === "raw" ? "raw" : "explain";
return resolveCodexToolProgressDetailMode(this.params.toolProgressDetail);
}
private recordToolMeta(item: CodexThreadItem | undefined): void {
@@ -1074,17 +1118,24 @@ function itemTitle(item: CodexThreadItem): string {
}
}
function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" {
function itemStatus(item: CodexThreadItem): "completed" | "failed" | "running" | "blocked" {
const status = readItemString(item, "status");
if (status === "failed") {
return "failed";
}
if (status === "declined") {
return "blocked";
}
if (status === "inProgress" || status === "running") {
return "running";
}
return "completed";
}
function isNonSuccessItemStatus(status: ReturnType<typeof itemStatus>): boolean {
return status === "failed" || status === "blocked";
}
function itemName(item: CodexThreadItem): string | undefined {
if (item.type === "dynamicToolCall" && typeof item.tool === "string") {
return item.tool;
@@ -1105,6 +1156,78 @@ function itemName(item: CodexThreadItem): string | undefined {
return undefined;
}
function shouldSynthesizeToolProgressForItem(item: CodexThreadItem): boolean {
switch (item.type) {
case "commandExecution":
case "fileChange":
case "webSearch":
case "mcpToolCall":
return true;
default:
return false;
}
}
function shouldSuppressChannelProgressForItem(item: CodexThreadItem): boolean {
if (shouldSynthesizeToolProgressForItem(item)) {
return true;
}
// Dynamic OpenClaw tool requests are emitted at the item/tool/call request
// boundary in run-attempt.ts. Re-emitting item notifications to channels can
// duplicate start/result progress when the app-server sends both signals.
return item.type === "dynamicToolCall";
}
function itemToolArgs(item: CodexThreadItem): Record<string, unknown> | undefined {
if (item.type === "commandExecution") {
return sanitizeCodexAgentEventRecord({
command: item.command,
...(typeof item.cwd === "string" ? { cwd: item.cwd } : {}),
});
}
if (item.type === "webSearch" && typeof item.query === "string") {
return sanitizeCodexAgentEventRecord({ query: item.query });
}
if (item.type === "mcpToolCall") {
return sanitizeCodexToolArguments(item.arguments);
}
return undefined;
}
function itemToolResult(item: CodexThreadItem): { result?: Record<string, unknown> } {
if (item.type === "commandExecution") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
exitCode: item.exitCode,
durationMs: item.durationMs,
}),
};
}
if (item.type === "fileChange") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
changes: item.changes.map((change) => ({ path: change.path, kind: change.kind })),
}),
};
}
if (item.type === "mcpToolCall") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
durationMs: item.durationMs,
...(item.error ? { error: item.error } : {}),
...(item.result ? { result: item.result } : {}),
}),
};
}
if (item.type === "webSearch") {
return { result: sanitizeCodexAgentEventRecord({ status: "completed" }) };
}
return {};
}
function itemMeta(
item: CodexThreadItem,
detailMode: ToolProgressDetailMode = "explain",

View File

@@ -79,7 +79,7 @@ function useLightweightCodexRuntimePlan(params: EmbeddedRunAttemptParams): void
resolvedRef: `${params.provider}/${params.modelId}`,
harnessId: "codex",
},
} as NonNullable<EmbeddedRunAttemptParams["runtimePlan"]>;
} as unknown as NonNullable<EmbeddedRunAttemptParams["runtimePlan"]>;
}
function threadStartResult(threadId = "thread-1") {
@@ -183,6 +183,7 @@ function createAppServerHarness(
) {
const requests: Array<{ method: string; params: unknown }> = [];
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
let handleServerRequest: AppServerRequestHandler | undefined;
const request = vi.fn(async (method: string, params?: unknown) => {
requests.push({ method, params });
return requestImpl(method, params);
@@ -197,11 +198,22 @@ function createAppServerHarness(
notify = handler;
return () => undefined;
},
addRequestHandler: () => () => undefined,
addRequestHandler: (handler: AppServerRequestHandler) => {
handleServerRequest = handler;
return () => undefined;
},
} as never;
},
);
const waitForServerRequestHandler = async () => {
await vi.waitFor(() => expect(handleServerRequest).toBeTypeOf("function"), {
interval: 1,
timeout: 30_000,
});
return handleServerRequest!;
};
return {
request,
requests,
@@ -223,6 +235,11 @@ function createAppServerHarness(
async notify(notification: CodexServerNotification) {
await notify(notification);
},
waitForServerRequestHandler,
async handleServerRequest(request: Parameters<AppServerRequestHandler>[0]) {
const handler = await waitForServerRequestHandler();
return handler(request);
},
async completeTurn(params: { threadId: string; turnId: string }) {
await notify({
method: "turn/completed",
@@ -349,6 +366,12 @@ function createNamedDynamicTool(
};
}
type AppServerRequestHandler = (request: {
id: string | number;
method: string;
params?: unknown;
}) => Promise<unknown>;
function extractRelayIdFromThreadRequest(params: unknown): string {
const command = (
params as {
@@ -656,6 +679,93 @@ describe("runCodexAppServerAttempt", () => {
});
});
it("emits normalized tool progress around app-server dynamic tool requests", async () => {
const harness = createStartedThreadHarness();
const onRunAgentEvent = vi.fn();
const globalAgentEvents: AgentEventPayload[] = [];
onAgentEvent((event) => globalAgentEvents.push(event));
const params = createParams(
path.join(tempDir, "session.jsonl"),
path.join(tempDir, "workspace"),
);
params.onAgentEvent = onRunAgentEvent;
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
await expect(
harness.handleServerRequest({
id: "request-tool-1",
method: "item/tool/call",
params: {
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
namespace: null,
tool: "message",
arguments: {
action: "send",
token: "plain-secret-value-12345",
text: "hello",
},
},
}),
).resolves.toMatchObject({
success: false,
contentItems: [
{
type: "inputText",
text: expect.stringMatching(
/^(Unknown OpenClaw tool: message|Action send requires a target\.)$/u,
),
},
],
});
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event);
expect(agentEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({
phase: "start",
name: "message",
toolCallId: "call-1",
args: expect.objectContaining({
action: "send",
token: "plain-…2345",
text: "hello",
}),
}),
}),
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({
phase: "result",
name: "message",
toolCallId: "call-1",
isError: true,
result: expect.objectContaining({ success: false }),
}),
}),
]),
);
expect(JSON.stringify(agentEvents)).not.toContain("plain-secret-value-12345");
expect(globalAgentEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
runId: "run-1",
sessionKey: "agent:main:session-1",
stream: "tool",
data: expect.objectContaining({ phase: "start", name: "message" }),
}),
]),
);
});
it("releases the session when Codex never completes after a dynamic tool response", async () => {
let handleRequest:
| ((request: { id: string; method: string; params?: unknown }) => Promise<unknown>)

View File

@@ -98,6 +98,12 @@ import {
codexDynamicToolsFingerprint,
startOrResumeThread,
} from "./thread-lifecycle.js";
import {
inferCodexDynamicToolMeta,
resolveCodexToolProgressDetailMode,
sanitizeCodexToolArguments,
sanitizeCodexToolResponse,
} from "./tool-progress-normalization.js";
import {
createCodexTrajectoryRecorder,
normalizeCodexTrajectoryError,
@@ -973,6 +979,19 @@ export async function runCodexAppServerAttempt(
name: call.tool,
arguments: call.arguments,
});
const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail);
const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode);
const toolArgs = sanitizeCodexToolArguments(call.arguments);
emitCodexAppServerEvent(params, {
stream: "tool",
data: {
phase: "start",
name: call.tool,
toolCallId: call.callId,
...(toolMeta ? { meta: toolMeta } : {}),
...(toolArgs ? { args: toolArgs } : {}),
},
});
const response = await handleDynamicToolCallWithTimeout({
call,
toolBridge,
@@ -996,6 +1015,17 @@ export async function runCodexAppServerAttempt(
success: response.success,
contentItems: response.contentItems,
});
emitCodexAppServerEvent(params, {
stream: "tool",
data: {
phase: "result",
name: call.tool,
toolCallId: call.callId,
...(toolMeta ? { meta: toolMeta } : {}),
isError: !response.success,
result: sanitizeCodexToolResponse(response),
},
});
return response as JsonValue;
} finally {
activeAppServerTurnRequests = Math.max(0, activeAppServerTurnRequests - 1);

View File

@@ -0,0 +1,77 @@
import {
inferToolMetaFromArgs,
type EmbeddedRunAttemptParams,
type ToolProgressDetailMode,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import { redactSensitiveFieldValue, redactToolPayloadText } from "openclaw/plugin-sdk/text-runtime";
import {
isJsonObject,
type CodexDynamicToolCallParams,
type CodexDynamicToolCallResponse,
type JsonValue,
} from "./protocol.js";
export function resolveCodexToolProgressDetailMode(
value: EmbeddedRunAttemptParams["toolProgressDetail"],
): ToolProgressDetailMode {
return value === "raw" ? "raw" : "explain";
}
export function sanitizeCodexAgentEventValue(
value: unknown,
seen = new WeakSet<object>(),
): unknown {
if (typeof value === "string") {
return redactToolPayloadText(value);
}
if (Array.isArray(value)) {
if (seen.has(value)) {
return "[Circular]";
}
seen.add(value);
return value.map((entry) => sanitizeCodexAgentEventValue(entry, seen));
}
if (value && typeof value === "object") {
if (seen.has(value)) {
return "[Circular]";
}
seen.add(value);
const out: Record<string, unknown> = {};
for (const [key, child] of Object.entries(value as Record<string, unknown>)) {
out[key] =
typeof child === "string"
? redactSensitiveFieldValue(key, child)
: sanitizeCodexAgentEventValue(child, seen);
}
return out;
}
return value;
}
export function sanitizeCodexAgentEventRecord(
value: Record<string, unknown>,
): Record<string, unknown> {
return sanitizeCodexAgentEventValue(value) as Record<string, unknown>;
}
export function sanitizeCodexToolArguments(
value: JsonValue | undefined,
): Record<string, unknown> | undefined {
if (!isJsonObject(value)) {
return undefined;
}
return sanitizeCodexAgentEventRecord(value);
}
export function sanitizeCodexToolResponse(
response: CodexDynamicToolCallResponse,
): Record<string, unknown> {
return sanitizeCodexAgentEventRecord(response as unknown as Record<string, unknown>);
}
export function inferCodexDynamicToolMeta(
call: Pick<CodexDynamicToolCallParams, "tool" | "arguments">,
detailMode: ToolProgressDetailMode,
): string | undefined {
return inferToolMetaFromArgs(call.tool, call.arguments, { detailMode });
}

View File

@@ -2,7 +2,10 @@ import type { Bot } from "grammy";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { resolveAutoTopicLabelConfig as resolveAutoTopicLabelConfigRuntime } from "./auto-topic-label-config.js";
import type { TelegramBotDeps } from "./bot-deps.js";
import { createTestDraftStream } from "./draft-stream.test-helpers.js";
import {
createSequencedTestDraftStream,
createTestDraftStream,
} from "./draft-stream.test-helpers.js";
type DispatchReplyWithBufferedBlockDispatcherArgs = Parameters<
TelegramBotDeps["dispatchReplyWithBufferedBlockDispatcher"]
@@ -259,6 +262,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
});
const createDraftStream = (messageId?: number) => createTestDraftStream({ messageId });
const createSequencedDraftStream = (startMessageId = 1001) =>
createSequencedTestDraftStream(startMessageId);
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
@@ -998,6 +1003,94 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("shows Telegram progress drafts immediately for explicit tool starts", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(expect.stringMatching(/^Shelling\n`🛠️ Exec`$/));
expect(draftStream.flush).toHaveBeenCalled();
});
it("renders Telegram progress drafts before slow status reactions resolve", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
let releaseSetTool: (() => void) | undefined;
const statusReactionController = createStatusReactionController();
statusReactionController.setTool.mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseSetTool = resolve;
}),
);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await Promise.resolve();
await Promise.resolve();
const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0];
releaseSetTool?.();
await pendingToolStart;
expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠️ Exec`$/);
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext({
statusReactionController: statusReactionController as never,
}),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(statusReactionController.setTool).toHaveBeenCalledWith("exec");
});
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
await replyOptions?.onItemEvent?.({ progressText: "tests passed" });
await replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(
expect.stringMatching(/^Shelling\n`🔎 Web Search: docs lookup`\n• `tests passed`$/),
);
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.materialize).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [expect.objectContaining({ text: "Final after tool" })],
}),
);
expect(editMessageTelegram).not.toHaveBeenCalled();
});
it("falls back to normal send for error payloads and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {

View File

@@ -490,7 +490,10 @@ export const dispatchTelegramMessage = async ({
const progressDraftGate = createChannelProgressDraftGate({
onStart: () => renderProgressDraft({ flush: true }),
});
const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => {
const pushStreamToolProgress = async (
line?: string,
options?: { toolName?: string; startImmediately?: boolean },
) => {
if (!answerLane.stream) {
return;
}
@@ -529,6 +532,19 @@ export const dispatchTelegramMessage = async ({
);
}
}
if (
options?.startImmediately &&
streamToolProgressEnabled &&
!streamToolProgressSuppressed &&
normalized
) {
const alreadyStarted = progressDraftGate.hasStarted;
await progressDraftGate.startNow();
if (alreadyStarted && progressDraftGate.hasStarted) {
await renderProgressDraft();
}
return;
}
const alreadyStarted = progressDraftGate.hasStarted;
await progressDraftGate.noteWork();
if (alreadyStarted && progressDraftGate.hasStarted) {
@@ -1189,12 +1205,10 @@ export const dispatchTelegramMessage = async ({
: undefined,
suppressDefaultToolProgressMessages:
!streamDeliveryEnabled || Boolean(answerLane.stream),
allowProgressCallbacksWhenSourceDeliverySuppressed: Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
await pushStreamToolProgress(
const progressPromise = pushStreamToolProgress(
formatChannelProgressDraftLineForEntry(
telegramCfg,
{
@@ -1205,8 +1219,12 @@ export const dispatchTelegramMessage = async ({
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName },
{ toolName, startImmediately: true },
);
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
await progressPromise;
},
onItemEvent: async (payload) => {
await pushStreamToolProgress(

View File

@@ -163,6 +163,8 @@ export type GetReplyOptions = {
* output private; visible channel output must come from the message tool.
*/
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
/** Allow channel-owned progress UI while final/source reply delivery remains message-tool-only. */
allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean;
disableBlockStreaming?: boolean;
/** Timeout for block reply delivery (ms). */
blockReplyTimeoutMs?: number;

View File

@@ -1145,6 +1145,103 @@ describe("runAgentTurnWithFallback", () => {
});
});
it("skips channel item progress when a matching tool event carries the progress", async () => {
const onItemEvent = vi.fn();
const onToolStart = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "item",
data: {
itemId: "cmd-1",
kind: "command",
title: "Command",
name: "bash",
phase: "start",
status: "running",
suppressChannelProgress: true,
},
});
await params.onAgentEvent?.({
stream: "tool",
data: {
itemId: "cmd-1",
toolCallId: "cmd-1",
name: "bash",
phase: "start",
args: { command: "pnpm test" },
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({
opts: {
onItemEvent,
onToolStart,
} satisfies GetReplyOptions,
}),
});
expect(result.kind).toBe("success");
expect(onItemEvent).not.toHaveBeenCalled();
expect(onToolStart).toHaveBeenCalledWith({
name: "bash",
phase: "start",
args: { command: "pnpm test" },
detailMode: undefined,
});
});
it("preserves suppressed item progress when no tool-start callback is registered", async () => {
const onItemEvent = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
await params.onAgentEvent?.({
stream: "item",
data: {
itemId: "cmd-1",
kind: "command",
title: "Command",
name: "bash",
phase: "start",
status: "running",
suppressChannelProgress: true,
},
});
await params.onAgentEvent?.({
stream: "tool",
data: {
itemId: "cmd-1",
toolCallId: "cmd-1",
name: "bash",
phase: "start",
args: { command: "pnpm test" },
},
});
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({
opts: {
onItemEvent,
} satisfies GetReplyOptions,
}),
});
expect(result.kind).toBe("success");
expect(onItemEvent).toHaveBeenCalledWith({
itemId: "cmd-1",
kind: "command",
title: "Command",
name: "bash",
phase: "start",
status: "running",
});
});
it("forwards raw tool progress detail mode to tool-start reply options", async () => {
const onToolStart = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
@@ -1178,6 +1275,54 @@ describe("runAgentTurnWithFallback", () => {
});
});
it("fires tool-start progress before slow typing signals resolve for best-effort Pi events", async () => {
const onToolStart = vi.fn(async () => {});
let releaseTyping: (() => void) | undefined;
const typingSignals = createMockTypingSignaler();
vi.mocked(typingSignals.signalToolStart).mockImplementation(
() =>
new Promise<void>((resolve) => {
releaseTyping = resolve;
}),
);
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
void params.onAgentEvent?.({
stream: "tool",
data: {
name: "exec",
phase: "start",
args: { command: "echo hi" },
},
});
await Promise.resolve();
await Promise.resolve();
return { payloads: [{ text: "final" }], meta: {} };
});
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
...createMinimalRunAgentTurnParams({
opts: {
onToolStart,
} satisfies GetReplyOptions,
}),
typingSignals,
});
try {
expect(result.kind).toBe("success");
expect(onToolStart).toHaveBeenCalledWith({
name: "exec",
phase: "start",
args: { command: "echo hi" },
detailMode: undefined,
});
} finally {
releaseTyping?.();
await Promise.resolve();
}
});
it("leaves Codex app-server telemetry publication to the harness", async () => {
const agentEvents = await import("../../infra/agent-events.js");
const emitAgentEvent = vi.mocked(agentEvents.emitAgentEvent);

View File

@@ -1660,8 +1660,7 @@ export async function runAgentTurnWithFallback(params: {
const phase = readStringValue(evt.data.phase) ?? "";
const name = readStringValue(evt.data.name);
if (phase === "start" || phase === "update") {
await params.typingSignals.signalToolStart();
await params.opts?.onToolStart?.({
const toolStartProgressPromise = params.opts?.onToolStart?.({
name,
phase,
args:
@@ -1670,9 +1669,17 @@ export async function runAgentTurnWithFallback(params: {
: undefined,
detailMode: params.toolProgressDetail,
});
await Promise.all([
params.typingSignals.signalToolStart(),
toolStartProgressPromise,
]);
}
}
if (evt.stream === "item") {
const suppressItemChannelProgress =
evt.stream === "item" &&
evt.data.suppressChannelProgress === true &&
Boolean(params.opts?.onToolStart);
if (evt.stream === "item" && !suppressItemChannelProgress) {
await params.opts?.onItemEvent?.({
itemId: readStringValue(evt.data.itemId),
kind: readStringValue(evt.data.kind),

View File

@@ -3099,7 +3099,7 @@ describe("dispatchReplyFromConfig", () => {
});
});
it("keeps diagnostic progress when source progress callbacks are suppressed", async () => {
it("forwards non-answer progress callbacks when source replies are suppressed", async () => {
setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
const dispatcher = createDispatcher();
@@ -3131,6 +3131,7 @@ describe("dispatchReplyFromConfig", () => {
dispatcher,
replyOptions: {
sourceReplyDeliveryMode: "message_tool_only",
allowProgressCallbacksWhenSourceDeliverySuppressed: true,
onToolStart: callbacks.toolStart,
onItemEvent: callbacks.itemEvent,
onCommandOutput: callbacks.commandOutput,
@@ -3138,9 +3139,9 @@ describe("dispatchReplyFromConfig", () => {
replyResolver,
});
expect(callbacks.toolStart).not.toHaveBeenCalled();
expect(callbacks.itemEvent).not.toHaveBeenCalled();
expect(callbacks.commandOutput).not.toHaveBeenCalled();
expect(callbacks.toolStart).toHaveBeenCalledTimes(1);
expect(callbacks.itemEvent).toHaveBeenCalledTimes(1);
expect(callbacks.commandOutput).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledTimes(3);
expect(diagnosticMocks.markDiagnosticSessionProgress).toHaveBeenCalledWith({
sessionKey: "agent:main:discord:channel:C1",

View File

@@ -1243,15 +1243,24 @@ export async function dispatchReplyFromConfig(
const onPlanUpdateFromReplyOptions = params.replyOptions?.onPlanUpdate;
const onApprovalEventFromReplyOptions = params.replyOptions?.onApprovalEvent;
const onPatchSummaryFromReplyOptions = params.replyOptions?.onPatchSummary;
const allowSuppressedSourceProgressCallbacks =
params.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed === true;
const shouldForwardProgressCallback = (options?: {
forwardWhenSourceDeliverySuppressed?: boolean;
}) =>
!suppressAutomaticSourceDelivery ||
(allowSuppressedSourceProgressCallbacks &&
options?.forwardWhenSourceDeliverySuppressed === true);
const wrapProgressCallback = <Args extends unknown[]>(
callback: ((...args: Args) => Promise<void> | void) | undefined,
options?: { forwardWhenSourceDeliverySuppressed?: boolean },
): ((...args: Args) => Promise<void>) | undefined => {
if (!callback && (!suppressAutomaticSourceDelivery || !canTrackSession)) {
return undefined;
}
return async (...args: Args) => {
markProgress();
if (!suppressAutomaticSourceDelivery) {
if (shouldForwardProgressCallback(options)) {
await callback?.(...args);
}
};
@@ -1274,11 +1283,21 @@ export async function dispatchReplyFromConfig(
onReasoningEnd: wrapProgressCallback(params.replyOptions?.onReasoningEnd),
onAssistantMessageStart: wrapProgressCallback(params.replyOptions?.onAssistantMessageStart),
onBlockReplyQueued: wrapProgressCallback(params.replyOptions?.onBlockReplyQueued),
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart),
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent),
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput),
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart),
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd),
onToolStart: wrapProgressCallback(params.replyOptions?.onToolStart, {
forwardWhenSourceDeliverySuppressed: true,
}),
onItemEvent: wrapProgressCallback(params.replyOptions?.onItemEvent, {
forwardWhenSourceDeliverySuppressed: true,
}),
onCommandOutput: wrapProgressCallback(params.replyOptions?.onCommandOutput, {
forwardWhenSourceDeliverySuppressed: true,
}),
onCompactionStart: wrapProgressCallback(params.replyOptions?.onCompactionStart, {
forwardWhenSourceDeliverySuppressed: true,
}),
onCompactionEnd: wrapProgressCallback(params.replyOptions?.onCompactionEnd, {
forwardWhenSourceDeliverySuppressed: true,
}),
onToolResult: (payload: ReplyPayload) => {
markProgress();
const run = async () => {
@@ -1330,7 +1349,7 @@ export async function dispatchReplyFromConfig(
onPlanUpdate: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) {
if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) {
await onPlanUpdateFromReplyOptions?.(payload);
}
if (payload.phase !== "update" || shouldSuppressDefaultToolProgressMessages()) {
@@ -1341,7 +1360,7 @@ export async function dispatchReplyFromConfig(
onApprovalEvent: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) {
if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) {
await onApprovalEventFromReplyOptions?.(payload);
}
if (payload.phase !== "requested" || shouldSuppressDefaultToolProgressMessages()) {
@@ -1360,7 +1379,7 @@ export async function dispatchReplyFromConfig(
onPatchSummary: async (payload) => {
markProgress();
markInboundDedupeReplayUnsafe();
if (!suppressAutomaticSourceDelivery) {
if (shouldForwardProgressCallback({ forwardWhenSourceDeliverySuppressed: true })) {
await onPatchSummaryFromReplyOptions?.(payload);
}
if (payload.phase !== "end" || shouldSuppressDefaultToolProgressMessages()) {

View File

@@ -40,6 +40,8 @@ export type AgentItemEventData = {
error?: string;
summary?: string;
progressText?: string;
/** Preserve item telemetry while letting channel progress render a sibling tool event instead. */
suppressChannelProgress?: boolean;
approvalId?: string;
approvalSlug?: string;
};