From 700a8e8dd0cc81059f47b8e13d25dc0bdaf8d99d Mon Sep 17 00:00:00 2001 From: Ted Li Date: Thu, 16 Apr 2026 13:23:02 -0700 Subject: [PATCH] Cron: clean up deleteAfterRun direct deliveries --- .../delivery-dispatch.double-announce.test.ts | 56 +++++++++++++++++++ src/cron/isolated-agent/delivery-dispatch.ts | 23 +++++--- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 52dc888b592..ec3f37dc76d 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -852,6 +852,34 @@ describe("dispatchCronDelivery — double-announce guard", () => { ); }); + it("cleans up the direct cron session after threaded direct delivery when deleteAfterRun is enabled", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ synthesizedText: "Final weather summary" }); + params.resolvedDelivery = { + ...makeResolvedDelivery(), + mode: "implicit", + threadId: 42, + }; + (params.job as { deleteAfterRun?: boolean }).deleteAfterRun = true; + + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(callGateway).toHaveBeenCalledWith({ + method: "sessions.delete", + params: { + key: "agent:main", + deleteTranscript: true, + emitLifecycleHooks: false, + }, + timeoutMs: 10_000, + }); + }); + it("delivers structured heartbeat/media payloads once through the outbound adapter", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); @@ -883,6 +911,33 @@ describe("dispatchCronDelivery — double-announce guard", () => { ); }); + it("cleans up the direct cron session after structured direct delivery when deleteAfterRun is enabled", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + + const params = makeBaseParams({ synthesizedText: "HEARTBEAT_OK" }); + params.deliveryPayloadHasStructuredContent = true; + params.deliveryPayloads = [ + { text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }, + ] as never; + (params.job as { deleteAfterRun?: boolean }).deleteAfterRun = true; + + const state = await dispatchCronDelivery(params); + + expect(state.result).toBeUndefined(); + expect(state.delivered).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + expect(callGateway).toHaveBeenCalledWith({ + method: "sessions.delete", + params: { + key: "agent:main", + deleteTranscript: true, + emitLifecycleHooks: false, + }, + timeoutMs: 10_000, + }); + }); + it("suppresses NO_REPLY payload with surrounding whitespace", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); @@ -965,6 +1020,7 @@ describe("dispatchCronDelivery — double-announce guard", () => { }, timeoutMs: 10_000, }); + expect(callGateway).toHaveBeenCalledTimes(1); }); it("suppresses trailing NO_REPLY after summary text in direct delivery (#64976)", async () => { diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 687bebfbac8..085ada30644 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -441,6 +441,7 @@ export async function dispatchCronDelivery( // remains the only source of delivered state. let delivered = skipMessagingToolDelivery; let deliveryAttempted = skipMessagingToolDelivery; + let directCronSessionDeleted = false; const failDeliveryTarget = (error: string) => params.withRunSession({ status: "error", @@ -452,7 +453,7 @@ export async function dispatchCronDelivery( ...params.telemetry, }); const cleanupDirectCronSessionIfNeeded = async (): Promise => { - if (!params.job.deleteAfterRun) { + if (!params.job.deleteAfterRun || directCronSessionDeleted) { return; } try { @@ -466,6 +467,7 @@ export async function dispatchCronDelivery( }, timeoutMs: 10_000, }); + directCronSessionDeleted = true; } catch { // Best-effort; direct delivery result should still be returned. } @@ -648,6 +650,17 @@ export async function dispatchCronDelivery( } }; + const deliverViaDirectAndCleanup = async ( + delivery: SuccessfulDeliveryTarget, + options?: { retryTransient?: boolean }, + ): Promise => { + try { + return await deliverViaDirect(delivery, options); + } finally { + await cleanupDirectCronSessionIfNeeded(); + } + }; + const finalizeTextDelivery = async ( delivery: SuccessfulDeliveryTarget, ): Promise => { @@ -758,11 +771,7 @@ export async function dispatchCronDelivery( ...params.telemetry, }); } - try { - return await deliverViaDirect(delivery, { retryTransient: true }); - } finally { - await cleanupDirectCronSessionIfNeeded(); - } + return await deliverViaDirectAndCleanup(delivery, { retryTransient: true }); }; if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) { @@ -802,7 +811,7 @@ export async function dispatchCronDelivery( const useDirectDelivery = params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null; if (useDirectDelivery) { - const directResult = await deliverViaDirect(params.resolvedDelivery); + const directResult = await deliverViaDirectAndCleanup(params.resolvedDelivery); if (directResult) { return { result: directResult,