mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(cron): cancel timed-out runs before side effects (openclaw#22411) thanks @Takhoffman
Verified:
- pnpm check
- pnpm vitest run src/memory/qmd-manager.test.ts src/cron/service.issue-regressions.test.ts src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts --maxWorkers=1

Co-authored-by: Takhoffman <781889+Takhoffman@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -556,7 +556,11 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
triggerMessage: string;
|
||||
summaryLine?: string;
|
||||
requesterOrigin?: DeliveryContext;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<"steered" | "queued" | "none"> {
|
||||
if (params.signal?.aborted) {
|
||||
return "none";
|
||||
}
|
||||
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
|
||||
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
|
||||
const sessionId = entry?.sessionId;
|
||||
@@ -637,7 +641,14 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
completionDirectOrigin?: DeliveryContext;
|
||||
directOrigin?: DeliveryContext;
|
||||
requesterIsSubagent: boolean;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
const cfg = loadConfig();
|
||||
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
|
||||
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
|
||||
@@ -691,6 +702,12 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
|
||||
? String(completionDirectOrigin.threadId)
|
||||
: undefined;
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
await callGateway({
|
||||
method: "send",
|
||||
params: {
|
||||
@@ -717,6 +734,12 @@ async function sendSubagentAnnounceDirectly(params: {
|
||||
directOrigin?.threadId != null && directOrigin.threadId !== ""
|
||||
? String(directOrigin.threadId)
|
||||
: undefined;
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
await callGateway({
|
||||
method: "agent",
|
||||
params: {
|
||||
@@ -761,7 +784,14 @@ async function deliverSubagentAnnouncement(params: {
|
||||
completionRouteMode?: "bound" | "fallback" | "hook";
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
directIdempotencyKey: string;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<SubagentAnnounceDeliveryResult> {
|
||||
if (params.signal?.aborted) {
|
||||
return {
|
||||
delivered: false,
|
||||
path: "none",
|
||||
};
|
||||
}
|
||||
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
|
||||
// then (only if not queued) attempt direct delivery.
|
||||
if (!params.expectsCompletionMessage) {
|
||||
@@ -771,6 +801,7 @@ async function deliverSubagentAnnouncement(params: {
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
signal: params.signal,
|
||||
});
|
||||
const queued = queueOutcomeToDeliveryResult(queueOutcome);
|
||||
if (queued.delivered) {
|
||||
@@ -791,6 +822,7 @@ async function deliverSubagentAnnouncement(params: {
|
||||
directOrigin: params.directOrigin,
|
||||
requesterIsSubagent: params.requesterIsSubagent,
|
||||
expectsCompletionMessage: params.expectsCompletionMessage,
|
||||
signal: params.signal,
|
||||
});
|
||||
if (direct.delivered || !params.expectsCompletionMessage) {
|
||||
return direct;
|
||||
@@ -804,6 +836,7 @@ async function deliverSubagentAnnouncement(params: {
|
||||
triggerMessage: params.triggerMessage,
|
||||
summaryLine: params.summaryLine,
|
||||
requesterOrigin: params.requesterOrigin,
|
||||
signal: params.signal,
|
||||
});
|
||||
if (queueOutcome === "steered" || queueOutcome === "queued") {
|
||||
return queueOutcomeToDeliveryResult(queueOutcome);
|
||||
@@ -956,6 +989,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
announceType?: SubagentAnnounceType;
|
||||
expectsCompletionMessage?: boolean;
|
||||
spawnMode?: SpawnSubagentMode;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<boolean> {
|
||||
let didAnnounce = false;
|
||||
const expectsCompletionMessage = params.expectsCompletionMessage === true;
|
||||
@@ -1216,6 +1250,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
completionRouteMode: completionResolution.routeMode,
|
||||
spawnMode: params.spawnMode,
|
||||
directIdempotencyKey,
|
||||
signal: params.signal,
|
||||
});
|
||||
didAnnounce = delivery.delivered;
|
||||
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
|
||||
|
||||
@@ -133,4 +133,56 @@ describe("runCronIsolatedAgentTurn", () => {
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("skips structured outbound delivery when timeout abort is already set", async () => {
|
||||
await withTempCronHome(async (home) => {
|
||||
const storePath = await writeSessionStore(home, {
|
||||
lastProvider: "telegram",
|
||||
lastChannel: "telegram",
|
||||
lastTo: "123",
|
||||
});
|
||||
const deps: CliDeps = {
|
||||
sendMessageSlack: vi.fn(),
|
||||
sendMessageWhatsApp: vi.fn(),
|
||||
sendMessageTelegram: vi.fn().mockResolvedValue({
|
||||
messageId: "t1",
|
||||
chatId: "123",
|
||||
}),
|
||||
sendMessageDiscord: vi.fn(),
|
||||
sendMessageSignal: vi.fn(),
|
||||
sendMessageIMessage: vi.fn(),
|
||||
};
|
||||
const controller = new AbortController();
|
||||
controller.abort("cron: job execution timed out");
|
||||
|
||||
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||
payloads: [{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }],
|
||||
meta: {
|
||||
durationMs: 5,
|
||||
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||
},
|
||||
});
|
||||
|
||||
const res = await runCronIsolatedAgentTurn({
|
||||
cfg: makeCfg(home, storePath),
|
||||
deps,
|
||||
job: {
|
||||
...makeJob({
|
||||
kind: "agentTurn",
|
||||
message: "do it",
|
||||
}),
|
||||
delivery: { mode: "announce", channel: "telegram", to: "123" },
|
||||
},
|
||||
message: "do it",
|
||||
sessionKey: "cron:job-1",
|
||||
signal: controller.signal,
|
||||
lane: "cron",
|
||||
});
|
||||
|
||||
expect(res.status).toBe("error");
|
||||
expect(res.error).toContain("timed out");
|
||||
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
|
||||
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -156,10 +156,19 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
job: CronJob;
|
||||
message: string;
|
||||
abortSignal?: AbortSignal;
|
||||
signal?: AbortSignal;
|
||||
sessionKey: string;
|
||||
agentId?: string;
|
||||
lane?: string;
|
||||
}): Promise<RunCronAgentTurnResult> {
|
||||
const abortSignal = params.abortSignal ?? params.signal;
|
||||
const isAborted = () => abortSignal?.aborted === true;
|
||||
const abortReason = () => {
|
||||
const reason = abortSignal?.reason;
|
||||
return typeof reason === "string" && reason.trim()
|
||||
? reason.trim()
|
||||
: "cron: job execution timed out";
|
||||
};
|
||||
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
|
||||
const defaultAgentId = resolveDefaultAgentId(params.cfg);
|
||||
const requestedAgentId =
|
||||
@@ -473,8 +482,8 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
agentDir,
|
||||
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
|
||||
run: (providerOverride, modelOverride) => {
|
||||
if (params.abortSignal?.aborted) {
|
||||
throw new Error("cron: isolated run aborted");
|
||||
if (abortSignal?.aborted) {
|
||||
throw new Error(abortReason());
|
||||
}
|
||||
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
|
||||
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
|
||||
@@ -517,7 +526,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
runId: cronSession.sessionEntry.sessionId,
|
||||
requireExplicitMessageTarget: true,
|
||||
disableMessageTool: deliveryRequested,
|
||||
abortSignal: params.abortSignal,
|
||||
abortSignal,
|
||||
});
|
||||
},
|
||||
});
|
||||
@@ -529,6 +538,10 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
return withRunSession({ status: "error", error: String(err) });
|
||||
}
|
||||
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason() });
|
||||
}
|
||||
|
||||
const payloads = runResult.payloads ?? [];
|
||||
|
||||
// Update token+model fields in the session store.
|
||||
@@ -584,6 +597,10 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
}
|
||||
await persistSessionEntry();
|
||||
}
|
||||
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
||||
}
|
||||
const firstText = payloads[0]?.text ?? "";
|
||||
let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
|
||||
let outputText = pickLastNonEmptyTextFromPayloads(payloads);
|
||||
@@ -672,6 +689,9 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
? [{ text: synthesizedText }]
|
||||
: [];
|
||||
if (payloadsForDelivery.length > 0) {
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
||||
}
|
||||
const deliveryResults = await deliverOutboundPayloads({
|
||||
cfg: cfgWithAgentDefaults,
|
||||
channel: resolvedDelivery.channel,
|
||||
@@ -683,6 +703,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
identity,
|
||||
bestEffort: deliveryBestEffort,
|
||||
deps: createOutboundSendDeps(params.deps),
|
||||
abortSignal,
|
||||
});
|
||||
delivered = deliveryResults.length > 0;
|
||||
}
|
||||
@@ -765,6 +786,9 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry });
|
||||
}
|
||||
try {
|
||||
if (isAborted()) {
|
||||
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
|
||||
}
|
||||
const didAnnounce = await runSubagentAnnounceFlow({
|
||||
childSessionKey: agentSessionKey,
|
||||
childRunId: `${params.job.id}:${runSessionId}`,
|
||||
@@ -785,6 +809,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
endedAt: runEndedAt,
|
||||
outcome: { status: "ok" },
|
||||
announceType: "cron job",
|
||||
signal: abortSignal,
|
||||
});
|
||||
if (didAnnounce) {
|
||||
delivered = true;
|
||||
|
||||
@@ -731,6 +731,60 @@ describe("Cron issue regressions", () => {
|
||||
expect(job?.state.lastError).toContain("timed out");
|
||||
});
|
||||
|
||||
it("suppresses isolated follow-up side effects after timeout", async () => {
|
||||
vi.useRealTimers();
|
||||
const store = await makeStorePath();
|
||||
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
|
||||
const cronJob = createIsolatedRegressionJob({
|
||||
id: "timeout-side-effects",
|
||||
name: "timeout side effects",
|
||||
scheduledAt,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt },
|
||||
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 },
|
||||
state: { nextRunAtMs: scheduledAt },
|
||||
});
|
||||
await writeCronJobs(store.storePath, [cronJob]);
|
||||
|
||||
let now = scheduledAt;
|
||||
const state = createCronServiceState({
|
||||
cronEnabled: true,
|
||||
storePath: store.storePath,
|
||||
log: noopLogger,
|
||||
nowMs: () => now,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
// Test double for the isolated agent run: it waits on the run's abort signal
// and, when the signal fires, bumps the fake clock `now` by 100 before settling.
runIsolatedAgentJob: vi.fn(async (params) => {
|
||||
const abortSignal = params.abortSignal;
|
||||
// NOTE(review): `resolve` is never called, so this promise can only settle by
// rejecting with "aborted"; the `return` below then looks unreachable. Confirm
// whether the abort handler was meant to resolve so the late "ok" result is
// actually produced after the timeout.
await new Promise<void>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
abortSignal?.removeEventListener("abort", onAbort);
|
||||
now += 100;
|
||||
reject(new Error("aborted"));
|
||||
};
|
||||
abortSignal?.addEventListener("abort", onAbort, { once: true });
|
||||
});
|
||||
// Late "ok" result; `error` carries the abort reason only when the signal is
// aborted and its reason is a string.
return {
|
||||
status: "ok" as const,
|
||||
summary: "late-summary",
|
||||
delivered: false,
|
||||
error:
|
||||
abortSignal?.aborted && typeof abortSignal.reason === "string"
|
||||
? abortSignal.reason
|
||||
: undefined,
|
||||
};
|
||||
}),
|
||||
});
|
||||
|
||||
await onTimer(state);
|
||||
|
||||
const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects");
|
||||
expect(jobAfterTimeout?.state.lastStatus).toBe("error");
|
||||
expect(jobAfterTimeout?.state.lastError).toContain("timed out");
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("applies timeoutSeconds to manual cron.run isolated executions", async () => {
|
||||
vi.useRealTimers();
|
||||
const store = await makeStorePath();
|
||||
|
||||
@@ -72,8 +72,8 @@ export async function executeJobCoreWithTimeout(
|
||||
executeJobCore(state, job, runAbortController.signal),
|
||||
new Promise<never>((_, reject) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
runAbortController.abort(new Error("cron: job execution timed out"));
|
||||
reject(new Error("cron: job execution timed out"));
|
||||
runAbortController.abort(timeoutErrorMessage());
|
||||
reject(new Error(timeoutErrorMessage()));
|
||||
}, jobTimeoutMs);
|
||||
}),
|
||||
]);
|
||||
@@ -91,6 +91,16 @@ function resolveRunConcurrency(state: CronServiceState): number {
|
||||
}
|
||||
return Math.max(1, Math.floor(raw));
|
||||
}
|
||||
/**
 * Returns the fixed error text used for cron job timeouts.
 *
 * The visible call sites use this one string as the abort reason, as the
 * message of the rejection raised when the timeout fires, and as the `error`
 * field of results returned once the abort signal is set.
 *
 * @returns The literal timeout message.
 */
function timeoutErrorMessage(): string {
|
||||
return "cron: job execution timed out";
|
||||
}
|
||||
|
||||
/**
 * Reports whether a caught value should be treated as an abort/timeout error.
 *
 * @param err - The caught value; anything that is not an `Error` instance yields `false`.
 * @returns `true` when the error's `name` is "AbortError" or its `message`
 *   equals the text returned by `timeoutErrorMessage()`.
 */
function isAbortError(err: unknown): boolean {
|
||||
// Non-Error values (for example a bare string) never match.
if (!(err instanceof Error)) {
|
||||
return false;
|
||||
}
|
||||
return err.name === "AbortError" || err.message === timeoutErrorMessage();
|
||||
}
|
||||
/**
|
||||
* Exponential backoff delays (in ms) indexed by consecutive error count.
|
||||
* After the last entry the delay stays constant.
|
||||
@@ -354,14 +364,15 @@ export async function onTimer(state: CronServiceState) {
|
||||
const result = await executeJobCoreWithTimeout(state, job);
|
||||
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
|
||||
} catch (err) {
|
||||
const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err);
|
||||
state.deps.log.warn(
|
||||
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
|
||||
`cron: job failed: ${String(err)}`,
|
||||
`cron: job failed: ${errorText}`,
|
||||
);
|
||||
return {
|
||||
jobId: id,
|
||||
status: "error",
|
||||
error: String(err),
|
||||
error: errorText,
|
||||
startedAt,
|
||||
endedAt: state.deps.nowMs(),
|
||||
};
|
||||
@@ -596,6 +607,9 @@ export async function executeJobCore(
|
||||
job: CronJob,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<CronRunOutcome & CronRunTelemetry & { delivered?: boolean }> {
|
||||
if (abortSignal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
if (job.sessionTarget === "main") {
|
||||
const text = resolveJobPayloadTextForMain(job);
|
||||
if (!text) {
|
||||
@@ -622,6 +636,9 @@ export async function executeJobCore(
|
||||
|
||||
let heartbeatResult: HeartbeatRunResult;
|
||||
for (;;) {
|
||||
if (abortSignal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
heartbeatResult = await state.deps.runHeartbeatOnce({
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
@@ -665,7 +682,7 @@ export async function executeJobCore(
|
||||
return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" };
|
||||
}
|
||||
if (abortSignal?.aborted) {
|
||||
return { status: "error", error: "cron: job execution aborted" };
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
const res = await state.deps.runIsolatedAgentJob({
|
||||
@@ -674,6 +691,10 @@ export async function executeJobCore(
|
||||
abortSignal,
|
||||
});
|
||||
|
||||
if (abortSignal?.aborted) {
|
||||
return { status: "error", error: timeoutErrorMessage() };
|
||||
}
|
||||
|
||||
// Post a short summary back to the main session — but only when the
|
||||
// isolated run did NOT already deliver its output to the target channel.
|
||||
// When `res.delivered` is true the announce flow (or direct outbound
|
||||
|
||||
Reference in New Issue
Block a user