diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 5d6f5501060..cd6545a2df9 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -556,7 +556,11 @@ async function maybeQueueSubagentAnnounce(params: { triggerMessage: string; summaryLine?: string; requesterOrigin?: DeliveryContext; + signal?: AbortSignal; }): Promise<"steered" | "queued" | "none"> { + if (params.signal?.aborted) { + return "none"; + } const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); const sessionId = entry?.sessionId; @@ -637,7 +641,14 @@ async function sendSubagentAnnounceDirectly(params: { completionDirectOrigin?: DeliveryContext; directOrigin?: DeliveryContext; requesterIsSubagent: boolean; + signal?: AbortSignal; }): Promise { + if (params.signal?.aborted) { + return { + delivered: false, + path: "none", + }; + } const cfg = loadConfig(); const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg); const canonicalRequesterSessionKey = resolveRequesterStoreKey( @@ -691,6 +702,12 @@ async function sendSubagentAnnounceDirectly(params: { completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== "" ? String(completionDirectOrigin.threadId) : undefined; + if (params.signal?.aborted) { + return { + delivered: false, + path: "none", + }; + } await callGateway({ method: "send", params: { @@ -717,6 +734,12 @@ async function sendSubagentAnnounceDirectly(params: { directOrigin?.threadId != null && directOrigin.threadId !== "" ? String(directOrigin.threadId) : undefined; + if (params.signal?.aborted) { + return { + delivered: false, + path: "none", + }; + } await callGateway({ method: "agent", params: { @@ -761,7 +784,14 @@ async function deliverSubagentAnnouncement(params: { completionRouteMode?: "bound" | "fallback" | "hook"; spawnMode?: SpawnSubagentMode; directIdempotencyKey: string; + signal?: AbortSignal; }): Promise { + if (params.signal?.aborted) { + return { + delivered: false, + path: "none", + }; + } // Non-completion mode mirrors historical behavior: try queued/steered delivery first, // then (only if not queued) attempt direct delivery. if (!params.expectsCompletionMessage) { @@ -771,6 +801,7 @@ async function deliverSubagentAnnouncement(params: { triggerMessage: params.triggerMessage, summaryLine: params.summaryLine, requesterOrigin: params.requesterOrigin, + signal: params.signal, }); const queued = queueOutcomeToDeliveryResult(queueOutcome); if (queued.delivered) { @@ -791,6 +822,7 @@ async function deliverSubagentAnnouncement(params: { directOrigin: params.directOrigin, requesterIsSubagent: params.requesterIsSubagent, expectsCompletionMessage: params.expectsCompletionMessage, + signal: params.signal, }); if (direct.delivered || !params.expectsCompletionMessage) { return direct; @@ -804,6 +836,7 @@ async function deliverSubagentAnnouncement(params: { triggerMessage: params.triggerMessage, summaryLine: params.summaryLine, requesterOrigin: params.requesterOrigin, + signal: params.signal, }); if (queueOutcome === "steered" || queueOutcome === "queued") { return queueOutcomeToDeliveryResult(queueOutcome); @@ -956,6 +989,7 @@ export async function runSubagentAnnounceFlow(params: { announceType?: SubagentAnnounceType; expectsCompletionMessage?: boolean; spawnMode?: SpawnSubagentMode; + signal?: AbortSignal; }): Promise { let didAnnounce = false; const expectsCompletionMessage = params.expectsCompletionMessage === true; @@ -1216,6 +1250,7 @@ export async function runSubagentAnnounceFlow(params: { completionRouteMode: completionResolution.routeMode, spawnMode: params.spawnMode, directIdempotencyKey, + signal: params.signal, }); didAnnounce = delivery.delivered; if (!delivery.delivered && delivery.path === "direct" && delivery.error) { diff --git a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts index f42e314831f..0a3a151e5a6 100644 --- a/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts +++ b/src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts @@ -133,4 +133,56 @@ describe("runCronIsolatedAgentTurn", () => { expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); }); }); + + it("skips structured outbound delivery when timeout abort is already set", async () => { + await withTempCronHome(async (home) => { + const storePath = await writeSessionStore(home, { + lastProvider: "telegram", + lastChannel: "telegram", + lastTo: "123", + }); + const deps: CliDeps = { + sendMessageSlack: vi.fn(), + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn().mockResolvedValue({ + messageId: "t1", + chatId: "123", + }), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + const controller = new AbortController(); + controller.abort("cron: job execution timed out"); + + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: { + ...makeJob({ + kind: "agentTurn", + message: "do it", + }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + signal: controller.signal, + lane: "cron", + }); + + expect(res.status).toBe("error"); + expect(res.error).toContain("timed out"); + expect(deps.sendMessageTelegram).not.toHaveBeenCalled(); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + }); + }); }); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 6d59a8e14e0..5af6119a151 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -156,10 +156,19 @@ export async function runCronIsolatedAgentTurn(params: { job: CronJob; message: string; abortSignal?: AbortSignal; + signal?: AbortSignal; sessionKey: string; agentId?: string; lane?: string; }): Promise { + const abortSignal = params.abortSignal ?? params.signal; + const isAborted = () => abortSignal?.aborted === true; + const abortReason = () => { + const reason = abortSignal?.reason; + return typeof reason === "string" && reason.trim() + ? reason.trim() + : "cron: job execution timed out"; + }; const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1"; const defaultAgentId = resolveDefaultAgentId(params.cfg); const requestedAgentId = @@ -473,8 +482,8 @@ export async function runCronIsolatedAgentTurn(params: { agentDir, fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId), run: (providerOverride, modelOverride) => { - if (params.abortSignal?.aborted) { - throw new Error("cron: isolated run aborted"); + if (abortSignal?.aborted) { + throw new Error(abortReason()); } if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride); @@ -517,7 +526,7 @@ export async function runCronIsolatedAgentTurn(params: { runId: cronSession.sessionEntry.sessionId, requireExplicitMessageTarget: true, disableMessageTool: deliveryRequested, - abortSignal: params.abortSignal, + abortSignal, }); }, }); @@ -529,6 +538,10 @@ export async function runCronIsolatedAgentTurn(params: { return withRunSession({ status: "error", error: String(err) }); } + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason() }); + } + const payloads = runResult.payloads ?? []; // Update token+model fields in the session store. @@ -584,6 +597,10 @@ export async function runCronIsolatedAgentTurn(params: { } await persistSessionEntry(); } + + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } const firstText = payloads[0]?.text ?? ""; let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); let outputText = pickLastNonEmptyTextFromPayloads(payloads); @@ -672,6 +689,9 @@ export async function runCronIsolatedAgentTurn(params: { ? [{ text: synthesizedText }] : []; if (payloadsForDelivery.length > 0) { + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } const deliveryResults = await deliverOutboundPayloads({ cfg: cfgWithAgentDefaults, channel: resolvedDelivery.channel, @@ -683,6 +703,7 @@ export async function runCronIsolatedAgentTurn(params: { identity, bestEffort: deliveryBestEffort, deps: createOutboundSendDeps(params.deps), + abortSignal, }); delivered = deliveryResults.length > 0; } @@ -765,6 +786,9 @@ export async function runCronIsolatedAgentTurn(params: { return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry }); } try { + if (isAborted()) { + return withRunSession({ status: "error", error: abortReason(), ...telemetry }); + } const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: agentSessionKey, childRunId: `${params.job.id}:${runSessionId}`, @@ -785,6 +809,7 @@ export async function runCronIsolatedAgentTurn(params: { endedAt: runEndedAt, outcome: { status: "ok" }, announceType: "cron job", + signal: abortSignal, }); if (didAnnounce) { delivered = true; diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 878fad4149c..a0838b6f6f4 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -731,6 +731,60 @@ describe("Cron issue regressions", () => { expect(job?.state.lastError).toContain("timed out"); }); + it("suppresses isolated follow-up side effects after timeout", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); + const enqueueSystemEvent = vi.fn(); + + const cronJob = createIsolatedRegressionJob({ + id: "timeout-side-effects", + name: "timeout side effects", + scheduledAt, + schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + let now = scheduledAt; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async (params) => { + const abortSignal = params.abortSignal; + await new Promise((resolve, reject) => { + const onAbort = () => { + abortSignal?.removeEventListener("abort", onAbort); + now += 100; + reject(new Error("aborted")); + }; + abortSignal?.addEventListener("abort", onAbort, { once: true }); + }); + return { + status: "ok" as const, + summary: "late-summary", + delivered: false, + error: + abortSignal?.aborted && typeof abortSignal.reason === "string" + ? abortSignal.reason + : undefined, + }; + }), + }); + + await onTimer(state); + + const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects"); + expect(jobAfterTimeout?.state.lastStatus).toBe("error"); + expect(jobAfterTimeout?.state.lastError).toContain("timed out"); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + }); + it("applies timeoutSeconds to manual cron.run isolated executions", async () => { vi.useRealTimers(); const store = await makeStorePath(); diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 3f10e9c29b5..cb403762b56 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -72,8 +72,8 @@ export async function executeJobCoreWithTimeout( executeJobCore(state, job, runAbortController.signal), new Promise((_, reject) => { timeoutId = setTimeout(() => { - runAbortController.abort(new Error("cron: job execution timed out")); - reject(new Error("cron: job execution timed out")); + runAbortController.abort(timeoutErrorMessage()); + reject(new Error(timeoutErrorMessage())); }, jobTimeoutMs); }), ]); @@ -91,6 +91,16 @@ function resolveRunConcurrency(state: CronServiceState): number { } return Math.max(1, Math.floor(raw)); } +function timeoutErrorMessage(): string { + return "cron: job execution timed out"; +} + +function isAbortError(err: unknown): boolean { + if (!(err instanceof Error)) { + return false; + } + return err.name === "AbortError" || err.message === timeoutErrorMessage(); +} /** * Exponential backoff delays (in ms) indexed by consecutive error count. * After the last entry the delay stays constant. @@ -354,14 +364,15 @@ export async function onTimer(state: CronServiceState) { const result = await executeJobCoreWithTimeout(state, job); return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }; } catch (err) { + const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err); state.deps.log.warn( { jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null }, - `cron: job failed: ${String(err)}`, + `cron: job failed: ${errorText}`, ); return { jobId: id, status: "error", - error: String(err), + error: errorText, startedAt, endedAt: state.deps.nowMs(), }; @@ -596,6 +607,9 @@ export async function executeJobCore( job: CronJob, abortSignal?: AbortSignal, ): Promise { + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); if (!text) { @@ -622,6 +636,9 @@ export async function executeJobCore( let heartbeatResult: HeartbeatRunResult; for (;;) { + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } heartbeatResult = await state.deps.runHeartbeatOnce({ reason, agentId: job.agentId, @@ -665,7 +682,7 @@ export async function executeJobCore( return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; } if (abortSignal?.aborted) { - return { status: "error", error: "cron: job execution aborted" }; + return { status: "error", error: timeoutErrorMessage() }; } const res = await state.deps.runIsolatedAgentJob({ @@ -674,6 +691,10 @@ export async function executeJobCore( abortSignal, }); + if (abortSignal?.aborted) { + return { status: "error", error: timeoutErrorMessage() }; + } + // Post a short summary back to the main session — but only when the // isolated run did NOT already deliver its output to the target channel. // When `res.delivered` is true the announce flow (or direct outbound