mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:50:43 +00:00
fix(agents): release embedded-run scope on hung provider abort + heap-leak harness (#75008)
* fix(agents): extract abortable from runEmbeddedAttempt to release captured run scope on hung provider abort (#74182) * test(agents): drop synthetic WeakRef retention test for abortable * feat(scripts): add embedded-run-abort-leak harness for runtime closure-leak validation * feat(scripts): add production mode to leak harness importing real abortable * docs(changelog): add #74182 fix entry for embedded-run abort closure release
This commit is contained in:
@@ -7,6 +7,8 @@ description: Investigate OpenClaw pnpm test memory growth, Vitest OOMs, RSS spik
|
||||
|
||||
Use this skill for test-memory investigations. Do not guess from RSS alone when heap snapshots are available. Treat snapshot-name deltas as triage evidence, not proof, until retainers or dominators support the call.
|
||||
|
||||
For **runtime fixes** (e.g., closure leaks in long-running services like the gateway), see [Validating runtime fixes](#validating-runtime-fixes-not-test-memory) below — that uses a dedicated harness, not the test-parallel snapshot machinery.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. Reproduce the failing shape first.
|
||||
@@ -63,6 +65,38 @@ Use this skill for test-memory investigations. Do not guess from RSS alone when
|
||||
|
||||
Read the top positive deltas first. Large positive growth in module-transform artifacts suggests lane isolation; large positive growth in runtime objects suggests a real leak. If the names alone do not settle it, open the same snapshot pair in DevTools and inspect retainers/dominators for the top rows before declaring root cause.
|
||||
|
||||
## Validating runtime fixes (not test-memory)
|
||||
|
||||
The workflow above is for diagnosing Vitest worker memory growth. For
|
||||
validating that a runtime/closure fix actually releases captured state, use the
|
||||
dedicated harness:
|
||||
|
||||
- `pnpm leak:embedded-run` — runs `scripts/embedded-run-abort-leak.ts`. Loops N
|
||||
aborted runs in a function-shaped scope mimicking `runEmbeddedAttempt`,
|
||||
writes heap snapshots, and reports a PASS/FAIL verdict on retention growth
|
||||
using `FinalizationRegistry` for tracked-instance counting plus RSS delta.
|
||||
|
||||
Modes:
|
||||
|
||||
- `closure-extracted` (default) — production fix shape (helper at module scope).
|
||||
- `closure-inline` — pre-fix shape (closure inside the runner scope). Use as a
|
||||
sensitivity check: if it passes you've broken the harness, not fixed a bug.
|
||||
- `synthetic-leak` — deliberately retains via a module-level bucket. Use to
|
||||
confirm the harness can detect leaks before trusting a PASS on a real fix.
|
||||
|
||||
Snapshots land in `.tmp/embedded-run-abort-leak/`. Diff with the same script
|
||||
as above:
|
||||
|
||||
```
|
||||
node .agents/skills/openclaw-test-heap-leaks/scripts/heapsnapshot-delta.mjs \
|
||||
.tmp/embedded-run-abort-leak/baseline-*.heapsnapshot \
|
||||
.tmp/embedded-run-abort-leak/batch-N-*.heapsnapshot --top 30
|
||||
```
|
||||
|
||||
When fixing a different runtime leak, add a new harness alongside this one
|
||||
rather than retrofitting it. The fixture function should mimic the lexical
|
||||
scope of the function where the leak lives, not be a generic abort-loop.
|
||||
|
||||
## Output Expectations
|
||||
|
||||
When using this skill, report:
|
||||
|
||||
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007.
|
||||
- Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041.
|
||||
- Agents/commitments: keep inferred follow-ups internal when heartbeat target is none, strip raw source text from stored commitments, disable tools during due-commitment heartbeat turns, bound hidden extraction queue growth, expire stale commitments, and add QA/Docker safety coverage. Thanks @vignesh07.
|
||||
- Telegram/agents: keep typing indicators and optional generation tools off the reply critical path, so fresh Telegram replies no longer stall while provider catalogs and media models load. (#75360) Thanks @obviyus.
|
||||
|
||||
@@ -1357,6 +1357,7 @@
|
||||
"ios:version:check": "node --import tsx scripts/ios-sync-versioning.ts --check",
|
||||
"ios:version:pin": "node --import tsx scripts/ios-pin-version.ts",
|
||||
"ios:version:sync": "node --import tsx scripts/ios-sync-versioning.ts --write",
|
||||
"leak:embedded-run": "node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts",
|
||||
"lint": "node scripts/run-oxlint-shards.mjs",
|
||||
"lint:agent:ingress-owner": "node scripts/check-ingress-agent-owner-context.mjs",
|
||||
"lint:all": "node scripts/run-oxlint.mjs",
|
||||
|
||||
338
scripts/embedded-run-abort-leak.ts
Normal file
338
scripts/embedded-run-abort-leak.ts
Normal file
@@ -0,0 +1,338 @@
|
||||
/**
|
||||
* Heap-leak harness for the runEmbeddedAttempt abort path. Loops aborted runs
|
||||
* in a function-shaped scope that mimics the runner, snapshots the heap, and
|
||||
* computes a PASS/FAIL verdict from RSS delta + tracked-instance retention.
|
||||
*
|
||||
* Usage:
|
||||
* node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts \
|
||||
* --mode production --iters 50 --batches 5
|
||||
*
|
||||
* Modes:
|
||||
* production (default): imports the real abortable from src; PASS proves the fix works.
|
||||
* closure-extracted: self-contained module-scope helper (mirrors production shape).
|
||||
* closure-inline: pre-fix shape (closure inside runner scope).
|
||||
* synthetic-leak: deliberately retains via module-level bucket
|
||||
* (sanity check that the harness detects leaks).
|
||||
*
|
||||
* Exit code: 0 if PASS, 1 if FAIL (leak detected).
|
||||
*/
|
||||
import * as fs from "node:fs";
|
||||
import * as path from "node:path";
|
||||
import * as v8 from "node:v8";
|
||||
import { abortable as productionAbortable } from "../src/agents/pi-embedded-runner/run/abortable.js";
|
||||
|
||||
type Mode = "production" | "closure-extracted" | "closure-inline" | "synthetic-leak";
|
||||
|
||||
type Options = {
|
||||
iters: number;
|
||||
batches: number;
|
||||
snapDir: string;
|
||||
mode: Mode;
|
||||
maxRssGrowthMb: number;
|
||||
maxTrackedRetention: number;
|
||||
scopeBytes: number;
|
||||
quiet: boolean;
|
||||
};
|
||||
|
||||
function parseArgs(argv: string[]): Options {
|
||||
const opts: Options = {
|
||||
iters: 50,
|
||||
batches: 5,
|
||||
snapDir: ".tmp/embedded-run-abort-leak",
|
||||
mode: "production",
|
||||
maxRssGrowthMb: 64,
|
||||
maxTrackedRetention: 16,
|
||||
scopeBytes: 2_000_000,
|
||||
quiet: false,
|
||||
};
|
||||
for (let i = 0; i < argv.length; i += 1) {
|
||||
const arg = argv[i];
|
||||
const next = argv[i + 1];
|
||||
switch (arg) {
|
||||
case "--iters":
|
||||
opts.iters = Number.parseInt(next ?? "", 10);
|
||||
i += 1;
|
||||
break;
|
||||
case "--batches":
|
||||
opts.batches = Number.parseInt(next ?? "", 10);
|
||||
i += 1;
|
||||
break;
|
||||
case "--snap-dir":
|
||||
opts.snapDir = next ?? opts.snapDir;
|
||||
i += 1;
|
||||
break;
|
||||
case "--mode":
|
||||
if (
|
||||
next === "production" ||
|
||||
next === "closure-extracted" ||
|
||||
next === "closure-inline" ||
|
||||
next === "synthetic-leak"
|
||||
) {
|
||||
opts.mode = next;
|
||||
} else {
|
||||
fail(
|
||||
`--mode must be one of: production, closure-extracted, closure-inline, synthetic-leak`,
|
||||
);
|
||||
}
|
||||
i += 1;
|
||||
break;
|
||||
case "--max-rss-growth-mb":
|
||||
opts.maxRssGrowthMb = Number.parseInt(next ?? "", 10);
|
||||
i += 1;
|
||||
break;
|
||||
case "--max-tracked-retention":
|
||||
opts.maxTrackedRetention = Number.parseInt(next ?? "", 10);
|
||||
i += 1;
|
||||
break;
|
||||
case "--scope-bytes":
|
||||
opts.scopeBytes = Number.parseInt(next ?? "", 10);
|
||||
i += 1;
|
||||
break;
|
||||
case "--quiet":
|
||||
opts.quiet = true;
|
||||
break;
|
||||
case "--help":
|
||||
case "-h":
|
||||
printUsage();
|
||||
process.exit(0);
|
||||
break;
|
||||
default:
|
||||
fail(`Unknown arg: ${arg}`);
|
||||
}
|
||||
}
|
||||
if (!Number.isFinite(opts.iters) || opts.iters <= 0) {
|
||||
fail("--iters must be > 0");
|
||||
}
|
||||
if (!Number.isFinite(opts.batches) || opts.batches <= 0) {
|
||||
fail("--batches must be > 0");
|
||||
}
|
||||
return opts;
|
||||
}
|
||||
|
||||
function printUsage(): void {
|
||||
process.stderr.write(
|
||||
[
|
||||
"Usage: node --import tsx --expose-gc scripts/embedded-run-abort-leak.ts [flags]",
|
||||
" --mode <production|closure-extracted|closure-inline|synthetic-leak>",
|
||||
" --iters N iterations per batch (default 50)",
|
||||
" --batches B batches between snapshots (default 5)",
|
||||
" --snap-dir DIR heap snapshot output dir (default .tmp/embedded-run-abort-leak)",
|
||||
" --scope-bytes N simulated run-scope payload size (default 2_000_000)",
|
||||
" --max-rss-growth-mb PASS threshold for RSS growth (default 64)",
|
||||
" --max-tracked-retention PASS threshold for tracked finalizer retention (default 16)",
|
||||
" --quiet only print final verdict",
|
||||
"",
|
||||
].join("\n"),
|
||||
);
|
||||
}
|
||||
|
||||
function fail(msg: string): never {
|
||||
process.stderr.write(`error: ${msg}\n`);
|
||||
process.exit(2);
|
||||
}
|
||||
|
||||
const KEEP_ALIVE: Array<Promise<unknown>> = [];
|
||||
const SYNTHETIC_LEAK_BUCKET: Uint8Array[] = [];
|
||||
const FINALIZED = { count: 0 };
|
||||
const finalizer = new FinalizationRegistry<number>(() => {
|
||||
FINALIZED.count += 1;
|
||||
});
|
||||
|
||||
function abortableExtracted<T>(signal: AbortSignal, promise: Promise<T>): Promise<T> {
|
||||
if (signal.aborted) {
|
||||
return Promise.reject(new Error("aborted"));
|
||||
}
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(new Error("aborted"));
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
promise.then(
|
||||
(value) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolve(value);
|
||||
},
|
||||
(err) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(err);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
function runOnce(mode: Mode, scopeBytes: number, iter: number): void {
|
||||
const transcript = new Uint8Array(scopeBytes);
|
||||
const toolMetas = [{ data: new Uint8Array(scopeBytes / 4) }];
|
||||
const subscription = {
|
||||
onPartialReply: (_text: string) => {
|
||||
void transcript;
|
||||
},
|
||||
onAssistantMessageStart: () => {
|
||||
void toolMetas;
|
||||
},
|
||||
};
|
||||
finalizer.register(transcript, iter);
|
||||
|
||||
const ac = new AbortController();
|
||||
const neverSettling = new Promise<unknown>(() => {});
|
||||
KEEP_ALIVE.push(neverSettling);
|
||||
|
||||
if (mode === "production") {
|
||||
void productionAbortable(ac.signal, neverSettling).catch(() => {});
|
||||
} else if (mode === "closure-extracted") {
|
||||
void abortableExtracted(ac.signal, neverSettling).catch(() => {});
|
||||
} else if (mode === "closure-inline") {
|
||||
const wrapped = new Promise<unknown>((resolve, reject) => {
|
||||
const onAbort = () => reject(new Error("aborted"));
|
||||
ac.signal.addEventListener("abort", onAbort, { once: true });
|
||||
neverSettling.then(
|
||||
(v) => {
|
||||
void transcript;
|
||||
void toolMetas;
|
||||
void subscription;
|
||||
resolve(v);
|
||||
},
|
||||
(e) => {
|
||||
void transcript;
|
||||
void toolMetas;
|
||||
void subscription;
|
||||
reject(e);
|
||||
},
|
||||
);
|
||||
});
|
||||
void wrapped.catch(() => {});
|
||||
} else {
|
||||
SYNTHETIC_LEAK_BUCKET.push(transcript);
|
||||
}
|
||||
ac.abort();
|
||||
|
||||
void transcript.length;
|
||||
void toolMetas.length;
|
||||
void subscription.onPartialReply;
|
||||
}
|
||||
|
||||
async function settleAndGc(): Promise<void> {
|
||||
for (let i = 0; i < 4; i += 1) {
|
||||
await new Promise<void>((r) => setImmediate(r));
|
||||
globalThis.gc?.();
|
||||
}
|
||||
await new Promise<void>((r) => setTimeout(r, 100));
|
||||
globalThis.gc?.();
|
||||
}
|
||||
|
||||
type SampleRow = {
|
||||
label: string;
|
||||
rssBytes: number;
|
||||
heapUsedBytes: number;
|
||||
totalIters: number;
|
||||
trackedFinalized: number;
|
||||
snapshotPath: string;
|
||||
};
|
||||
|
||||
function takeSnapshot(snapDir: string, label: string): string {
|
||||
fs.mkdirSync(snapDir, { recursive: true });
|
||||
const filename = path.join(snapDir, `${label}-${process.pid}-${Date.now()}.heapsnapshot`);
|
||||
v8.writeHeapSnapshot(filename);
|
||||
return filename;
|
||||
}
|
||||
|
||||
function fmtBytes(bytes: number): string {
|
||||
return `${(bytes / 1024 / 1024).toFixed(1)}MB`;
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const opts = parseArgs(process.argv.slice(2));
|
||||
if (typeof globalThis.gc !== "function") {
|
||||
fail("--expose-gc is required (run with: node --expose-gc ...)");
|
||||
}
|
||||
|
||||
const startedAt = Date.now();
|
||||
const samples: SampleRow[] = [];
|
||||
|
||||
if (!opts.quiet) {
|
||||
process.stdout.write(
|
||||
`[harness] mode=${opts.mode} iters=${opts.iters} batches=${opts.batches} ` +
|
||||
`scope=${fmtBytes(opts.scopeBytes)} pid=${process.pid}\n`,
|
||||
);
|
||||
}
|
||||
|
||||
await settleAndGc();
|
||||
const baselinePath = takeSnapshot(opts.snapDir, "baseline");
|
||||
const baseline: SampleRow = {
|
||||
label: "baseline",
|
||||
rssBytes: process.memoryUsage().rss,
|
||||
heapUsedBytes: process.memoryUsage().heapUsed,
|
||||
totalIters: 0,
|
||||
trackedFinalized: FINALIZED.count,
|
||||
snapshotPath: baselinePath,
|
||||
};
|
||||
samples.push(baseline);
|
||||
if (!opts.quiet) {
|
||||
process.stdout.write(
|
||||
` baseline rss=${fmtBytes(baseline.rssBytes)} heap=${fmtBytes(baseline.heapUsedBytes)}\n`,
|
||||
);
|
||||
}
|
||||
|
||||
let totalIters = 0;
|
||||
for (let b = 0; b < opts.batches; b += 1) {
|
||||
for (let i = 0; i < opts.iters; i += 1) {
|
||||
runOnce(opts.mode, opts.scopeBytes, totalIters);
|
||||
totalIters += 1;
|
||||
}
|
||||
await settleAndGc();
|
||||
const snapshotPath = takeSnapshot(opts.snapDir, `batch-${b}`);
|
||||
const row: SampleRow = {
|
||||
label: `batch-${b}`,
|
||||
rssBytes: process.memoryUsage().rss,
|
||||
heapUsedBytes: process.memoryUsage().heapUsed,
|
||||
totalIters,
|
||||
trackedFinalized: FINALIZED.count,
|
||||
snapshotPath,
|
||||
};
|
||||
samples.push(row);
|
||||
if (!opts.quiet) {
|
||||
process.stdout.write(
|
||||
` batch ${b} totalIters=${row.totalIters} ` +
|
||||
`rss=${fmtBytes(row.rssBytes)} heap=${fmtBytes(row.heapUsedBytes)} ` +
|
||||
`trackedFinalized=${row.trackedFinalized}/${row.totalIters}\n`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const final = samples[samples.length - 1];
|
||||
if (!final) {
|
||||
fail("no samples collected");
|
||||
}
|
||||
const rssGrowthMb = (final.rssBytes - baseline.rssBytes) / 1024 / 1024;
|
||||
// Tracked retention: how many iter-allocated transcripts are STILL alive
|
||||
// (have not been finalized). Lower is better.
|
||||
const trackedRetention = final.totalIters - final.trackedFinalized;
|
||||
|
||||
const durationSec = ((Date.now() - startedAt) / 1000).toFixed(1);
|
||||
const verdict =
|
||||
rssGrowthMb > opts.maxRssGrowthMb || trackedRetention > opts.maxTrackedRetention
|
||||
? "FAIL"
|
||||
: "PASS";
|
||||
|
||||
process.stdout.write(
|
||||
`${verdict}: mode=${opts.mode} ` +
|
||||
`rss_growth=${rssGrowthMb.toFixed(1)}MB ` +
|
||||
`tracked_retention=${trackedRetention}/${final.totalIters} ` +
|
||||
`duration=${durationSec}s ` +
|
||||
`(thresholds: rss<${opts.maxRssGrowthMb}MB, tracked<${opts.maxTrackedRetention})\n`,
|
||||
);
|
||||
if (!opts.quiet) {
|
||||
process.stdout.write(
|
||||
`snapshots in ${opts.snapDir}/ — diff with:\n` +
|
||||
` node .agents/skills/openclaw-test-heap-leaks/scripts/heapsnapshot-delta.mjs ` +
|
||||
`${baseline.snapshotPath} ${final.snapshotPath} --top 30\n`,
|
||||
);
|
||||
}
|
||||
process.exit(verdict === "PASS" ? 0 : 1);
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
process.stderr.write(`harness crashed: ${String(err)}\n${(err as Error)?.stack ?? ""}\n`);
|
||||
process.exit(2);
|
||||
});
|
||||
29
src/agents/pi-embedded-runner/run/abortable.test.ts
Normal file
29
src/agents/pi-embedded-runner/run/abortable.test.ts
Normal file
@@ -0,0 +1,29 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { abortable } from "./abortable.js";
|
||||
|
||||
describe("abortable", () => {
|
||||
it("rejects with AbortError when signal aborts before inner settles", async () => {
|
||||
const ac = new AbortController();
|
||||
const inner = new Promise<void>(() => {});
|
||||
const wrapped = abortable(ac.signal, inner);
|
||||
ac.abort();
|
||||
try {
|
||||
await wrapped;
|
||||
expect.fail("expected rejection");
|
||||
} catch (err) {
|
||||
expect((err as Error).name).toBe("AbortError");
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects immediately when signal is already aborted", async () => {
|
||||
const ac = new AbortController();
|
||||
ac.abort();
|
||||
const inner = new Promise<void>(() => {});
|
||||
await expect(abortable(ac.signal, inner)).rejects.toThrow(/aborted/i);
|
||||
});
|
||||
|
||||
it("resolves with inner value when inner settles before abort", async () => {
|
||||
const ac = new AbortController();
|
||||
await expect(abortable(ac.signal, Promise.resolve(42))).resolves.toBe(42);
|
||||
});
|
||||
});
|
||||
38
src/agents/pi-embedded-runner/run/abortable.ts
Normal file
38
src/agents/pi-embedded-runner/run/abortable.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
function getAbortReason(signal: AbortSignal): unknown {
|
||||
return "reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
|
||||
}
|
||||
|
||||
export function makeAbortError(signal: AbortSignal): Error {
|
||||
const reason = getAbortReason(signal);
|
||||
if (reason instanceof Error) {
|
||||
const err = new Error(reason.message, { cause: reason });
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
}
|
||||
const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
}
|
||||
|
||||
export function abortable<T>(signal: AbortSignal, promise: Promise<T>): Promise<T> {
|
||||
if (signal.aborted) {
|
||||
return Promise.reject(makeAbortError(signal));
|
||||
}
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(makeAbortError(signal));
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
promise.then(
|
||||
(value) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolve(value);
|
||||
},
|
||||
(err) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(err);
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -228,6 +228,7 @@ import {
|
||||
import { splitSdkTools } from "../tool-split.js";
|
||||
import { mapThinkingLevel } from "../utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||
import { abortable as abortableWithSignal } from "./abortable.js";
|
||||
import { createEmbeddedAgentSessionWithResourceLoader } from "./attempt-session.js";
|
||||
export { buildContextEnginePromptCacheInfo } from "./attempt.context-engine-helpers.js";
|
||||
import {
|
||||
@@ -2108,19 +2109,6 @@ export async function runEmbeddedAttempt(
|
||||
err.name = "TimeoutError";
|
||||
return err;
|
||||
};
|
||||
const makeAbortError = (signal: AbortSignal): Error => {
|
||||
const reason = getAbortReason(signal);
|
||||
// If the reason is already an Error, preserve it to keep the original message
|
||||
// (e.g., "LLM idle timeout (<n>s): no response from model" instead of "aborted")
|
||||
if (reason instanceof Error) {
|
||||
const err = new Error(reason.message, { cause: reason });
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
}
|
||||
const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
};
|
||||
const abortCompaction = () => {
|
||||
if (!activeSession.isCompacting) {
|
||||
return;
|
||||
@@ -2152,29 +2140,8 @@ export async function runEmbeddedAttempt(
|
||||
idleTimedOut = true;
|
||||
abortRun(true, error);
|
||||
};
|
||||
const abortable = <T>(promise: Promise<T>): Promise<T> => {
|
||||
const signal = runAbortController.signal;
|
||||
if (signal.aborted) {
|
||||
return Promise.reject(makeAbortError(signal));
|
||||
}
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(makeAbortError(signal));
|
||||
};
|
||||
signal.addEventListener("abort", onAbort, { once: true });
|
||||
promise.then(
|
||||
(value) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
resolve(value);
|
||||
},
|
||||
(err) => {
|
||||
signal.removeEventListener("abort", onAbort);
|
||||
reject(err);
|
||||
},
|
||||
);
|
||||
});
|
||||
};
|
||||
const abortable = <T>(promise: Promise<T>): Promise<T> =>
|
||||
abortableWithSignal(runAbortController.signal, promise);
|
||||
|
||||
const subscription = subscribeEmbeddedPiSession(
|
||||
buildEmbeddedSubscriptionParams({
|
||||
|
||||
Reference in New Issue
Block a user