Show Codex tool progress in channel drafts

This commit is contained in:
Kelaw - Keshav's Agent
2026-05-05 22:15:42 +05:30
committed by Peter Steinberger
parent 13504f693d
commit e483115131
7 changed files with 507 additions and 4 deletions

View File

@@ -660,6 +660,112 @@ describe("CodexAppServerEventProjector", () => {
expect(result.itemLifecycle).toMatchObject({ compactionCount: 1 });
});
it("synthesizes normalized tool progress for Codex-native tool items", async () => {
const onAgentEvent = vi.fn();
const projector = await createProjector({ ...(await createParams()), onAgentEvent });
await projector.handleNotification(
forCurrentTurn("item/started", {
item: {
type: "commandExecution",
id: "cmd-1",
command: "pnpm test extensions/codex",
cwd: "/workspace",
processId: null,
source: "agent",
status: "inProgress",
commandActions: [],
aggregatedOutput: null,
exitCode: null,
durationMs: null,
},
}),
);
await projector.handleNotification(
forCurrentTurn("item/completed", {
item: {
type: "commandExecution",
id: "cmd-1",
command: "pnpm test extensions/codex",
cwd: "/workspace",
processId: null,
source: "agent",
status: "completed",
commandActions: [],
aggregatedOutput: "ok",
exitCode: 0,
durationMs: 42,
},
}),
);
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "item",
data: expect.objectContaining({
phase: "start",
kind: "command",
name: "bash",
itemId: "cmd-1",
}),
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
data: expect.objectContaining({
phase: "start",
name: "bash",
itemId: "cmd-1",
toolCallId: "cmd-1",
args: { command: "pnpm test extensions/codex", cwd: "/workspace" },
}),
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "tool",
data: expect.objectContaining({
phase: "result",
name: "bash",
itemId: "cmd-1",
toolCallId: "cmd-1",
status: "completed",
isError: false,
result: expect.objectContaining({ exitCode: 0, durationMs: 42 }),
}),
});
});
it("leaves Codex dynamic tool item progress to item/tool/call normalization", async () => {
const onAgentEvent = vi.fn();
const projector = await createProjector({ ...(await createParams()), onAgentEvent });
await projector.handleNotification(
forCurrentTurn("item/started", {
item: {
type: "dynamicToolCall",
id: "call-1",
namespace: null,
tool: "message",
arguments: { action: "send" },
status: "inProgress",
contentItems: null,
success: null,
durationMs: null,
},
}),
);
expect(onAgentEvent).toHaveBeenCalledWith(
expect.objectContaining({
stream: "item",
data: expect.objectContaining({ phase: "start", kind: "tool", name: "message" }),
}),
);
expect(onAgentEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({ phase: "start", name: "message" }),
}),
);
});
it("emits verbose tool summaries through onToolResult", async () => {
const onToolResult = vi.fn();
const projector = await createProjector({

View File

@@ -30,6 +30,11 @@ import {
import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js";
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
import {
resolveCodexToolProgressDetailMode,
sanitizeCodexAgentEventRecord,
sanitizeCodexToolArguments,
} from "./tool-progress-normalization.js";
import { attachCodexMirrorIdentity } from "./transcript-mirror.js";
export type CodexAppServerToolTelemetry = {
@@ -396,6 +401,7 @@ export class CodexAppServerEventProjector {
});
}
this.emitStandardItemEvent({ phase: "start", item });
this.emitNormalizedToolItemEvent({ phase: "start", item });
this.emitToolResultSummary(item);
this.emitAgentEvent({
stream: "codex_app_server.item",
@@ -449,6 +455,7 @@ export class CodexAppServerEventProjector {
}
this.recordToolMeta(item);
this.emitStandardItemEvent({ phase: "end", item });
this.emitNormalizedToolItemEvent({ phase: "result", item });
this.emitToolResultSummary(item);
this.emitToolResultOutput(item);
this.emitAgentEvent({
@@ -670,6 +677,41 @@ export class CodexAppServerEventProjector {
});
}
private emitNormalizedToolItemEvent(params: {
phase: "start" | "result";
item: CodexThreadItem | undefined;
}): void {
const { item } = params;
if (!item || !shouldSynthesizeToolProgressForItem(item)) {
return;
}
const name = itemName(item);
if (!name) {
return;
}
const meta = itemMeta(item, this.toolProgressDetailMode());
const args = params.phase === "start" ? itemToolArgs(item) : undefined;
const status = params.phase === "result" ? itemStatus(item) : "running";
this.emitAgentEvent({
stream: "tool",
data: {
phase: params.phase,
name,
itemId: item.id,
toolCallId: item.id,
...(meta ? { meta } : {}),
...(args ? { args } : {}),
...(params.phase === "result"
? {
status,
isError: status === "failed",
...itemToolResult(item),
}
: {}),
},
});
}
private emitToolResultSummary(item: CodexThreadItem | undefined): void {
if (!item || !this.params.onToolResult || !this.shouldEmitToolResult()) {
return;
@@ -743,7 +785,7 @@ export class CodexAppServerEventProjector {
}
private toolProgressDetailMode(): ToolProgressDetailMode {
return this.params.toolProgressDetail === "raw" ? "raw" : "explain";
return resolveCodexToolProgressDetailMode(this.params.toolProgressDetail);
}
private recordToolMeta(item: CodexThreadItem | undefined): void {
@@ -1105,6 +1147,73 @@ function itemName(item: CodexThreadItem): string | undefined {
return undefined;
}
function shouldSynthesizeToolProgressForItem(item: CodexThreadItem): boolean {
switch (item.type) {
case "commandExecution":
case "fileChange":
case "webSearch":
case "mcpToolCall":
return true;
// Dynamic OpenClaw tool requests are emitted at the item/tool/call request
// boundary in run-attempt.ts. Re-emitting them from item notifications can
// duplicate start/result events when the app-server sends both signals.
case "dynamicToolCall":
return false;
default:
return false;
}
}
function itemToolArgs(item: CodexThreadItem): Record<string, unknown> | undefined {
if (item.type === "commandExecution") {
return sanitizeCodexAgentEventRecord({
command: item.command,
...(typeof item.cwd === "string" ? { cwd: item.cwd } : {}),
});
}
if (item.type === "webSearch" && typeof item.query === "string") {
return sanitizeCodexAgentEventRecord({ query: item.query });
}
if (item.type === "mcpToolCall") {
return sanitizeCodexToolArguments(item.arguments);
}
return undefined;
}
function itemToolResult(item: CodexThreadItem): { result?: Record<string, unknown> } {
if (item.type === "commandExecution") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
exitCode: item.exitCode,
durationMs: item.durationMs,
}),
};
}
if (item.type === "fileChange") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
changes: item.changes.map((change) => ({ path: change.path, kind: change.kind })),
}),
};
}
if (item.type === "mcpToolCall") {
return {
result: sanitizeCodexAgentEventRecord({
status: item.status,
durationMs: item.durationMs,
...(item.error ? { error: item.error } : {}),
...(item.result ? { result: item.result } : {}),
}),
};
}
if (item.type === "webSearch") {
return { result: sanitizeCodexAgentEventRecord({ status: "completed" }) };
}
return {};
}
function itemMeta(
item: CodexThreadItem,
detailMode: ToolProgressDetailMode = "explain",

View File

@@ -183,6 +183,7 @@ function createAppServerHarness(
) {
const requests: Array<{ method: string; params: unknown }> = [];
let notify: (notification: CodexServerNotification) => Promise<void> = async () => undefined;
let handleServerRequest: AppServerRequestHandler | undefined;
const request = vi.fn(async (method: string, params?: unknown) => {
requests.push({ method, params });
return requestImpl(method, params);
@@ -197,11 +198,22 @@ function createAppServerHarness(
notify = handler;
return () => undefined;
},
addRequestHandler: () => () => undefined,
addRequestHandler: (handler: AppServerRequestHandler) => {
handleServerRequest = handler;
return () => undefined;
},
} as never;
},
);
const waitForServerRequestHandler = async () => {
await vi.waitFor(() => expect(handleServerRequest).toBeTypeOf("function"), {
interval: 1,
timeout: 30_000,
});
return handleServerRequest!;
};
return {
request,
requests,
@@ -223,6 +235,11 @@ function createAppServerHarness(
async notify(notification: CodexServerNotification) {
await notify(notification);
},
waitForServerRequestHandler,
async handleServerRequest(request: Parameters<AppServerRequestHandler>[0]) {
const handler = await waitForServerRequestHandler();
return handler(request);
},
async completeTurn(params: { threadId: string; turnId: string }) {
await notify({
method: "turn/completed",
@@ -349,6 +366,12 @@ function createNamedDynamicTool(
};
}
type AppServerRequestHandler = (request: {
id: string | number;
method: string;
params?: unknown;
}) => Promise<unknown>;
function extractRelayIdFromThreadRequest(params: unknown): string {
const command = (
params as {
@@ -656,6 +679,93 @@ describe("runCodexAppServerAttempt", () => {
});
});
it("emits normalized tool progress around app-server dynamic tool requests", async () => {
const harness = createStartedThreadHarness();
const onRunAgentEvent = vi.fn();
const globalAgentEvents: AgentEventPayload[] = [];
onAgentEvent((event) => globalAgentEvents.push(event));
const params = createParams(
path.join(tempDir, "session.jsonl"),
path.join(tempDir, "workspace"),
);
params.onAgentEvent = onRunAgentEvent;
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
await expect(
harness.handleServerRequest({
id: "request-tool-1",
method: "item/tool/call",
params: {
threadId: "thread-1",
turnId: "turn-1",
callId: "call-1",
namespace: null,
tool: "message",
arguments: {
action: "send",
token: "plain-secret-value-12345",
text: "hello",
},
},
}),
).resolves.toMatchObject({
success: false,
contentItems: [
{
type: "inputText",
text: expect.stringMatching(
/^(Unknown OpenClaw tool: message|Action send requires a target\.)$/u,
),
},
],
});
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
const agentEvents = onRunAgentEvent.mock.calls.map(([event]) => event);
expect(agentEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({
phase: "start",
name: "message",
toolCallId: "call-1",
args: expect.objectContaining({
action: "send",
token: "plain-…2345",
text: "hello",
}),
}),
}),
expect.objectContaining({
stream: "tool",
data: expect.objectContaining({
phase: "result",
name: "message",
toolCallId: "call-1",
isError: true,
result: expect.objectContaining({ success: false }),
}),
}),
]),
);
expect(JSON.stringify(agentEvents)).not.toContain("plain-secret-value-12345");
expect(globalAgentEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
runId: "run-1",
sessionKey: "agent:main:session-1",
stream: "tool",
data: expect.objectContaining({ phase: "start", name: "message" }),
}),
]),
);
});
it("releases the session when Codex never completes after a dynamic tool response", async () => {
let handleRequest:
| ((request: { id: string; method: string; params?: unknown }) => Promise<unknown>)

View File

@@ -98,6 +98,12 @@ import {
codexDynamicToolsFingerprint,
startOrResumeThread,
} from "./thread-lifecycle.js";
import {
inferCodexDynamicToolMeta,
resolveCodexToolProgressDetailMode,
sanitizeCodexToolArguments,
sanitizeCodexToolResponse,
} from "./tool-progress-normalization.js";
import {
createCodexTrajectoryRecorder,
normalizeCodexTrajectoryError,
@@ -973,6 +979,19 @@ export async function runCodexAppServerAttempt(
name: call.tool,
arguments: call.arguments,
});
const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail);
const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode);
const toolArgs = sanitizeCodexToolArguments(call.arguments);
emitCodexAppServerEvent(params, {
stream: "tool",
data: {
phase: "start",
name: call.tool,
toolCallId: call.callId,
...(toolMeta ? { meta: toolMeta } : {}),
...(toolArgs ? { args: toolArgs } : {}),
},
});
const response = await handleDynamicToolCallWithTimeout({
call,
toolBridge,
@@ -996,6 +1015,17 @@ export async function runCodexAppServerAttempt(
success: response.success,
contentItems: response.contentItems,
});
emitCodexAppServerEvent(params, {
stream: "tool",
data: {
phase: "result",
name: call.tool,
toolCallId: call.callId,
...(toolMeta ? { meta: toolMeta } : {}),
isError: !response.success,
result: sanitizeCodexToolResponse(response),
},
});
return response as JsonValue;
} finally {
activeAppServerTurnRequests = Math.max(0, activeAppServerTurnRequests - 1);

View File

@@ -0,0 +1,77 @@
import {
inferToolMetaFromArgs,
type EmbeddedRunAttemptParams,
type ToolProgressDetailMode,
} from "openclaw/plugin-sdk/agent-harness-runtime";
import { redactSensitiveFieldValue, redactToolPayloadText } from "openclaw/plugin-sdk/text-runtime";
import {
isJsonObject,
type CodexDynamicToolCallParams,
type CodexDynamicToolCallResponse,
type JsonValue,
} from "./protocol.js";
export function resolveCodexToolProgressDetailMode(
value: EmbeddedRunAttemptParams["toolProgressDetail"],
): ToolProgressDetailMode {
return value === "raw" ? "raw" : "explain";
}
export function sanitizeCodexAgentEventValue(
value: unknown,
seen = new WeakSet<object>(),
): unknown {
if (typeof value === "string") {
return redactToolPayloadText(value);
}
if (Array.isArray(value)) {
if (seen.has(value)) {
return "[Circular]";
}
seen.add(value);
return value.map((entry) => sanitizeCodexAgentEventValue(entry, seen));
}
if (value && typeof value === "object") {
if (seen.has(value)) {
return "[Circular]";
}
seen.add(value);
const out: Record<string, unknown> = {};
for (const [key, child] of Object.entries(value as Record<string, unknown>)) {
out[key] =
typeof child === "string"
? redactSensitiveFieldValue(key, child)
: sanitizeCodexAgentEventValue(child, seen);
}
return out;
}
return value;
}
export function sanitizeCodexAgentEventRecord(
value: Record<string, unknown>,
): Record<string, unknown> {
return sanitizeCodexAgentEventValue(value) as Record<string, unknown>;
}
export function sanitizeCodexToolArguments(
value: JsonValue | undefined,
): Record<string, unknown> | undefined {
if (!isJsonObject(value)) {
return undefined;
}
return sanitizeCodexAgentEventRecord(value);
}
export function sanitizeCodexToolResponse(
response: CodexDynamicToolCallResponse,
): Record<string, unknown> {
return sanitizeCodexAgentEventRecord(response as unknown as Record<string, unknown>);
}
export function inferCodexDynamicToolMeta(
call: Pick<CodexDynamicToolCallParams, "tool" | "arguments">,
detailMode: ToolProgressDetailMode,
): string | undefined {
return inferToolMetaFromArgs(call.tool, call.arguments, { detailMode });
}

View File

@@ -998,6 +998,61 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("shows Telegram progress drafts immediately for explicit tool starts", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
return { queuedFinal: false };
});
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(expect.stringMatching(/^Shelling\n`🛠️ Exec`$/));
expect(draftStream.flush).toHaveBeenCalled();
});
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
const draftStream = createSequencedDraftStream(2001);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onReplyStart?.();
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onItemEvent?.({ kind: "search", progressText: "docs lookup" });
await replyOptions?.onItemEvent?.({ progressText: "tests passed" });
await replyOptions?.onAssistantMessageStart?.();
await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
});
expect(draftStream.update).toHaveBeenCalledWith(
expect.stringMatching(/^Shelling\n`🔎 Web Search: docs lookup`\n• `tests passed`$/),
);
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.materialize).not.toHaveBeenCalled();
expect(editMessageTelegram).toHaveBeenCalledWith(
123,
2001,
"Final after tool",
expect.any(Object),
);
expect(draftStream.clear).not.toHaveBeenCalled();
});
it("falls back to normal send for error payloads and clears the pending stream", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {

View File

@@ -490,7 +490,10 @@ export const dispatchTelegramMessage = async ({
const progressDraftGate = createChannelProgressDraftGate({
onStart: () => renderProgressDraft({ flush: true }),
});
const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => {
const pushStreamToolProgress = async (
line?: string,
options?: { toolName?: string; startImmediately?: boolean },
) => {
if (!answerLane.stream) {
return;
}
@@ -529,6 +532,19 @@ export const dispatchTelegramMessage = async ({
);
}
}
if (
options?.startImmediately &&
streamToolProgressEnabled &&
!streamToolProgressSuppressed &&
normalized
) {
const alreadyStarted = progressDraftGate.hasStarted;
await progressDraftGate.startNow();
if (alreadyStarted && progressDraftGate.hasStarted) {
await renderProgressDraft();
}
return;
}
const alreadyStarted = progressDraftGate.hasStarted;
await progressDraftGate.noteWork();
if (alreadyStarted && progressDraftGate.hasStarted) {
@@ -1205,7 +1221,7 @@ export const dispatchTelegramMessage = async ({
},
payload.detailMode ? { detailMode: payload.detailMode } : undefined,
),
{ toolName },
{ toolName, startImmediately: true },
);
},
onItemEvent: async (payload) => {