Cron: clean up deleteAfterRun direct deliveries

This commit is contained in:
Ted Li
2026-04-16 13:23:02 -07:00
parent 354dbf2161
commit 700a8e8dd0
2 changed files with 72 additions and 7 deletions

View File

@@ -852,6 +852,34 @@ describe("dispatchCronDelivery — double-announce guard", () => {
);
});
it("cleans up the direct cron session after threaded direct delivery when deleteAfterRun is enabled", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
const params = makeBaseParams({ synthesizedText: "Final weather summary" });
params.resolvedDelivery = {
...makeResolvedDelivery(),
mode: "implicit",
threadId: 42,
};
(params.job as { deleteAfterRun?: boolean }).deleteAfterRun = true;
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(callGateway).toHaveBeenCalledWith({
method: "sessions.delete",
params: {
key: "agent:main",
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
});
it("delivers structured heartbeat/media payloads once through the outbound adapter", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
@@ -883,6 +911,33 @@ describe("dispatchCronDelivery — double-announce guard", () => {
);
});
it("cleans up the direct cron session after structured direct delivery when deleteAfterRun is enabled", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
const params = makeBaseParams({ synthesizedText: "HEARTBEAT_OK" });
params.deliveryPayloadHasStructuredContent = true;
params.deliveryPayloads = [
{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" },
] as never;
(params.job as { deleteAfterRun?: boolean }).deleteAfterRun = true;
const state = await dispatchCronDelivery(params);
expect(state.result).toBeUndefined();
expect(state.delivered).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
expect(callGateway).toHaveBeenCalledWith({
method: "sessions.delete",
params: {
key: "agent:main",
deleteTranscript: true,
emitLifecycleHooks: false,
},
timeoutMs: 10_000,
});
});
it("suppresses NO_REPLY payload with surrounding whitespace", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
@@ -965,6 +1020,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
},
timeoutMs: 10_000,
});
expect(callGateway).toHaveBeenCalledTimes(1);
});
it("suppresses trailing NO_REPLY after summary text in direct delivery (#64976)", async () => {

View File

@@ -441,6 +441,7 @@ export async function dispatchCronDelivery(
// remains the only source of delivered state.
let delivered = skipMessagingToolDelivery;
let deliveryAttempted = skipMessagingToolDelivery;
let directCronSessionDeleted = false;
const failDeliveryTarget = (error: string) =>
params.withRunSession({
status: "error",
@@ -452,7 +453,7 @@ export async function dispatchCronDelivery(
...params.telemetry,
});
const cleanupDirectCronSessionIfNeeded = async (): Promise<void> => {
if (!params.job.deleteAfterRun) {
if (!params.job.deleteAfterRun || directCronSessionDeleted) {
return;
}
try {
@@ -466,6 +467,7 @@ export async function dispatchCronDelivery(
},
timeoutMs: 10_000,
});
directCronSessionDeleted = true;
} catch {
// Best-effort; direct delivery result should still be returned.
}
@@ -648,6 +650,17 @@ export async function dispatchCronDelivery(
}
};
const deliverViaDirectAndCleanup = async (
delivery: SuccessfulDeliveryTarget,
options?: { retryTransient?: boolean },
): Promise<RunCronAgentTurnResult | null> => {
try {
return await deliverViaDirect(delivery, options);
} finally {
await cleanupDirectCronSessionIfNeeded();
}
};
const finalizeTextDelivery = async (
delivery: SuccessfulDeliveryTarget,
): Promise<RunCronAgentTurnResult | null> => {
@@ -758,11 +771,7 @@ export async function dispatchCronDelivery(
...params.telemetry,
});
}
try {
return await deliverViaDirect(delivery, { retryTransient: true });
} finally {
await cleanupDirectCronSessionIfNeeded();
}
return await deliverViaDirectAndCleanup(delivery, { retryTransient: true });
};
if (params.deliveryRequested && !params.skipHeartbeatDelivery && !skipMessagingToolDelivery) {
@@ -802,7 +811,7 @@ export async function dispatchCronDelivery(
const useDirectDelivery =
params.deliveryPayloadHasStructuredContent || params.resolvedDelivery.threadId != null;
if (useDirectDelivery) {
const directResult = await deliverViaDirect(params.resolvedDelivery);
const directResult = await deliverViaDirectAndCleanup(params.resolvedDelivery);
if (directResult) {
return {
result: directResult,