From 7168896fdfed027cdb5da995bd18c358aab39b66 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 5 May 2026 01:27:17 +0100 Subject: [PATCH] fix(agents): abort post-compaction loops out-of-band --- .../run.compaction-loop-guard.test.ts | 22 +++++++++---- src/agents/pi-embedded-runner/run.ts | 32 +++++++++++++++++-- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts b/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts index 59e3e245298..348ee9c0a80 100644 --- a/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts +++ b/src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts @@ -153,20 +153,26 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { }); }); - it("aborts the run with PostCompactionLoopPersistedError when identical (tool, args, result) repeats windowSize times after compaction", async () => { + it("aborts the attempt out-of-band when identical (tool, args, result) repeats windowSize times after compaction", async () => { const overflowError = makeOverflowError(); let attemptReturned = false; + let attemptSignalAborted = false; + let attemptSignalReason: unknown; // Attempt 1: overflow → triggers compaction. mockedRunEmbeddedAttempt.mockImplementationOnce(async () => makeAttemptResult({ promptError: overflowError }), ); // Attempt 2: post-compaction. The live wrapped-tool path records each - // outcome while the prompt is still running; the third identical result - // aborts before the attempt can return. + // outcome while the prompt is still running. The third identical result + // must not rely on throwing out of tool execution (the dependency converts + // tool errors into tool results); instead it aborts the attempt signal and + // the runner raises the persisted-loop error after the attempt unwinds. mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => { - const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver }) - .onToolOutcome; + const { abortSignal, onToolOutcome } = attemptParams as { + abortSignal?: AbortSignal; + onToolOutcome?: ToolOutcomeObserver; + }; for (let i = 0; i < 3; i += 1) { await executeWrappedToolOutcome( "gateway", @@ -175,6 +181,8 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { onToolOutcome, ); } + attemptSignalAborted = abortSignal?.aborted ?? false; + attemptSignalReason = abortSignal?.reason; attemptReturned = true; return makeAttemptResult({ promptError: null, @@ -196,7 +204,9 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => { expect(mockedCompactDirect).toHaveBeenCalledTimes(1); expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(2); - expect(attemptReturned).toBe(false); + expect(attemptReturned).toBe(true); + expect(attemptSignalAborted).toBe(true); + expect(attemptSignalReason).toBeInstanceOf(PostCompactionLoopPersistedError); }); it("does not abort when the result hash changes across post-compaction attempts (progress was made)", async () => { diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 81219bd8ee2..f81ee725bb4 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -799,12 +799,15 @@ export async function runEmbeddedPiAgent( resolvedLoopDetectionConfig?.postCompactionGuard, { enabled: resolvedLoopDetectionConfig?.enabled !== false }, ); + let postCompactionAbortController: AbortController | undefined; + let postCompactionAbortError: PostCompactionLoopPersistedError | undefined; const observePostCompactionToolOutcome = ( observation: PostCompactionGuardObservation, ): void => { const verdict = postCompactionGuard.observe(observation); if (verdict.shouldAbort) { - throw PostCompactionLoopPersistedError.fromVerdict(verdict); + postCompactionAbortError ??= PostCompactionLoopPersistedError.fromVerdict(verdict); + postCompactionAbortController?.abort(postCompactionAbortError); } }; let lastRetryFailoverReason: FailoverReason | null = null; @@ -1099,6 +1102,17 @@ export async function runEmbeddedPiAgent( startupStagesEmitted = true; } + const attemptAbortController = new AbortController(); + postCompactionAbortController = attemptAbortController; + const parentAbortSignal = params.abortSignal; + const relayParentAbort = (): void => { + attemptAbortController.abort(parentAbortSignal?.reason); + }; + if (parentAbortSignal?.aborted) { + relayParentAbort(); + } else { + parentAbortSignal?.addEventListener("abort", relayParentAbort, { once: true }); + } const rawAttempt = await runEmbeddedAttemptWithBackend({ sessionId: activeSessionId, sessionKey: resolvedSessionKey, @@ -1177,7 +1191,7 @@ export async function runEmbeddedPiAgent( bashElevated: params.bashElevated, timeoutMs: params.timeoutMs, runId: params.runId, - abortSignal: params.abortSignal, + abortSignal: attemptAbortController.signal, replyOperation: params.replyOperation, shouldEmitToolResult: params.shouldEmitToolResult, shouldEmitToolOutput: params.shouldEmitToolOutput, @@ -1216,7 +1230,19 @@ export async function runEmbeddedPiAgent( bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1], suppressNextUserMessagePersistence, onUserMessagePersisted, - }); + }) + .catch((err: unknown): never => { + throw postCompactionAbortError ?? err; + }) + .finally(() => { + parentAbortSignal?.removeEventListener?.("abort", relayParentAbort); + if (postCompactionAbortController === attemptAbortController) { + postCompactionAbortController = undefined; + } + }); + if (postCompactionAbortError) { + throw postCompactionAbortError; + } const attempt = normalizeEmbeddedRunAttemptResult(rawAttempt); const {