From 778902103db2124d56969c77130f82d850627973 Mon Sep 17 00:00:00 2001 From: Dallin Romney Date: Fri, 1 May 2026 12:24:13 +0800 Subject: [PATCH] fix(agents): release embedded-run scope on hung provider abort + heap-leak harness (#75008) * fix(agents): extract abortable from runEmbeddedAttempt to release captured run scope on hung provider abort (#74182) * test(agents): drop synthetic WeakRef retention test for abortable * feat(scripts): add embedded-run-abort-leak harness for runtime closure-leak validation * feat(scripts): add production mode to leak harness importing real abortable * docs(changelog): add #74182 fix entry for embedded-run abort closure release --- .../skills/openclaw-test-heap-leaks/SKILL.md | 34 ++ CHANGELOG.md | 1 + package.json | 1 + scripts/embedded-run-abort-leak.ts | 338 ++++++++++++++++++ .../pi-embedded-runner/run/abortable.test.ts | 29 ++ .../pi-embedded-runner/run/abortable.ts | 38 ++ src/agents/pi-embedded-runner/run/attempt.ts | 39 +- 7 files changed, 444 insertions(+), 36 deletions(-) create mode 100644 scripts/embedded-run-abort-leak.ts create mode 100644 src/agents/pi-embedded-runner/run/abortable.test.ts create mode 100644 src/agents/pi-embedded-runner/run/abortable.ts diff --git a/.agents/skills/openclaw-test-heap-leaks/SKILL.md b/.agents/skills/openclaw-test-heap-leaks/SKILL.md index 38c12383d19..9d83d7a889c 100644 --- a/.agents/skills/openclaw-test-heap-leaks/SKILL.md +++ b/.agents/skills/openclaw-test-heap-leaks/SKILL.md @@ -7,6 +7,8 @@ description: Investigate OpenClaw pnpm test memory growth, Vitest OOMs, RSS spik Use this skill for test-memory investigations. Do not guess from RSS alone when heap snapshots are available. Treat snapshot-name deltas as triage evidence, not proof, until retainers or dominators support the call. 
+For **runtime fixes** (e.g., closure leaks in long-running services like the gateway), see [Validating runtime fixes](#validating-runtime-fixes-not-test-memory) below — that uses a dedicated harness, not the test-parallel snapshot machinery. + ## Workflow 1. Reproduce the failing shape first. @@ -63,6 +65,38 @@ Use this skill for test-memory investigations. Do not guess from RSS alone when Read the top positive deltas first. Large positive growth in module-transform artifacts suggests lane isolation; large positive growth in runtime objects suggests a real leak. If the names alone do not settle it, open the same snapshot pair in DevTools and inspect retainers/dominators for the top rows before declaring root cause. +## Validating runtime fixes (not test-memory) + +The workflow above is for diagnosing Vitest worker memory growth. For +validating that a runtime/closure fix actually releases captured state, use the +dedicated harness: + +- `pnpm leak:embedded-run` — runs `scripts/embedded-run-abort-leak.ts`. Loops N + aborted runs in a function-shaped scope mimicking `runEmbeddedAttempt`, + writes heap snapshots, and reports a PASS/FAIL verdict on retention growth + using `FinalizationRegistry` for tracked-instance counting plus RSS delta. + +Modes (the default is `production`, which imports the real `abortable` from `src/`): + +- `closure-extracted` — self-contained mirror of the production fix shape (helper at module scope). +- `closure-inline` — pre-fix shape (closure inside the runner scope). Use as a + sensitivity check: if it passes you've broken the harness, not fixed a bug. +- `synthetic-leak` — deliberately retains via a module-level bucket. Use to + confirm the harness can detect leaks before trusting a PASS on a real fix. + +Snapshots land in `.tmp/embedded-run-abort-leak/`. 
Diff with the same script +as above: + +``` +node .agents/skills/openclaw-test-heap-leaks/scripts/heapsnapshot-delta.mjs \ + .tmp/embedded-run-abort-leak/baseline-*.heapsnapshot \ + .tmp/embedded-run-abort-leak/batch-N-*.heapsnapshot --top 30 +``` + +When fixing a different runtime leak, add a new harness alongside this one +rather than retrofitting it. The fixture function should mimic the lexical +scope of the function where the leak lives, not be a generic abort-loop. + ## Output Expectations When using this skill, report: diff --git a/CHANGELOG.md b/CHANGELOG.md index a8c745db4f7..9c3c9ce669d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007. - Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041. - Agents/commitments: keep inferred follow-ups internal when heartbeat target is none, strip raw source text from stored commitments, disable tools during due-commitment heartbeat turns, bound hidden extraction queue growth, expire stale commitments, and add QA/Docker safety coverage. Thanks @vignesh07. - Telegram/agents: keep typing indicators and optional generation tools off the reply critical path, so fresh Telegram replies no longer stall while provider catalogs and media models load. (#75360) Thanks @obviyus. 
diff --git a/package.json b/package.json index 871f3688fe0..4737f482cf3 100644 --- a/package.json +++ b/package.json @@ -1357,6 +1357,7 @@ "ios:version:check": "node --import tsx scripts/ios-sync-versioning.ts --check", "ios:version:pin": "node --import tsx scripts/ios-pin-version.ts", "ios:version:sync": "node --import tsx scripts/ios-sync-versioning.ts --write", + "leak:embedded-run": "node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts", "lint": "node scripts/run-oxlint-shards.mjs", "lint:agent:ingress-owner": "node scripts/check-ingress-agent-owner-context.mjs", "lint:all": "node scripts/run-oxlint.mjs", diff --git a/scripts/embedded-run-abort-leak.ts b/scripts/embedded-run-abort-leak.ts new file mode 100644 index 00000000000..d97ebead535 --- /dev/null +++ b/scripts/embedded-run-abort-leak.ts @@ -0,0 +1,338 @@ +/** + * Heap-leak harness for the runEmbeddedAttempt abort path. Loops aborted runs + * in a function-shaped scope that mimics the runner, snapshots the heap, and + * computes a PASS/FAIL verdict from RSS delta + tracked-instance retention. + * + * Usage: + * node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts \ + * --mode production --iters 50 --batches 5 + * + * Modes: + * production (default): imports the real abortable from src; PASS proves the fix works. + * closure-extracted: self-contained module-scope helper (mirrors production shape). + * closure-inline: pre-fix shape (closure inside runner scope). + * synthetic-leak: deliberately retains via module-level bucket + * (sanity check that the harness detects leaks). + * + * Exit code: 0 if PASS, 1 if FAIL (leak detected). 
+ */ +import * as fs from "node:fs"; +import * as path from "node:path"; +import * as v8 from "node:v8"; +import { abortable as productionAbortable } from "../src/agents/pi-embedded-runner/run/abortable.js"; + +type Mode = "production" | "closure-extracted" | "closure-inline" | "synthetic-leak"; + +type Options = { + iters: number; + batches: number; + snapDir: string; + mode: Mode; + maxRssGrowthMb: number; + maxTrackedRetention: number; + scopeBytes: number; + quiet: boolean; +}; + +function parseArgs(argv: string[]): Options { + const opts: Options = { + iters: 50, + batches: 5, + snapDir: ".tmp/embedded-run-abort-leak", + mode: "production", + maxRssGrowthMb: 64, + maxTrackedRetention: 16, + scopeBytes: 2_000_000, + quiet: false, + }; + for (let i = 0; i < argv.length; i += 1) { + const arg = argv[i]; + const next = argv[i + 1]; + switch (arg) { + case "--iters": + opts.iters = Number.parseInt(next ?? "", 10); + i += 1; + break; + case "--batches": + opts.batches = Number.parseInt(next ?? "", 10); + i += 1; + break; + case "--snap-dir": + opts.snapDir = next ?? opts.snapDir; + i += 1; + break; + case "--mode": + if ( + next === "production" || + next === "closure-extracted" || + next === "closure-inline" || + next === "synthetic-leak" + ) { + opts.mode = next; + } else { + fail( + `--mode must be one of: production, closure-extracted, closure-inline, synthetic-leak`, + ); + } + i += 1; + break; + case "--max-rss-growth-mb": + opts.maxRssGrowthMb = Number.parseInt(next ?? "", 10); + i += 1; + break; + case "--max-tracked-retention": + opts.maxTrackedRetention = Number.parseInt(next ?? "", 10); + i += 1; + break; + case "--scope-bytes": + opts.scopeBytes = Number.parseInt(next ?? 
"", 10); + i += 1; + break; + case "--quiet": + opts.quiet = true; + break; + case "--help": + case "-h": + printUsage(); + process.exit(0); + break; + default: + fail(`Unknown arg: ${arg}`); + } + } + if (!Number.isFinite(opts.iters) || opts.iters <= 0) { + fail("--iters must be > 0"); + } + if (!Number.isFinite(opts.batches) || opts.batches <= 0) { + fail("--batches must be > 0"); + } + return opts; +} + +function printUsage(): void { + process.stderr.write( + [ + "Usage: node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts [flags]", + " --mode ", + " --iters N iterations per batch (default 50)", + " --batches B batches between snapshots (default 5)", + " --snap-dir DIR heap snapshot output dir (default .tmp/embedded-run-abort-leak)", + " --scope-bytes N simulated run-scope payload size (default 2_000_000)", + " --max-rss-growth-mb PASS threshold for RSS growth (default 64)", + " --max-tracked-retention PASS threshold for tracked finalizer retention (default 16)", + " --quiet only print final verdict", + "", + ].join("\n"), + ); +} + +function fail(msg: string): never { + process.stderr.write(`error: ${msg}\n`); + process.exit(2); +} + +const KEEP_ALIVE: Array> = []; +const SYNTHETIC_LEAK_BUCKET: Uint8Array[] = []; +const FINALIZED = { count: 0 }; +const finalizer = new FinalizationRegistry(() => { + FINALIZED.count += 1; +}); + +function abortableExtracted(signal: AbortSignal, promise: Promise): Promise { + if (signal.aborted) { + return Promise.reject(new Error("aborted")); + } + return new Promise((resolve, reject) => { + const onAbort = () => { + signal.removeEventListener("abort", onAbort); + reject(new Error("aborted")); + }; + signal.addEventListener("abort", onAbort, { once: true }); + promise.then( + (value) => { + signal.removeEventListener("abort", onAbort); + resolve(value); + }, + (err) => { + signal.removeEventListener("abort", onAbort); + reject(err); + }, + ); + }); +} + +function runOnce(mode: Mode, scopeBytes: number, iter: number): 
void { + const transcript = new Uint8Array(scopeBytes); + const toolMetas = [{ data: new Uint8Array(scopeBytes / 4) }]; + const subscription = { + onPartialReply: (_text: string) => { + void transcript; + }, + onAssistantMessageStart: () => { + void toolMetas; + }, + }; + finalizer.register(transcript, iter); + + const ac = new AbortController(); + const neverSettling = new Promise(() => {}); + KEEP_ALIVE.push(neverSettling); + + if (mode === "production") { + void productionAbortable(ac.signal, neverSettling).catch(() => {}); + } else if (mode === "closure-extracted") { + void abortableExtracted(ac.signal, neverSettling).catch(() => {}); + } else if (mode === "closure-inline") { + const wrapped = new Promise((resolve, reject) => { + const onAbort = () => reject(new Error("aborted")); + ac.signal.addEventListener("abort", onAbort, { once: true }); + neverSettling.then( + (v) => { + void transcript; + void toolMetas; + void subscription; + resolve(v); + }, + (e) => { + void transcript; + void toolMetas; + void subscription; + reject(e); + }, + ); + }); + void wrapped.catch(() => {}); + } else { + SYNTHETIC_LEAK_BUCKET.push(transcript); + } + ac.abort(); + + void transcript.length; + void toolMetas.length; + void subscription.onPartialReply; +} + +async function settleAndGc(): Promise { + for (let i = 0; i < 4; i += 1) { + await new Promise((r) => setImmediate(r)); + globalThis.gc?.(); + } + await new Promise((r) => setTimeout(r, 100)); + globalThis.gc?.(); +} + +type SampleRow = { + label: string; + rssBytes: number; + heapUsedBytes: number; + totalIters: number; + trackedFinalized: number; + snapshotPath: string; +}; + +function takeSnapshot(snapDir: string, label: string): string { + fs.mkdirSync(snapDir, { recursive: true }); + const filename = path.join(snapDir, `${label}-${process.pid}-${Date.now()}.heapsnapshot`); + v8.writeHeapSnapshot(filename); + return filename; +} + +function fmtBytes(bytes: number): string { + return `${(bytes / 1024 / 
1024).toFixed(1)}MB`; +} + +async function main(): Promise { + const opts = parseArgs(process.argv.slice(2)); + if (typeof globalThis.gc !== "function") { + fail("--expose-gc is required (run with: node --expose-gc ...)"); + } + + const startedAt = Date.now(); + const samples: SampleRow[] = []; + + if (!opts.quiet) { + process.stdout.write( + `[harness] mode=${opts.mode} iters=${opts.iters} batches=${opts.batches} ` + + `scope=${fmtBytes(opts.scopeBytes)} pid=${process.pid}\n`, + ); + } + + await settleAndGc(); + const baselinePath = takeSnapshot(opts.snapDir, "baseline"); + const baseline: SampleRow = { + label: "baseline", + rssBytes: process.memoryUsage().rss, + heapUsedBytes: process.memoryUsage().heapUsed, + totalIters: 0, + trackedFinalized: FINALIZED.count, + snapshotPath: baselinePath, + }; + samples.push(baseline); + if (!opts.quiet) { + process.stdout.write( + ` baseline rss=${fmtBytes(baseline.rssBytes)} heap=${fmtBytes(baseline.heapUsedBytes)}\n`, + ); + } + + let totalIters = 0; + for (let b = 0; b < opts.batches; b += 1) { + for (let i = 0; i < opts.iters; i += 1) { + runOnce(opts.mode, opts.scopeBytes, totalIters); + totalIters += 1; + } + await settleAndGc(); + const snapshotPath = takeSnapshot(opts.snapDir, `batch-${b}`); + const row: SampleRow = { + label: `batch-${b}`, + rssBytes: process.memoryUsage().rss, + heapUsedBytes: process.memoryUsage().heapUsed, + totalIters, + trackedFinalized: FINALIZED.count, + snapshotPath, + }; + samples.push(row); + if (!opts.quiet) { + process.stdout.write( + ` batch ${b} totalIters=${row.totalIters} ` + + `rss=${fmtBytes(row.rssBytes)} heap=${fmtBytes(row.heapUsedBytes)} ` + + `trackedFinalized=${row.trackedFinalized}/${row.totalIters}\n`, + ); + } + } + + const final = samples[samples.length - 1]; + if (!final) { + fail("no samples collected"); + } + const rssGrowthMb = (final.rssBytes - baseline.rssBytes) / 1024 / 1024; + // Tracked retention: how many iter-allocated transcripts are STILL alive + // (have not 
been finalized). Lower is better. + const trackedRetention = final.totalIters - final.trackedFinalized; + + const durationSec = ((Date.now() - startedAt) / 1000).toFixed(1); + const verdict = + rssGrowthMb > opts.maxRssGrowthMb || trackedRetention > opts.maxTrackedRetention + ? "FAIL" + : "PASS"; + + process.stdout.write( + `${verdict}: mode=${opts.mode} ` + + `rss_growth=${rssGrowthMb.toFixed(1)}MB ` + + `tracked_retention=${trackedRetention}/${final.totalIters} ` + + `duration=${durationSec}s ` + + `(thresholds: rss<${opts.maxRssGrowthMb}MB, tracked<${opts.maxTrackedRetention})\n`, + ); + if (!opts.quiet) { + process.stdout.write( + `snapshots in ${opts.snapDir}/ — diff with:\n` + + ` node .agents/skills/openclaw-test-heap-leaks/scripts/heapsnapshot-delta.mjs ` + + `${baseline.snapshotPath} ${final.snapshotPath} --top 30\n`, + ); + } + process.exit(verdict === "PASS" ? 0 : 1); +} + +main().catch((err) => { + process.stderr.write(`harness crashed: ${String(err)}\n${(err as Error)?.stack ?? 
""}\n`); + process.exit(2); +}); diff --git a/src/agents/pi-embedded-runner/run/abortable.test.ts b/src/agents/pi-embedded-runner/run/abortable.test.ts new file mode 100644 index 00000000000..312deb48f85 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/abortable.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it } from "vitest"; +import { abortable } from "./abortable.js"; + +describe("abortable", () => { + it("rejects with AbortError when signal aborts before inner settles", async () => { + const ac = new AbortController(); + const inner = new Promise(() => {}); + const wrapped = abortable(ac.signal, inner); + ac.abort(); + try { + await wrapped; + expect.fail("expected rejection"); + } catch (err) { + expect((err as Error).name).toBe("AbortError"); + } + }); + + it("rejects immediately when signal is already aborted", async () => { + const ac = new AbortController(); + ac.abort(); + const inner = new Promise(() => {}); + await expect(abortable(ac.signal, inner)).rejects.toThrow(/aborted/i); + }); + + it("resolves with inner value when inner settles before abort", async () => { + const ac = new AbortController(); + await expect(abortable(ac.signal, Promise.resolve(42))).resolves.toBe(42); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/abortable.ts b/src/agents/pi-embedded-runner/run/abortable.ts new file mode 100644 index 00000000000..c5ec7bf4661 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/abortable.ts @@ -0,0 +1,38 @@ +function getAbortReason(signal: AbortSignal): unknown { + return "reason" in signal ? (signal as { reason?: unknown }).reason : undefined; +} + +export function makeAbortError(signal: AbortSignal): Error { + const reason = getAbortReason(signal); + if (reason instanceof Error) { + const err = new Error(reason.message, { cause: reason }); + err.name = "AbortError"; + return err; + } + const err = reason ? 
new Error("aborted", { cause: reason }) : new Error("aborted"); + err.name = "AbortError"; + return err; +} + +export function abortable(signal: AbortSignal, promise: Promise): Promise { + if (signal.aborted) { + return Promise.reject(makeAbortError(signal)); + } + return new Promise((resolve, reject) => { + const onAbort = () => { + signal.removeEventListener("abort", onAbort); + reject(makeAbortError(signal)); + }; + signal.addEventListener("abort", onAbort, { once: true }); + promise.then( + (value) => { + signal.removeEventListener("abort", onAbort); + resolve(value); + }, + (err) => { + signal.removeEventListener("abort", onAbort); + reject(err); + }, + ); + }); +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 64503cb1eb4..31cd27c81d7 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -228,6 +228,7 @@ import { import { splitSdkTools } from "../tool-split.js"; import { mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; +import { abortable as abortableWithSignal } from "./abortable.js"; import { createEmbeddedAgentSessionWithResourceLoader } from "./attempt-session.js"; export { buildContextEnginePromptCacheInfo } from "./attempt.context-engine-helpers.js"; import { @@ -2108,19 +2109,6 @@ export async function runEmbeddedAttempt( err.name = "TimeoutError"; return err; }; - const makeAbortError = (signal: AbortSignal): Error => { - const reason = getAbortReason(signal); - // If the reason is already an Error, preserve it to keep the original message - // (e.g., "LLM idle timeout (s): no response from model" instead of "aborted") - if (reason instanceof Error) { - const err = new Error(reason.message, { cause: reason }); - err.name = "AbortError"; - return err; - } - const err = reason ? 
new Error("aborted", { cause: reason }) : new Error("aborted"); - err.name = "AbortError"; - return err; - }; const abortCompaction = () => { if (!activeSession.isCompacting) { return; @@ -2152,29 +2140,8 @@ export async function runEmbeddedAttempt( idleTimedOut = true; abortRun(true, error); }; - const abortable = (promise: Promise): Promise => { - const signal = runAbortController.signal; - if (signal.aborted) { - return Promise.reject(makeAbortError(signal)); - } - return new Promise((resolve, reject) => { - const onAbort = () => { - signal.removeEventListener("abort", onAbort); - reject(makeAbortError(signal)); - }; - signal.addEventListener("abort", onAbort, { once: true }); - promise.then( - (value) => { - signal.removeEventListener("abort", onAbort); - resolve(value); - }, - (err) => { - signal.removeEventListener("abort", onAbort); - reject(err); - }, - ); - }); - }; + const abortable = (promise: Promise): Promise => + abortableWithSignal(runAbortController.signal, promise); const subscription = subscribeEmbeddedPiSession( buildEmbeddedSubscriptionParams({