diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index 0ad6ff06c53..d3d6747ed3e 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -660,6 +660,112 @@ describe("CodexAppServerEventProjector", () => { expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 }); }); + it("synthesizes normalized tool progress for Codex-native tool items", async () => { + const onAgentEvent = vi.fn(); + const projector = await createProjector({ ...(await createParams()), onAgentEvent }); + + await projector.handleNotification( + forCurrentTurn("item/started", { + item: { + type: "commandExecution", + id: "cmd-1", + command: "pnpm test extensions/codex", + cwd: "/workspace", + processId: null, + source: "agent", + status: "inProgress", + commandActions: [], + aggregatedOutput: null, + exitCode: null, + durationMs: null, + }, + }), + ); + await projector.handleNotification( + forCurrentTurn("item/completed", { + item: { + type: "commandExecution", + id: "cmd-1", + command: "pnpm test extensions/codex", + cwd: "/workspace", + processId: null, + source: "agent", + status: "completed", + commandActions: [], + aggregatedOutput: "ok", + exitCode: 0, + durationMs: 42, + }, + }), + ); + + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "item", + data: expect.objectContaining({ + phase: "start", + kind: "command", + name: "bash", + itemId: "cmd-1", + }), + }); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "tool", + data: expect.objectContaining({ + phase: "start", + name: "bash", + itemId: "cmd-1", + toolCallId: "cmd-1", + args: { command: "pnpm test extensions/codex", cwd: "/workspace" }, + }), + }); + expect(onAgentEvent).toHaveBeenCalledWith({ + stream: "tool", + data: expect.objectContaining({ + phase: "result", + name: "bash", + itemId: "cmd-1", + toolCallId: "cmd-1", + status: "completed", + isError: false, + 
result: expect.objectContaining({ exitCode: 0, durationMs: 42 }), + }), + }); + }); + + it("leaves Codex dynamic tool item progress to item/tool/call normalization", async () => { + const onAgentEvent = vi.fn(); + const projector = await createProjector({ ...(await createParams()), onAgentEvent }); + + await projector.handleNotification( + forCurrentTurn("item/started", { + item: { + type: "dynamicToolCall", + id: "call-1", + namespace: null, + tool: "message", + arguments: { action: "send" }, + status: "inProgress", + contentItems: null, + success: null, + durationMs: null, + }, + }), + ); + + expect(onAgentEvent).toHaveBeenCalledWith( + expect.objectContaining({ + stream: "item", + data: expect.objectContaining({ phase: "start", kind: "tool", name: "message" }), + }), + ); + expect(onAgentEvent).not.toHaveBeenCalledWith( + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ phase: "start", name: "message" }), + }), + ); + }); + it("emits verbose tool summaries through onToolResult", async () => { const onToolResult = vi.fn(); const projector = await createProjector({ diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index 2f9752c6788..c5e2886bcea 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -30,6 +30,11 @@ import { import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js"; import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js"; import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; +import { + resolveCodexToolProgressDetailMode, + sanitizeCodexAgentEventRecord, + sanitizeCodexToolArguments, +} from "./tool-progress-normalization.js"; import { attachCodexMirrorIdentity } from "./transcript-mirror.js"; export type CodexAppServerToolTelemetry = { @@ -396,6 +401,7 @@ export class CodexAppServerEventProjector { }); } 
this.emitStandardItemEvent({ phase: "start", item }); + this.emitNormalizedToolItemEvent({ phase: "start", item }); this.emitToolResultSummary(item); this.emitAgentEvent({ stream: "codex_app_server.item", @@ -449,6 +455,7 @@ export class CodexAppServerEventProjector { } this.recordToolMeta(item); this.emitStandardItemEvent({ phase: "end", item }); + this.emitNormalizedToolItemEvent({ phase: "result", item }); this.emitToolResultSummary(item); this.emitToolResultOutput(item); this.emitAgentEvent({ @@ -670,6 +677,41 @@ export class CodexAppServerEventProjector { }); } + private emitNormalizedToolItemEvent(params: { + phase: "start" | "result"; + item: CodexThreadItem | undefined; + }): void { + const { item } = params; + if (!item || !shouldSynthesizeToolProgressForItem(item)) { + return; + } + const name = itemName(item); + if (!name) { + return; + } + const meta = itemMeta(item, this.toolProgressDetailMode()); + const args = params.phase === "start" ? itemToolArgs(item) : undefined; + const status = params.phase === "result" ? itemStatus(item) : "running"; + this.emitAgentEvent({ + stream: "tool", + data: { + phase: params.phase, + name, + itemId: item.id, + toolCallId: item.id, + ...(meta ? { meta } : {}), + ...(args ? { args } : {}), + ...(params.phase === "result" + ? { + status, + isError: status === "failed", + ...itemToolResult(item), + } + : {}), + }, + }); + } + private emitToolResultSummary(item: CodexThreadItem | undefined): void { if (!item || !this.params.onToolResult || !this.shouldEmitToolResult()) { return; @@ -743,7 +785,7 @@ export class CodexAppServerEventProjector { } private toolProgressDetailMode(): ToolProgressDetailMode { - return this.params.toolProgressDetail === "raw" ? 
"raw" : "explain"; + return resolveCodexToolProgressDetailMode(this.params.toolProgressDetail); } private recordToolMeta(item: CodexThreadItem | undefined): void { @@ -1105,6 +1147,73 @@ function itemName(item: CodexThreadItem): string | undefined { return undefined; } +function shouldSynthesizeToolProgressForItem(item: CodexThreadItem): boolean { + switch (item.type) { + case "commandExecution": + case "fileChange": + case "webSearch": + case "mcpToolCall": + return true; + // Dynamic OpenClaw tool requests are emitted at the item/tool/call request + // boundary in run-attempt.ts. Re-emitting them from item notifications can + // duplicate start/result events when the app-server sends both signals. + case "dynamicToolCall": + return false; + default: + return false; + } +} + +function itemToolArgs(item: CodexThreadItem): Record | undefined { + if (item.type === "commandExecution") { + return sanitizeCodexAgentEventRecord({ + command: item.command, + ...(typeof item.cwd === "string" ? { cwd: item.cwd } : {}), + }); + } + if (item.type === "webSearch" && typeof item.query === "string") { + return sanitizeCodexAgentEventRecord({ query: item.query }); + } + if (item.type === "mcpToolCall") { + return sanitizeCodexToolArguments(item.arguments); + } + return undefined; +} + +function itemToolResult(item: CodexThreadItem): { result?: Record } { + if (item.type === "commandExecution") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + exitCode: item.exitCode, + durationMs: item.durationMs, + }), + }; + } + if (item.type === "fileChange") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + changes: item.changes.map((change) => ({ path: change.path, kind: change.kind })), + }), + }; + } + if (item.type === "mcpToolCall") { + return { + result: sanitizeCodexAgentEventRecord({ + status: item.status, + durationMs: item.durationMs, + ...(item.error ? { error: item.error } : {}), + ...(item.result ? 
{ result: item.result } : {}), + }), + }; + } + if (item.type === "webSearch") { + return { result: sanitizeCodexAgentEventRecord({ status: "completed" }) }; + } + return {}; +} + function itemMeta( item: CodexThreadItem, detailMode: ToolProgressDetailMode = "explain", diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index f67f3991854..488538c8e7e 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -183,6 +183,7 @@ function createAppServerHarness( ) { const requests: Array<{ method: string; params: unknown }> = []; let notify: (notification: CodexServerNotification) => Promise = async () => undefined; + let handleServerRequest: AppServerRequestHandler | undefined; const request = vi.fn(async (method: string, params?: unknown) => { requests.push({ method, params }); return requestImpl(method, params); @@ -197,11 +198,22 @@ function createAppServerHarness( notify = handler; return () => undefined; }, - addRequestHandler: () => () => undefined, + addRequestHandler: (handler: AppServerRequestHandler) => { + handleServerRequest = handler; + return () => undefined; + }, } as never; }, ); + const waitForServerRequestHandler = async () => { + await vi.waitFor(() => expect(handleServerRequest).toBeTypeOf("function"), { + interval: 1, + timeout: 30_000, + }); + return handleServerRequest!; + }; + return { request, requests, @@ -223,6 +235,11 @@ function createAppServerHarness( async notify(notification: CodexServerNotification) { await notify(notification); }, + waitForServerRequestHandler, + async handleServerRequest(request: Parameters[0]) { + const handler = await waitForServerRequestHandler(); + return handler(request); + }, async completeTurn(params: { threadId: string; turnId: string }) { await notify({ method: "turn/completed", @@ -349,6 +366,12 @@ function createNamedDynamicTool( }; } +type AppServerRequestHandler = (request: 
{ + id: string | number; + method: string; + params?: unknown; +}) => Promise; + function extractRelayIdFromThreadRequest(params: unknown): string { const command = ( params as { @@ -656,6 +679,93 @@ describe("runCodexAppServerAttempt", () => { }); }); + it("emits normalized tool progress around app-server dynamic tool requests", async () => { + const harness = createStartedThreadHarness(); + const onRunAgentEvent = vi.fn(); + const globalAgentEvents: AgentEventPayload[] = []; + onAgentEvent((event) => globalAgentEvents.push(event)); + const params = createParams( + path.join(tempDir, "session.jsonl"), + path.join(tempDir, "workspace"), + ); + params.onAgentEvent = onRunAgentEvent; + + const run = runCodexAppServerAttempt(params); + await harness.waitForMethod("turn/start"); + + await expect( + harness.handleServerRequest({ + id: "request-tool-1", + method: "item/tool/call", + params: { + threadId: "thread-1", + turnId: "turn-1", + callId: "call-1", + namespace: null, + tool: "message", + arguments: { + action: "send", + token: "plain-secret-value-12345", + text: "hello", + }, + }, + }), + ).resolves.toMatchObject({ + success: false, + contentItems: [ + { + type: "inputText", + text: expect.stringMatching( + /^(Unknown OpenClaw tool: message|Action send requires a target\.)$/u, + ), + }, + ], + }); + + await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" }); + await run; + + const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event); + expect(agentEvents).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ + phase: "start", + name: "message", + toolCallId: "call-1", + args: expect.objectContaining({ + action: "send", + token: "plain-…2345", + text: "hello", + }), + }), + }), + expect.objectContaining({ + stream: "tool", + data: expect.objectContaining({ + phase: "result", + name: "message", + toolCallId: "call-1", + isError: true, + result: expect.objectContaining({ success: 
false }), + }), + }), + ]), + ); + expect(JSON.stringify(agentEvents)).not.toContain("plain-secret-value-12345"); + expect(globalAgentEvents).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + runId: "run-1", + sessionKey: "agent:main:session-1", + stream: "tool", + data: expect.objectContaining({ phase: "start", name: "message" }), + }), + ]), + ); + }); + it("releases the session when Codex never completes after a dynamic tool response", async () => { let handleRequest: | ((request: { id: string; method: string; params?: unknown }) => Promise) diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 8556a823f9a..200d6070610 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -98,6 +98,12 @@ import { codexDynamicToolsFingerprint, startOrResumeThread, } from "./thread-lifecycle.js"; +import { + inferCodexDynamicToolMeta, + resolveCodexToolProgressDetailMode, + sanitizeCodexToolArguments, + sanitizeCodexToolResponse, +} from "./tool-progress-normalization.js"; import { createCodexTrajectoryRecorder, normalizeCodexTrajectoryError, @@ -973,6 +979,19 @@ export async function runCodexAppServerAttempt( name: call.tool, arguments: call.arguments, }); + const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail); + const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode); + const toolArgs = sanitizeCodexToolArguments(call.arguments); + emitCodexAppServerEvent(params, { + stream: "tool", + data: { + phase: "start", + name: call.tool, + toolCallId: call.callId, + ...(toolMeta ? { meta: toolMeta } : {}), + ...(toolArgs ? 
{ args: toolArgs } : {}), + }, + }); const response = await handleDynamicToolCallWithTimeout({ call, toolBridge, @@ -996,6 +1015,17 @@ export async function runCodexAppServerAttempt( success: response.success, contentItems: response.contentItems, }); + emitCodexAppServerEvent(params, { + stream: "tool", + data: { + phase: "result", + name: call.tool, + toolCallId: call.callId, + ...(toolMeta ? { meta: toolMeta } : {}), + isError: !response.success, + result: sanitizeCodexToolResponse(response), + }, + }); return response as JsonValue; } finally { activeAppServerTurnRequests = Math.max(0, activeAppServerTurnRequests - 1); diff --git a/extensions/codex/src/app-server/tool-progress-normalization.ts b/extensions/codex/src/app-server/tool-progress-normalization.ts new file mode 100644 index 00000000000..be09c380a0c --- /dev/null +++ b/extensions/codex/src/app-server/tool-progress-normalization.ts @@ -0,0 +1,77 @@ +import { + inferToolMetaFromArgs, + type EmbeddedRunAttemptParams, + type ToolProgressDetailMode, +} from "openclaw/plugin-sdk/agent-harness-runtime"; +import { redactSensitiveFieldValue, redactToolPayloadText } from "openclaw/plugin-sdk/text-runtime"; +import { + isJsonObject, + type CodexDynamicToolCallParams, + type CodexDynamicToolCallResponse, + type JsonValue, +} from "./protocol.js"; + +export function resolveCodexToolProgressDetailMode( + value: EmbeddedRunAttemptParams["toolProgressDetail"], +): ToolProgressDetailMode { + return value === "raw" ? 
"raw" : "explain"; +} + +export function sanitizeCodexAgentEventValue( + value: unknown, + seen = new WeakSet(), +): unknown { + if (typeof value === "string") { + return redactToolPayloadText(value); + } + if (Array.isArray(value)) { + if (seen.has(value)) { + return "[Circular]"; + } + seen.add(value); + return value.map((entry) => sanitizeCodexAgentEventValue(entry, seen)); + } + if (value && typeof value === "object") { + if (seen.has(value)) { + return "[Circular]"; + } + seen.add(value); + const out: Record = {}; + for (const [key, child] of Object.entries(value as Record)) { + out[key] = + typeof child === "string" + ? redactSensitiveFieldValue(key, child) + : sanitizeCodexAgentEventValue(child, seen); + } + return out; + } + return value; +} + +export function sanitizeCodexAgentEventRecord( + value: Record, +): Record { + return sanitizeCodexAgentEventValue(value) as Record; +} + +export function sanitizeCodexToolArguments( + value: JsonValue | undefined, +): Record | undefined { + if (!isJsonObject(value)) { + return undefined; + } + return sanitizeCodexAgentEventRecord(value); +} + +export function sanitizeCodexToolResponse( + response: CodexDynamicToolCallResponse, +): Record { + return sanitizeCodexAgentEventRecord(response as unknown as Record); +} + +export function inferCodexDynamicToolMeta( + call: Pick, + detailMode: ToolProgressDetailMode, +): string | undefined { + return inferToolMetaFromArgs(call.tool, call.arguments, { detailMode }); +} diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 7f4baa217aa..a9698616c8b 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -998,6 +998,61 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("shows Telegram progress drafts immediately for explicit tool starts", async () => { + const draftStream = createSequencedDraftStream(2001); + 
createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + return { queuedFinal: false }; + }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith(expect.stringMatching(/^Shelling\n`🛠️ Exec`$/)); + expect(draftStream.flush).toHaveBeenCalled(); + }); + + it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" }); + await replyOptions?.onItemEvent?.({ progressText: "tests passed" }); + await replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith( + expect.stringMatching(/^Shelling\n`🔎 Web Search: docs lookup`\n• `tests passed`$/), + ); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.materialize).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 2001, + "Final after tool", 
expect.any(Object), + ); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + it("falls back to normal send for error payloads and clears the pending stream", async () => { const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index f227bb19fed..d537d1f945c 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -490,7 +490,10 @@ export const dispatchTelegramMessage = async ({ const progressDraftGate = createChannelProgressDraftGate({ onStart: () => renderProgressDraft({ flush: true }), }); - const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => { + const pushStreamToolProgress = async ( + line?: string, + options?: { toolName?: string; startImmediately?: boolean }, + ) => { if (!answerLane.stream) { return; } @@ -529,6 +532,19 @@ export const dispatchTelegramMessage = async ({ ); } } + if ( + options?.startImmediately && + streamToolProgressEnabled && + !streamToolProgressSuppressed && + normalized + ) { + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.startNow(); + if (alreadyStarted && progressDraftGate.hasStarted) { + await renderProgressDraft(); + } + return; + } const alreadyStarted = progressDraftGate.hasStarted; await progressDraftGate.noteWork(); if (alreadyStarted && progressDraftGate.hasStarted) { @@ -1205,7 +1221,7 @@ export const dispatchTelegramMessage = async ({ }, payload.detailMode ? { detailMode: payload.detailMode } : undefined, ), - { toolName }, + { toolName, startImmediately: true }, ); }, onItemEvent: async (payload) => {