mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-09 01:30:43 +00:00
fix(agents): bound live exec output events (#78645)
* fix gateway exec output starvation * docs changelog for exec output fix
This commit is contained in:
@@ -135,6 +135,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Google Meet/Voice Call: wait longer before playing PIN-derived Twilio DTMF for Meet dial-in prompts and retire stale delegated phone sessions instead of reusing completed calls.
|
||||
- PDF/Codex: include extraction-fallback instructions for `openai-codex/*` PDF tool requests so Codex Responses receives its required system prompt. Fixes #77872. Thanks @anyech.
|
||||
- Onboard/channels: recover externalized channel plugins from stale `channels.<id>` config by falling back to `ensureChannelSetupPluginInstalled` via the trusted catalog when the plugin is missing on disk, so leftover `appId`/token entries no longer dead-end onboard with "<channel> plugin not available." (#78328) Thanks @sliverp.
|
||||
- Agents/Gateway: throttle and cap live exec command-output events so noisy tool runs cannot flood Gateway WebSocket clients or starve RPC handling. (#78645) Thanks @joshavant.
|
||||
- Codex/app-server: forward the OpenClaw workspace bootstrap block through Codex `developerInstructions` instead of `config.instructions`, so persona/style guidance reaches the behavior-shaping app-server lane. Fixes #77363. Thanks @lonexreb.
|
||||
- MS Teams: route proactive channel sends with stored thread roots through the configured threaded reply path instead of forcing every CLI/message-tool send into a new top-level post. Fixes #78298. Thanks @amknight.
|
||||
- CLI/infer: pass minimal instructions to local `openai-codex/*` model probes and surface provider error details when `infer model run` returns no text. Fixes #76464. Thanks @lilesjtu.
|
||||
|
||||
@@ -755,6 +755,53 @@ describe("handleToolExecutionEnd derived tool events", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("throttles high-frequency exec output update events", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(1_000);
|
||||
try {
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
|
||||
await handleToolExecutionStart(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_start",
|
||||
toolName: "exec",
|
||||
toolCallId: "tool-exec-throttled-output",
|
||||
args: { command: "yes" },
|
||||
} as never,
|
||||
);
|
||||
|
||||
const update = (aggregated: string) =>
|
||||
handleToolExecutionUpdate(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_update",
|
||||
toolName: "exec",
|
||||
toolCallId: "tool-exec-throttled-output",
|
||||
partialResult: {
|
||||
details: {
|
||||
aggregated,
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
);
|
||||
|
||||
update("first");
|
||||
update("second");
|
||||
update("x".repeat(1024 * 1024));
|
||||
vi.setSystemTime(1_300);
|
||||
update("third");
|
||||
|
||||
const commandOutputCalls = onAgentEvent.mock.calls
|
||||
.map((call) => call[0] as { stream?: string; data?: { output?: string } })
|
||||
.filter((event) => event.stream === "command_output");
|
||||
|
||||
expect(commandOutputCalls.map((event) => event.data?.output)).toEqual(["first", "third"]);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("emits command output events for exec results", async () => {
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
|
||||
@@ -1180,6 +1227,57 @@ describe("control UI credential redaction (issue #72283)", () => {
|
||||
expect(lastOutput?.data?.output).toContain("OPENROUTER_API_KEY=");
|
||||
});
|
||||
|
||||
it("caps live exec command output events without changing the tool result shape", async () => {
|
||||
const events: Array<{ stream?: string; data?: Record<string, unknown> }> = [];
|
||||
registerAgentEventListener((evt) => {
|
||||
events.push(evt as never);
|
||||
});
|
||||
const { ctx, onAgentEvent } = createTestContext();
|
||||
const aggregated = `head-${"x".repeat(90 * 1024)}-tail`;
|
||||
|
||||
await handleToolExecutionStart(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_start",
|
||||
toolName: "exec",
|
||||
toolCallId: "tool-exec-long-output",
|
||||
args: { command: "yes" },
|
||||
} as never,
|
||||
);
|
||||
|
||||
await handleToolExecutionEnd(
|
||||
ctx as never,
|
||||
{
|
||||
type: "tool_execution_end",
|
||||
toolName: "exec",
|
||||
toolCallId: "tool-exec-long-output",
|
||||
isError: false,
|
||||
result: {
|
||||
details: {
|
||||
status: "completed",
|
||||
aggregated,
|
||||
exitCode: 0,
|
||||
},
|
||||
},
|
||||
} as never,
|
||||
);
|
||||
|
||||
const commandOutputCalls = onAgentEvent.mock.calls
|
||||
.map((call) => call[0])
|
||||
.filter((arg: unknown) => (arg as { stream?: string })?.stream === "command_output");
|
||||
const lastOutput = commandOutputCalls.at(-1) as { data?: { output?: string } } | undefined;
|
||||
expect(lastOutput?.data?.output).toContain("live command output truncated");
|
||||
expect(lastOutput?.data?.output).toContain("-tail");
|
||||
expect(lastOutput?.data?.output).not.toContain("head-");
|
||||
|
||||
const resultEvent = events.find(
|
||||
(evt) => evt.stream === "tool" && (evt.data as { phase?: string })?.phase === "result",
|
||||
);
|
||||
const result = resultEvent?.data?.result as { details?: { aggregated?: string } } | undefined;
|
||||
expect(result?.details?.aggregated).toContain("live command output truncated");
|
||||
expect(result?.details?.aggregated).toContain("-tail");
|
||||
});
|
||||
|
||||
it("redacts details-only results before emitting the tool result event", async () => {
|
||||
const events: Array<{ stream?: string; data?: Record<string, unknown> }> = [];
|
||||
registerAgentEventListener((evt) => {
|
||||
|
||||
@@ -20,6 +20,7 @@ import type { ExecApprovalDecision } from "../infra/exec-approvals.js";
|
||||
import type { PluginHookAfterToolCallEvent } from "../plugins/types.js";
|
||||
import { createLazyImportLoader } from "../shared/lazy-promise.js";
|
||||
import { normalizeOptionalLowercaseString, readStringValue } from "../shared/string-coerce.js";
|
||||
import { truncateUtf16Safe } from "../utils.js";
|
||||
import type { ApplyPatchSummary } from "./apply-patch.js";
|
||||
import type { ExecToolDetails } from "./bash-tools.exec-types.js";
|
||||
import { parseExecApprovalResultText } from "./exec-approval-result.js";
|
||||
@@ -87,11 +88,50 @@ type ToolStartRecord = {
|
||||
|
||||
/** Track tool execution start data for after_tool_call hook. */
|
||||
const toolStartData = new Map<string, ToolStartRecord>();
|
||||
const EXEC_OUTPUT_DELTA_MIN_INTERVAL_MS = 250;
|
||||
const LIVE_COMMAND_OUTPUT_MAX_CHARS = 64 * 1024;
|
||||
type ExecOutputDeltaEmission = {
|
||||
emittedAt: number;
|
||||
};
|
||||
const execOutputDeltaEmissions = new Map<string, ExecOutputDeltaEmission>();
|
||||
|
||||
function buildToolStartKey(runId: string, toolCallId: string): string {
|
||||
return `${runId}:${toolCallId}`;
|
||||
}
|
||||
|
||||
function buildExecOutputDeltaKey(runId: string, toolCallId: string): string {
|
||||
return `${runId}:${toolCallId}`;
|
||||
}
|
||||
|
||||
function shouldEmitExecOutputDelta(params: {
|
||||
runId: string;
|
||||
toolCallId: string;
|
||||
output: string;
|
||||
now?: number;
|
||||
}): boolean {
|
||||
const key = buildExecOutputDeltaKey(params.runId, params.toolCallId);
|
||||
const now = params.now ?? Date.now();
|
||||
const previous = execOutputDeltaEmissions.get(key);
|
||||
if (!previous) {
|
||||
execOutputDeltaEmissions.set(key, {
|
||||
emittedAt: now,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
const elapsedMs = now - previous.emittedAt;
|
||||
if (elapsedMs < EXEC_OUTPUT_DELTA_MIN_INTERVAL_MS) {
|
||||
return false;
|
||||
}
|
||||
execOutputDeltaEmissions.set(key, {
|
||||
emittedAt: now,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
function clearExecOutputDeltaEmission(runId: string, toolCallId: string): void {
|
||||
execOutputDeltaEmissions.delete(buildExecOutputDeltaKey(runId, toolCallId));
|
||||
}
|
||||
|
||||
export function countActiveToolExecutions(runId: string): number {
|
||||
const prefix = `${runId}:`;
|
||||
let count = 0;
|
||||
@@ -189,6 +229,39 @@ function readExecToolDetails(result: unknown): ExecToolDetails | null {
|
||||
return details as ExecToolDetails;
|
||||
}
|
||||
|
||||
function readExecOutputText(result: unknown): string | undefined {
|
||||
const details = readToolResultDetailsRecord(result);
|
||||
if (typeof details?.aggregated === "string") {
|
||||
return details.aggregated;
|
||||
}
|
||||
return extractToolResultText(result);
|
||||
}
|
||||
|
||||
function limitLiveCommandOutput(output: string): string {
|
||||
if (output.length <= LIVE_COMMAND_OUTPUT_MAX_CHARS) {
|
||||
return output;
|
||||
}
|
||||
const tail = truncateUtf16Safe(
|
||||
output.slice(-LIVE_COMMAND_OUTPUT_MAX_CHARS),
|
||||
LIVE_COMMAND_OUTPUT_MAX_CHARS,
|
||||
);
|
||||
return `[openclaw: live command output truncated to last ${tail.length} of ${output.length} chars]\n${tail}`;
|
||||
}
|
||||
|
||||
function limitExecToolResultForLiveEvent(result: unknown): unknown {
|
||||
const details = readToolResultDetailsRecord(result);
|
||||
if (!details || typeof details.aggregated !== "string") {
|
||||
return result;
|
||||
}
|
||||
return {
|
||||
...(result as Record<string, unknown>),
|
||||
details: {
|
||||
...details,
|
||||
aggregated: limitLiveCommandOutput(details.aggregated),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function readApplyPatchSummary(result: unknown): ApplyPatchSummary | null {
|
||||
const details = readToolResultDetailsRecord(result);
|
||||
const summary =
|
||||
@@ -741,6 +814,19 @@ export function handleToolExecutionUpdate(
|
||||
const toolName = normalizeToolName(evt.toolName);
|
||||
const toolCallId = evt.toolCallId;
|
||||
const partial = evt.partialResult;
|
||||
if (isExecToolName(toolName)) {
|
||||
const output = readExecOutputText(partial);
|
||||
if (
|
||||
output &&
|
||||
!shouldEmitExecOutputDelta({
|
||||
runId: ctx.params.runId,
|
||||
toolCallId,
|
||||
output,
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const sanitized = sanitizeToolResult(partial);
|
||||
emitAgentEvent({
|
||||
runId: ctx.params.runId,
|
||||
@@ -772,11 +858,8 @@ export function handleToolExecutionUpdate(
|
||||
},
|
||||
});
|
||||
if (isExecToolName(toolName)) {
|
||||
const execDetails = readExecToolDetails(sanitized);
|
||||
const output =
|
||||
execDetails && "aggregated" in execDetails
|
||||
? execDetails.aggregated
|
||||
: extractToolResultText(sanitized);
|
||||
const rawOutput = readExecOutputText(sanitized);
|
||||
const output = rawOutput ? limitLiveCommandOutput(rawOutput) : undefined;
|
||||
const commandData: AgentItemEventData = {
|
||||
itemId: buildCommandItemId(toolCallId),
|
||||
phase: "update",
|
||||
@@ -829,9 +912,13 @@ export async function handleToolExecutionEnd(
|
||||
const result = evt.result;
|
||||
const isToolError = isError || isToolResultError(result);
|
||||
const sanitizedResult = sanitizeToolResult(result);
|
||||
const liveEventResult = isExecToolName(toolName)
|
||||
? limitExecToolResultForLiveEvent(sanitizedResult)
|
||||
: sanitizedResult;
|
||||
const toolStartKey = buildToolStartKey(runId, toolCallId);
|
||||
const startData = toolStartData.get(toolStartKey);
|
||||
toolStartData.delete(toolStartKey);
|
||||
clearExecOutputDeltaEmission(runId, toolCallId);
|
||||
const callSummary = ctx.state.toolMetaById.get(toolCallId);
|
||||
const completedMutatingAction = !isToolError && Boolean(callSummary?.mutatingAction);
|
||||
const meta = callSummary?.meta;
|
||||
@@ -934,7 +1021,7 @@ export async function handleToolExecutionEnd(
|
||||
toolCallId,
|
||||
meta,
|
||||
isError: isToolError,
|
||||
result: sanitizedResult,
|
||||
result: liveEventResult,
|
||||
},
|
||||
});
|
||||
const endedAt = Date.now();
|
||||
@@ -1027,10 +1114,11 @@ export async function handleToolExecutionEnd(
|
||||
}),
|
||||
});
|
||||
} else {
|
||||
const output =
|
||||
const rawOutput =
|
||||
execDetails && "aggregated" in execDetails
|
||||
? execDetails.aggregated
|
||||
: extractToolResultText(sanitizedResult);
|
||||
const output = rawOutput ? limitLiveCommandOutput(rawOutput) : undefined;
|
||||
const commandStatus =
|
||||
execDetails?.status === "failed" || isToolError ? "failed" : "completed";
|
||||
emitTrackedItemEvent(ctx, {
|
||||
@@ -1075,8 +1163,8 @@ export async function handleToolExecutionEnd(
|
||||
data: outputData,
|
||||
});
|
||||
|
||||
if (typeof output === "string") {
|
||||
const parsedApprovalResult = parseExecApprovalResultText(output);
|
||||
if (typeof rawOutput === "string") {
|
||||
const parsedApprovalResult = parseExecApprovalResultText(rawOutput);
|
||||
if (parsedApprovalResult.kind === "denied") {
|
||||
const approvalData: AgentApprovalEventData = {
|
||||
phase: "resolved",
|
||||
|
||||
Reference in New Issue
Block a user