From a3c51f91c58a76fa3dbd9e808071d0b41c981649 Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Sun, 26 Apr 2026 14:21:01 -0700 Subject: [PATCH] fix: isolate cron context-engine session keys (#72292) --- CHANGELOG.md | 1 + .../isolated-agent.session-identity.test.ts | 2 +- .../delivery-dispatch.double-announce.test.ts | 38 ++++++ src/cron/isolated-agent/delivery-dispatch.ts | 12 +- src/cron/isolated-agent/run-executor.ts | 13 +- .../isolated-agent/run-session-state.test.ts | 20 +-- src/cron/isolated-agent/run-session-state.ts | 12 -- .../isolated-agent/run.interim-retry.test.ts | 6 + .../run.message-tool-policy.test.ts | 1 + .../run.session-key-isolation.test.ts | 119 ++++++++++++++++++ src/cron/isolated-agent/run.ts | 3 +- 11 files changed, 187 insertions(+), 40 deletions(-) create mode 100644 src/cron/isolated-agent/run.session-key-isolation.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 36215c8a7fb..1d493f110f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @codex. - WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis. - Discord/gateway: count failed health-monitor restart attempts toward cooldown and hourly caps, and evict stale account lifecycle state during channel reloads so repeated Discord gateway recovery cannot loop on old status. Fixes #38596. (#40413) Thanks @jellyAI-dev and @vashquez. +- Cron/context engine: run isolated cron jobs under run-scoped context-engine session keys so prior runs of the same job are not inherited unless the job is explicitly session-bound. (#72292) Thanks @jalehman. ## 2026.4.26 diff --git a/src/cron/isolated-agent.session-identity.test.ts b/src/cron/isolated-agent.session-identity.test.ts index 3bdf4851017..1b9bc292a61 100644 --- a/src/cron/isolated-agent.session-identity.test.ts +++ b/src/cron/isolated-agent.session-identity.test.ts @@ -102,7 +102,7 @@ describe("runCronIsolatedAgentTurn session identity", () => { workspaceDir?: string; sessionFile?: string; }; - expect(call?.sessionKey).toBe("agent:ops:cron:job-ops"); + expect(call?.sessionKey).toMatch(/^agent:ops:cron:job-ops:run:/); expect(call?.workspaceDir).toBe(opsWorkspace); expect(call?.sessionFile).toContain(path.join("agents", "ops")); }); diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index c7237e69c15..526a6ebdf24 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -128,6 +128,7 @@ function makeBaseParams(overrides: { runStartedAt?: number; sessionTarget?: string; deliveryBestEffort?: boolean; + runSessionKey?: string; }): Parameters[0] { const resolvedDelivery = makeResolvedDelivery(); const runStartedAt = overrides.runStartedAt ?? Date.now(); @@ -144,6 +145,7 @@ function makeBaseParams(overrides: { } as never, agentId: "main", agentSessionKey: "agent:main", + runSessionKey: overrides.runSessionKey ?? "agent:main", sessionId: "test-session-id", runStartedAt, runEndedAt: runStartedAt, @@ -271,6 +273,42 @@ describe("dispatchCronDelivery — double-announce guard", () => { ); }); + it("uses the run-scoped session key for isolated cron descendant fallback delivery", async () => { + const runStartedAt = 1_000; + const agentSessionKey = "agent:main:cron:daily-monitor"; + const runSessionKey = "agent:main:cron:daily-monitor:run:test-session-id"; + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true); + vi.mocked(readDescendantSubagentFallbackReply).mockImplementation(async (params) => + params.sessionKey === runSessionKey + ? "Run-scoped child result, everything finished successfully." + : undefined, + ); + + const params = makeBaseParams({ + synthesizedText: "on it", + runStartedAt, + runSessionKey, + }); + params.agentSessionKey = agentSessionKey; + + const state = await dispatchCronDelivery(params); + + expect(countActiveDescendantRuns).toHaveBeenCalledWith(runSessionKey); + expect(countActiveDescendantRuns).not.toHaveBeenCalledWith(agentSessionKey); + expect(readDescendantSubagentFallbackReply).toHaveBeenCalledWith({ + sessionKey: runSessionKey, + runStartedAt, + }); + expect(state.deliveryAttempted).toBe(true); + expect(state.delivered).toBe(true); + expect(deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + payloads: [{ text: "Run-scoped child result, everything finished successfully." }], + }), + ); + }); + it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => { vi.mocked(countActiveDescendantRuns).mockReturnValue(0); vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index cbe0bc0fbf4..25afa7825fd 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -104,6 +104,7 @@ type DispatchCronDeliveryParams = { job: CronJob; agentId: string; agentSessionKey: string; + runSessionKey: string; sessionId: string; runStartedAt: number; runEndedAt: number; @@ -684,8 +685,9 @@ export async function dispatchCronDelivery( const initialSynthesizedText = synthesizedText.trim(); const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText); const subagentRegistryRuntime = await loadDeliverySubagentRegistryRuntime(); + const subagentFollowupSessionKey = params.runSessionKey; let activeSubagentRuns = subagentRegistryRuntime.countActiveDescendantRuns( - params.agentSessionKey, + subagentFollowupSessionKey, ); const shouldCheckCompletedDescendants = activeSubagentRuns === 0 && isLikelyInterimCronMessage(initialSynthesizedText); @@ -701,24 +703,24 @@ export async function dispatchCronDelivery( // descendant's output instead of the interim cron text. const completedDescendantReply = shouldCheckCompletedDescendants ? await subagentFollowupRuntime?.readDescendantSubagentFallbackReply({ - sessionKey: params.agentSessionKey, + sessionKey: subagentFollowupSessionKey, runStartedAt: params.runStartedAt, }) : undefined; const hadDescendants = activeSubagentRuns > 0 || Boolean(completedDescendantReply); if (activeSubagentRuns > 0 || expectedSubagentFollowup) { let finalReply = await subagentFollowupRuntime?.waitForDescendantSubagentSummary({ - sessionKey: params.agentSessionKey, + sessionKey: subagentFollowupSessionKey, initialReply: initialSynthesizedText, timeoutMs: params.timeoutMs, observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup, }); activeSubagentRuns = subagentRegistryRuntime.countActiveDescendantRuns( - params.agentSessionKey, + subagentFollowupSessionKey, ); if (!finalReply && activeSubagentRuns === 0) { finalReply = await subagentFollowupRuntime?.readDescendantSubagentFallbackReply({ - sessionKey: params.agentSessionKey, + sessionKey: subagentFollowupSessionKey, runStartedAt: params.runStartedAt, }); } diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index 9beb0b1cb33..18e48796eb4 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -63,6 +63,7 @@ export function createCronPromptExecutor(params: { agentId: string; agentDir: string; agentSessionKey: string; + runSessionKey: string; workspaceDir: string; lane?: string; resolvedVerboseLevel: VerboseLevel; @@ -127,7 +128,7 @@ export function createCronPromptExecutor(params: { : await getCliSessionId(params.cronSession.sessionEntry, providerOverride); const result = await runCliAgent({ sessionId: params.cronSession.sessionEntry.sessionId, - sessionKey: params.agentSessionKey, + sessionKey: params.runSessionKey, agentId: params.agentId, trigger: "cron", jobId: params.job.id, @@ -162,7 +163,7 @@ export function createCronPromptExecutor(params: { }); const result = await runEmbeddedPiAgent({ sessionId: params.cronSession.sessionEntry.sessionId, - sessionKey: params.agentSessionKey, + sessionKey: params.runSessionKey, agentId: params.agentId, trigger: "cron", jobId: params.job.id, @@ -248,6 +249,7 @@ export async function executeCronRun(params: { agentId: string; agentDir: string; agentSessionKey: string; + runSessionKey: string; workspaceDir: string; lane?: string; resolvedDelivery: { @@ -281,7 +283,7 @@ export async function executeCronRun(params: { normalizeVerboseLevel(params.agentVerboseDefault) ?? "off"; registerAgentRunContext(params.cronSession.sessionEntry.sessionId, { - sessionKey: params.agentSessionKey, + sessionKey: params.runSessionKey, verboseLevel: resolvedVerboseLevel, }); const executor = createCronPromptExecutor({ @@ -291,6 +293,7 @@ export async function executeCronRun(params: { agentId: params.agentId, agentDir: params.agentDir, agentSessionKey: params.agentSessionKey, + runSessionKey: params.runSessionKey, workspaceDir: params.workspaceDir, lane: params.lane, resolvedVerboseLevel, @@ -378,12 +381,12 @@ export async function executeCronRun(params: { if (shouldRetryInterimAck) { const { countActiveDescendantRuns, listDescendantRunsForRequester } = await loadCronSubagentRegistryRuntime(); - hasFreshDescendants = listDescendantRunsForRequester(params.agentSessionKey).some((entry) => { + hasFreshDescendants = listDescendantRunsForRequester(params.runSessionKey).some((entry) => { const descendantStartedAt = typeof entry.startedAt === "number" ? entry.startedAt : entry.createdAt; return typeof descendantStartedAt === "number" && descendantStartedAt >= runStartedAt; }); - hasActiveDescendants = countActiveDescendantRuns(params.agentSessionKey) > 0; + hasActiveDescendants = countActiveDescendantRuns(params.runSessionKey) > 0; } if (shouldRetryInterimAck && !hasFreshDescendants && !hasActiveDescendants) { diff --git a/src/cron/isolated-agent/run-session-state.test.ts b/src/cron/isolated-agent/run-session-state.test.ts index 9415e043a59..816c258967d 100644 --- a/src/cron/isolated-agent/run-session-state.test.ts +++ b/src/cron/isolated-agent/run-session-state.test.ts @@ -23,7 +23,7 @@ function makeCronSession(entry = makeSessionEntry()): MutableCronSession { } describe("createPersistCronSessionEntry", () => { - it("persists a distinct run-session snapshot for isolated cron runs", async () => { + it("persists isolated cron state only under the stable cron session key", async () => { const cronSession = makeCronSession( makeSessionEntry({ status: "running", @@ -39,8 +39,7 @@ describe("createPersistCronSessionEntry", () => { const store: Record = {}; update(store); expect(store["agent:main:cron:job"]).toBe(cronSession.sessionEntry); - expect(store["agent:main:cron:job:run:run-session-id"]).not.toBe(cronSession.sessionEntry); - expect(store["agent:main:cron:job:run:run-session-id"]).toEqual(cronSession.sessionEntry); + expect(store["agent:main:cron:job:run:run-session-id"]).toBeUndefined(); }, ); @@ -48,26 +47,16 @@ describe("createPersistCronSessionEntry", () => { isFastTestEnv: false, cronSession, agentSessionKey: "agent:main:cron:job", - runSessionKey: "agent:main:cron:job:run:run-session-id", updateSessionStore, }); await persist(); expect(cronSession.store["agent:main:cron:job"]).toBe(cronSession.sessionEntry); - expect(cronSession.store["agent:main:cron:job:run:run-session-id"]).not.toBe( - cronSession.sessionEntry, - ); - - cronSession.sessionEntry.status = "done"; - cronSession.sessionEntry.skillsSnapshot!.skills[0].name = "changed"; - expect(cronSession.store["agent:main:cron:job:run:run-session-id"]?.status).toBe("running"); - expect( - cronSession.store["agent:main:cron:job:run:run-session-id"]?.skillsSnapshot?.skills[0]?.name, - ).toBe("memory"); + expect(cronSession.store["agent:main:cron:job:run:run-session-id"]).toBeUndefined(); }); - it("uses the shared session entry when the run key is the agent session key", async () => { + it("persists explicit session-bound cron state under the requested session key", async () => { const cronSession = makeCronSession(); const updateSessionStore = vi.fn( async (_storePath, update: (store: Record) => void) => { @@ -81,7 +70,6 @@ describe("createPersistCronSessionEntry", () => { isFastTestEnv: false, cronSession, agentSessionKey: "agent:main:session", - runSessionKey: "agent:main:session", updateSessionStore, }); diff --git a/src/cron/isolated-agent/run-session-state.ts b/src/cron/isolated-agent/run-session-state.ts index adeeb7ef513..23e9bbd11df 100644 --- a/src/cron/isolated-agent/run-session-state.ts +++ b/src/cron/isolated-agent/run-session-state.ts @@ -19,31 +19,19 @@ type UpdateSessionStore = ( export type PersistCronSessionEntry = () => Promise; -function cloneSessionEntry(entry: MutableCronSessionEntry): MutableCronSessionEntry { - return globalThis.structuredClone(entry); -} - export function createPersistCronSessionEntry(params: { isFastTestEnv: boolean; cronSession: MutableCronSession; agentSessionKey: string; - runSessionKey: string; updateSessionStore: UpdateSessionStore; }): PersistCronSessionEntry { return async () => { if (params.isFastTestEnv) { return; } - const runSessionEntry = cloneSessionEntry(params.cronSession.sessionEntry); params.cronSession.store[params.agentSessionKey] = params.cronSession.sessionEntry; - if (params.runSessionKey !== params.agentSessionKey) { - params.cronSession.store[params.runSessionKey] = runSessionEntry; - } await params.updateSessionStore(params.cronSession.storePath, (store) => { store[params.agentSessionKey] = params.cronSession.sessionEntry; - if (params.runSessionKey !== params.agentSessionKey) { - store[params.runSessionKey] = runSessionEntry; - } }); }; } diff --git a/src/cron/isolated-agent/run.interim-retry.test.ts b/src/cron/isolated-agent/run.interim-retry.test.ts index 6f01a2e9232..2c799869a44 100644 --- a/src/cron/isolated-agent/run.interim-retry.test.ts +++ b/src/cron/isolated-agent/run.interim-retry.test.ts @@ -89,5 +89,11 @@ describe("runCronIsolatedAgentTurn — interim ack retry", () => { mockRunCronFallbackPassthrough(); await runTurnAndExpectOk(1, 1); + expect(listDescendantRunsForRequesterMock).toHaveBeenCalledWith( + "agent:default:cron:test:run:test-session-id", + ); + expect(countActiveDescendantRunsMock).toHaveBeenCalledWith( + "agent:default:cron:test:run:test-session-id", + ); }); }); diff --git a/src/cron/isolated-agent/run.message-tool-policy.test.ts b/src/cron/isolated-agent/run.message-tool-policy.test.ts index aab583a87ea..5e1d68760b6 100644 --- a/src/cron/isolated-agent/run.message-tool-policy.test.ts +++ b/src/cron/isolated-agent/run.message-tool-policy.test.ts @@ -236,6 +236,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => { agentId: "default", agentDir: "/tmp/agent-dir", agentSessionKey: "cron:message-tool-policy", + runSessionKey: "cron:message-tool-policy:run:test-session-id", workspaceDir: "/tmp/workspace", resolvedVerboseLevel: "off", thinkLevel: undefined, diff --git a/src/cron/isolated-agent/run.session-key-isolation.test.ts b/src/cron/isolated-agent/run.session-key-isolation.test.ts new file mode 100644 index 00000000000..b523dc0daaf --- /dev/null +++ b/src/cron/isolated-agent/run.session-key-isolation.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it } from "vitest"; +import { + makeIsolatedAgentTurnJob, + makeIsolatedAgentTurnParams, + setupRunCronIsolatedAgentTurnSuite, +} from "./run.suite-helpers.js"; +import { + isCliProviderMock, + loadRunCronIsolatedAgentTurn, + makeCronSession, + mockRunCronFallbackPassthrough, + resolveCronSessionMock, + runCliAgentMock, + runEmbeddedPiAgentMock, +} from "./run.test-harness.js"; + +const runCronIsolatedAgentTurn = await loadRunCronIsolatedAgentTurn(); + +describe("runCronIsolatedAgentTurn isolated session identity", () => { + setupRunCronIsolatedAgentTurnSuite(); + + it("uses a run-scoped key for embedded isolated cron execution", async () => { + resolveCronSessionMock.mockReturnValue( + makeCronSession({ + sessionEntry: { + ...makeCronSession().sessionEntry, + sessionId: "isolated-run-1", + }, + }), + ); + mockRunCronFallbackPassthrough(); + + const result = await runCronIsolatedAgentTurn( + makeIsolatedAgentTurnParams({ + sessionKey: "cron:daily-monitor", + }), + ); + + expect(result.status).toBe("ok"); + expect(result.sessionKey).toBe("agent:default:cron:daily-monitor:run:isolated-run-1"); + expect(resolveCronSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + forceNew: true, + sessionKey: "agent:default:cron:daily-monitor", + }), + ); + expect(runEmbeddedPiAgentMock).toHaveBeenCalledOnce(); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + sessionId: "isolated-run-1", + sessionKey: "agent:default:cron:daily-monitor:run:isolated-run-1", + }); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.sessionKey).not.toBe( + "agent:default:cron:daily-monitor", + ); + }); + + it("keeps explicit session-bound cron execution on the requested session key", async () => { + resolveCronSessionMock.mockReturnValue( + makeCronSession({ + sessionEntry: { + ...makeCronSession().sessionEntry, + sessionId: "bound-run-1", + }, + }), + ); + mockRunCronFallbackPassthrough(); + + const result = await runCronIsolatedAgentTurn( + makeIsolatedAgentTurnParams({ + sessionKey: "project-alpha-monitor", + job: makeIsolatedAgentTurnJob({ + sessionTarget: "session:project-alpha-monitor", + }), + }), + ); + + expect(result.status).toBe("ok"); + expect(result.sessionKey).toBe("agent:default:project-alpha-monitor"); + expect(runEmbeddedPiAgentMock).toHaveBeenCalledOnce(); + expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({ + sessionId: "bound-run-1", + sessionKey: "agent:default:project-alpha-monitor", + }); + }); + + it("uses a run-scoped key for CLI isolated cron execution", async () => { + isCliProviderMock.mockReturnValue(true); + resolveCronSessionMock.mockReturnValue( + makeCronSession({ + sessionEntry: { + ...makeCronSession().sessionEntry, + sessionId: "isolated-cli-run-1", + }, + }), + ); + mockRunCronFallbackPassthrough(); + runCliAgentMock.mockResolvedValue({ + payloads: [{ text: "done" }], + meta: { agentMeta: { usage: { input: 10, output: 20 } } }, + }); + + const result = await runCronIsolatedAgentTurn( + makeIsolatedAgentTurnParams({ + sessionKey: "cron:cli-monitor", + }), + ); + + expect(result.status).toBe("ok"); + expect(result.sessionKey).toBe("agent:default:cron:cli-monitor:run:isolated-cli-run-1"); + expect(runCliAgentMock).toHaveBeenCalledOnce(); + expect(runCliAgentMock.mock.calls[0]?.[0]).toMatchObject({ + sessionId: "isolated-cli-run-1", + sessionKey: "agent:default:cron:cli-monitor:run:isolated-cli-run-1", + }); + expect(runCliAgentMock.mock.calls[0]?.[0]?.sessionKey).not.toBe( + "agent:default:cron:cli-monitor", + ); + }); +}); diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index f1168a700e6..577d0544493 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -521,7 +521,6 @@ async function prepareCronRunContext(params: { isFastTestEnv: params.isFastTestEnv, cronSession, agentSessionKey, - runSessionKey, updateSessionStore: async (storePath, update) => { const { updateSessionStore } = await loadSessionStoreRuntime(); await updateSessionStore(storePath, update); @@ -894,6 +893,7 @@ async function finalizeCronRun(params: { job: prepared.input.job, agentId: prepared.agentId, agentSessionKey: prepared.agentSessionKey, + runSessionKey: prepared.runSessionKey, sessionId: prepared.runSessionId, runStartedAt: execution.runStartedAt, runEndedAt: execution.runEndedAt, @@ -982,6 +982,7 @@ export async function runCronIsolatedAgentTurn(params: { agentId: prepared.context.agentId, agentDir: prepared.context.agentDir, agentSessionKey: prepared.context.agentSessionKey, + runSessionKey: prepared.context.runSessionKey, workspaceDir: prepared.context.workspaceDir, lane: params.lane, resolvedDelivery: {