diff --git a/CHANGELOG.md b/CHANGELOG.md index 177710ae2b8..02a5dd0aaaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -135,6 +135,7 @@ Docs: https://docs.openclaw.ai - Google Meet/Voice Call: wait longer before playing PIN-derived Twilio DTMF for Meet dial-in prompts and retire stale delegated phone sessions instead of reusing completed calls. - PDF/Codex: include extraction-fallback instructions for `openai-codex/*` PDF tool requests so Codex Responses receives its required system prompt. Fixes #77872. Thanks @anyech. - Onboard/channels: recover externalized channel plugins from stale `channels.` config by falling back to `ensureChannelSetupPluginInstalled` via the trusted catalog when the plugin is missing on disk, so leftover `appId`/token entries no longer dead-end onboard with " plugin not available." (#78328) Thanks @sliverp. +- Agents/Gateway: throttle and cap live exec command-output events so noisy tool runs cannot flood Gateway WebSocket clients or starve RPC handling. (#78645) Thanks @joshavant. - Codex/app-server: forward the OpenClaw workspace bootstrap block through Codex `developerInstructions` instead of `config.instructions`, so persona/style guidance reaches the behavior-shaping app-server lane. Fixes #77363. Thanks @lonexreb. - MS Teams: route proactive channel sends with stored thread roots through the configured threaded reply path instead of forcing every CLI/message-tool send into a new top-level post. Fixes #78298. Thanks @amknight. - CLI/infer: pass minimal instructions to local `openai-codex/*` model probes and surface provider error details when `infer model run` returns no text. Fixes #76464. Thanks @lilesjtu. diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts index 0580533a1f4..3e649130b7e 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts @@ -755,6 +755,53 @@ describe("handleToolExecutionEnd derived tool events", () => { ); }); + it("throttles high-frequency exec output update events", async () => { + vi.useFakeTimers(); + vi.setSystemTime(1_000); + try { + const { ctx, onAgentEvent } = createTestContext(); + + await handleToolExecutionStart( + ctx as never, + { + type: "tool_execution_start", + toolName: "exec", + toolCallId: "tool-exec-throttled-output", + args: { command: "yes" }, + } as never, + ); + + const update = (aggregated: string) => + handleToolExecutionUpdate( + ctx as never, + { + type: "tool_execution_update", + toolName: "exec", + toolCallId: "tool-exec-throttled-output", + partialResult: { + details: { + aggregated, + }, + }, + } as never, + ); + + update("first"); + update("second"); + update("x".repeat(1024 * 1024)); + vi.setSystemTime(1_300); + update("third"); + + const commandOutputCalls = onAgentEvent.mock.calls + .map((call) => call[0] as { stream?: string; data?: { output?: string } }) + .filter((event) => event.stream === "command_output"); + + expect(commandOutputCalls.map((event) => event.data?.output)).toEqual(["first", "third"]); + } finally { + vi.useRealTimers(); + } + }); + it("emits command output events for exec results", async () => { const { ctx, onAgentEvent } = createTestContext(); @@ -1180,6 +1227,57 @@ describe("control UI credential redaction (issue #72283)", () => { expect(lastOutput?.data?.output).toContain("OPENROUTER_API_KEY="); }); + it("caps live exec command output events without changing the tool result shape", async () => { + const events: Array<{ stream?: string; data?: Record }> = []; + registerAgentEventListener((evt) => { + events.push(evt as never); + }); + const { ctx, onAgentEvent } = createTestContext(); + const aggregated = `head-${"x".repeat(90 * 1024)}-tail`; + + await handleToolExecutionStart( + ctx as never, + { + type: "tool_execution_start", + toolName: "exec", + toolCallId: "tool-exec-long-output", + args: { command: "yes" }, + } as never, + ); + + await handleToolExecutionEnd( + ctx as never, + { + type: "tool_execution_end", + toolName: "exec", + toolCallId: "tool-exec-long-output", + isError: false, + result: { + details: { + status: "completed", + aggregated, + exitCode: 0, + }, + }, + } as never, + ); + + const commandOutputCalls = onAgentEvent.mock.calls + .map((call) => call[0]) + .filter((arg: unknown) => (arg as { stream?: string })?.stream === "command_output"); + const lastOutput = commandOutputCalls.at(-1) as { data?: { output?: string } } | undefined; + expect(lastOutput?.data?.output).toContain("live command output truncated"); + expect(lastOutput?.data?.output).toContain("-tail"); + expect(lastOutput?.data?.output).not.toContain("head-"); + + const resultEvent = events.find( + (evt) => evt.stream === "tool" && (evt.data as { phase?: string })?.phase === "result", + ); + const result = resultEvent?.data?.result as { details?: { aggregated?: string } } | undefined; + expect(result?.details?.aggregated).toContain("live command output truncated"); + expect(result?.details?.aggregated).toContain("-tail"); + }); + it("redacts details-only results before emitting the tool result event", async () => { const events: Array<{ stream?: string; data?: Record }> = []; registerAgentEventListener((evt) => { diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 3df28deeb87..fb4d492ca89 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -20,6 +20,7 @@ import type { ExecApprovalDecision } from "../infra/exec-approvals.js"; import type { PluginHookAfterToolCallEvent } from "../plugins/types.js"; import { createLazyImportLoader } from "../shared/lazy-promise.js"; import { normalizeOptionalLowercaseString, readStringValue } from "../shared/string-coerce.js"; +import { truncateUtf16Safe } from "../utils.js"; import type { ApplyPatchSummary } from "./apply-patch.js"; import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import { parseExecApprovalResultText } from "./exec-approval-result.js"; @@ -87,11 +88,50 @@ type ToolStartRecord = { /** Track tool execution start data for after_tool_call hook. */ const toolStartData = new Map(); +const EXEC_OUTPUT_DELTA_MIN_INTERVAL_MS = 250; +const LIVE_COMMAND_OUTPUT_MAX_CHARS = 64 * 1024; +type ExecOutputDeltaEmission = { + emittedAt: number; +}; +const execOutputDeltaEmissions = new Map(); function buildToolStartKey(runId: string, toolCallId: string): string { return `${runId}:${toolCallId}`; } +function buildExecOutputDeltaKey(runId: string, toolCallId: string): string { + return `${runId}:${toolCallId}`; +} + +function shouldEmitExecOutputDelta(params: { + runId: string; + toolCallId: string; + output: string; + now?: number; +}): boolean { + const key = buildExecOutputDeltaKey(params.runId, params.toolCallId); + const now = params.now ?? Date.now(); + const previous = execOutputDeltaEmissions.get(key); + if (!previous) { + execOutputDeltaEmissions.set(key, { + emittedAt: now, + }); + return true; + } + const elapsedMs = now - previous.emittedAt; + if (elapsedMs < EXEC_OUTPUT_DELTA_MIN_INTERVAL_MS) { + return false; + } + execOutputDeltaEmissions.set(key, { + emittedAt: now, + }); + return true; +} + +function clearExecOutputDeltaEmission(runId: string, toolCallId: string): void { + execOutputDeltaEmissions.delete(buildExecOutputDeltaKey(runId, toolCallId)); +} + export function countActiveToolExecutions(runId: string): number { const prefix = `${runId}:`; let count = 0; @@ -189,6 +229,39 @@ function readExecToolDetails(result: unknown): ExecToolDetails | null { return details as ExecToolDetails; } +function readExecOutputText(result: unknown): string | undefined { + const details = readToolResultDetailsRecord(result); + if (typeof details?.aggregated === "string") { + return details.aggregated; + } + return extractToolResultText(result); +} + +function limitLiveCommandOutput(output: string): string { + if (output.length <= LIVE_COMMAND_OUTPUT_MAX_CHARS) { + return output; + } + const tail = truncateUtf16Safe( + output.slice(-LIVE_COMMAND_OUTPUT_MAX_CHARS), + LIVE_COMMAND_OUTPUT_MAX_CHARS, + ); + return `[openclaw: live command output truncated to last ${tail.length} of ${output.length} chars]\n${tail}`; +} + +function limitExecToolResultForLiveEvent(result: unknown): unknown { + const details = readToolResultDetailsRecord(result); + if (!details || typeof details.aggregated !== "string") { + return result; + } + return { + ...(result as Record), + details: { + ...details, + aggregated: limitLiveCommandOutput(details.aggregated), + }, + }; +} + function readApplyPatchSummary(result: unknown): ApplyPatchSummary | null { const details = readToolResultDetailsRecord(result); const summary = @@ -741,6 +814,19 @@ export function handleToolExecutionUpdate( const toolName = normalizeToolName(evt.toolName); const toolCallId = evt.toolCallId; const partial = evt.partialResult; + if (isExecToolName(toolName)) { + const output = readExecOutputText(partial); + if ( + output && + !shouldEmitExecOutputDelta({ + runId: ctx.params.runId, + toolCallId, + output, + }) + ) { + return; + } + } const sanitized = sanitizeToolResult(partial); emitAgentEvent({ runId: ctx.params.runId, @@ -772,11 +858,8 @@ export function handleToolExecutionUpdate( }, }); if (isExecToolName(toolName)) { - const execDetails = readExecToolDetails(sanitized); - const output = - execDetails && "aggregated" in execDetails - ? execDetails.aggregated - : extractToolResultText(sanitized); + const rawOutput = readExecOutputText(sanitized); + const output = rawOutput ? limitLiveCommandOutput(rawOutput) : undefined; const commandData: AgentItemEventData = { itemId: buildCommandItemId(toolCallId), phase: "update", @@ -829,9 +912,13 @@ export async function handleToolExecutionEnd( const result = evt.result; const isToolError = isError || isToolResultError(result); const sanitizedResult = sanitizeToolResult(result); + const liveEventResult = isExecToolName(toolName) + ? limitExecToolResultForLiveEvent(sanitizedResult) + : sanitizedResult; const toolStartKey = buildToolStartKey(runId, toolCallId); const startData = toolStartData.get(toolStartKey); toolStartData.delete(toolStartKey); + clearExecOutputDeltaEmission(runId, toolCallId); const callSummary = ctx.state.toolMetaById.get(toolCallId); const completedMutatingAction = !isToolError && Boolean(callSummary?.mutatingAction); const meta = callSummary?.meta; @@ -934,7 +1021,7 @@ export async function handleToolExecutionEnd( toolCallId, meta, isError: isToolError, - result: sanitizedResult, + result: liveEventResult, }, }); const endedAt = Date.now(); @@ -1027,10 +1114,11 @@ export async function handleToolExecutionEnd( }), }); } else { - const output = + const rawOutput = execDetails && "aggregated" in execDetails ? execDetails.aggregated : extractToolResultText(sanitizedResult); + const output = rawOutput ? limitLiveCommandOutput(rawOutput) : undefined; const commandStatus = execDetails?.status === "failed" || isToolError ? "failed" : "completed"; emitTrackedItemEvent(ctx, { @@ -1075,8 +1163,8 @@ export async function handleToolExecutionEnd( data: outputData, }); - if (typeof output === "string") { - const parsedApprovalResult = parseExecApprovalResultText(output); + if (typeof rawOutput === "string") { + const parsedApprovalResult = parseExecApprovalResultText(rawOutput); if (parsedApprovalResult.kind === "denied") { const approvalData: AgentApprovalEventData = { phase: "resolved",