mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix: isolate cron context-engine session keys (#72292)
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugins: share package entrypoint resolution between install and discovery, reject mismatched `runtimeExtensions`, and cache bundled runtime-dependency manifest reads during scans. Thanks @codex.
|
||||
- WhatsApp/Web: keep quiet but healthy linked-device sessions connected by basing the watchdog on WhatsApp Web transport activity, while retaining a longer app-silence cap so frame activity cannot mask a stuck session forever. Fixes #70678; carries forward the focused #71466 approach and keeps #63939 as related configurable-timeout follow-up. Thanks @vincentkoc and @oromeis.
|
||||
- Discord/gateway: count failed health-monitor restart attempts toward cooldown and hourly caps, and evict stale account lifecycle state during channel reloads so repeated Discord gateway recovery cannot loop on old status. Fixes #38596. (#40413) Thanks @jellyAI-dev and @vashquez.
|
||||
- Cron/context engine: run isolated cron jobs under run-scoped context-engine session keys so prior runs of the same job are not inherited unless the job is explicitly session-bound. (#72292) Thanks @jalehman.
|
||||
|
||||
## 2026.4.26
|
||||
|
||||
|
||||
@@ -102,7 +102,7 @@ describe("runCronIsolatedAgentTurn session identity", () => {
|
||||
workspaceDir?: string;
|
||||
sessionFile?: string;
|
||||
};
|
||||
expect(call?.sessionKey).toBe("agent:ops:cron:job-ops");
|
||||
expect(call?.sessionKey).toMatch(/^agent:ops:cron:job-ops:run:/);
|
||||
expect(call?.workspaceDir).toBe(opsWorkspace);
|
||||
expect(call?.sessionFile).toContain(path.join("agents", "ops"));
|
||||
});
|
||||
|
||||
@@ -128,6 +128,7 @@ function makeBaseParams(overrides: {
|
||||
runStartedAt?: number;
|
||||
sessionTarget?: string;
|
||||
deliveryBestEffort?: boolean;
|
||||
runSessionKey?: string;
|
||||
}): Parameters<typeof dispatchCronDelivery>[0] {
|
||||
const resolvedDelivery = makeResolvedDelivery();
|
||||
const runStartedAt = overrides.runStartedAt ?? Date.now();
|
||||
@@ -144,6 +145,7 @@ function makeBaseParams(overrides: {
|
||||
} as never,
|
||||
agentId: "main",
|
||||
agentSessionKey: "agent:main",
|
||||
runSessionKey: overrides.runSessionKey ?? "agent:main",
|
||||
sessionId: "test-session-id",
|
||||
runStartedAt,
|
||||
runEndedAt: runStartedAt,
|
||||
@@ -271,6 +273,42 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the run-scoped session key for isolated cron descendant fallback delivery", async () => {
|
||||
const runStartedAt = 1_000;
|
||||
const agentSessionKey = "agent:main:cron:daily-monitor";
|
||||
const runSessionKey = "agent:main:cron:daily-monitor:run:test-session-id";
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
|
||||
vi.mocked(readDescendantSubagentFallbackReply).mockImplementation(async (params) =>
|
||||
params.sessionKey === runSessionKey
|
||||
? "Run-scoped child result, everything finished successfully."
|
||||
: undefined,
|
||||
);
|
||||
|
||||
const params = makeBaseParams({
|
||||
synthesizedText: "on it",
|
||||
runStartedAt,
|
||||
runSessionKey,
|
||||
});
|
||||
params.agentSessionKey = agentSessionKey;
|
||||
|
||||
const state = await dispatchCronDelivery(params);
|
||||
|
||||
expect(countActiveDescendantRuns).toHaveBeenCalledWith(runSessionKey);
|
||||
expect(countActiveDescendantRuns).not.toHaveBeenCalledWith(agentSessionKey);
|
||||
expect(readDescendantSubagentFallbackReply).toHaveBeenCalledWith({
|
||||
sessionKey: runSessionKey,
|
||||
runStartedAt,
|
||||
});
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
expect(state.delivered).toBe(true);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
payloads: [{ text: "Run-scoped child result, everything finished successfully." }],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("normal text delivery sends exactly once and sets deliveryAttempted=true", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
|
||||
@@ -104,6 +104,7 @@ type DispatchCronDeliveryParams = {
|
||||
job: CronJob;
|
||||
agentId: string;
|
||||
agentSessionKey: string;
|
||||
runSessionKey: string;
|
||||
sessionId: string;
|
||||
runStartedAt: number;
|
||||
runEndedAt: number;
|
||||
@@ -684,8 +685,9 @@ export async function dispatchCronDelivery(
|
||||
const initialSynthesizedText = synthesizedText.trim();
|
||||
const expectedSubagentFollowup = expectsSubagentFollowup(initialSynthesizedText);
|
||||
const subagentRegistryRuntime = await loadDeliverySubagentRegistryRuntime();
|
||||
const subagentFollowupSessionKey = params.runSessionKey;
|
||||
let activeSubagentRuns = subagentRegistryRuntime.countActiveDescendantRuns(
|
||||
params.agentSessionKey,
|
||||
subagentFollowupSessionKey,
|
||||
);
|
||||
const shouldCheckCompletedDescendants =
|
||||
activeSubagentRuns === 0 && isLikelyInterimCronMessage(initialSynthesizedText);
|
||||
@@ -701,24 +703,24 @@ export async function dispatchCronDelivery(
|
||||
// descendant's output instead of the interim cron text.
|
||||
const completedDescendantReply = shouldCheckCompletedDescendants
|
||||
? await subagentFollowupRuntime?.readDescendantSubagentFallbackReply({
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: subagentFollowupSessionKey,
|
||||
runStartedAt: params.runStartedAt,
|
||||
})
|
||||
: undefined;
|
||||
const hadDescendants = activeSubagentRuns > 0 || Boolean(completedDescendantReply);
|
||||
if (activeSubagentRuns > 0 || expectedSubagentFollowup) {
|
||||
let finalReply = await subagentFollowupRuntime?.waitForDescendantSubagentSummary({
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: subagentFollowupSessionKey,
|
||||
initialReply: initialSynthesizedText,
|
||||
timeoutMs: params.timeoutMs,
|
||||
observedActiveDescendants: activeSubagentRuns > 0 || expectedSubagentFollowup,
|
||||
});
|
||||
activeSubagentRuns = subagentRegistryRuntime.countActiveDescendantRuns(
|
||||
params.agentSessionKey,
|
||||
subagentFollowupSessionKey,
|
||||
);
|
||||
if (!finalReply && activeSubagentRuns === 0) {
|
||||
finalReply = await subagentFollowupRuntime?.readDescendantSubagentFallbackReply({
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: subagentFollowupSessionKey,
|
||||
runStartedAt: params.runStartedAt,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ export function createCronPromptExecutor(params: {
|
||||
agentId: string;
|
||||
agentDir: string;
|
||||
agentSessionKey: string;
|
||||
runSessionKey: string;
|
||||
workspaceDir: string;
|
||||
lane?: string;
|
||||
resolvedVerboseLevel: VerboseLevel;
|
||||
@@ -127,7 +128,7 @@ export function createCronPromptExecutor(params: {
|
||||
: await getCliSessionId(params.cronSession.sessionEntry, providerOverride);
|
||||
const result = await runCliAgent({
|
||||
sessionId: params.cronSession.sessionEntry.sessionId,
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: params.runSessionKey,
|
||||
agentId: params.agentId,
|
||||
trigger: "cron",
|
||||
jobId: params.job.id,
|
||||
@@ -162,7 +163,7 @@ export function createCronPromptExecutor(params: {
|
||||
});
|
||||
const result = await runEmbeddedPiAgent({
|
||||
sessionId: params.cronSession.sessionEntry.sessionId,
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: params.runSessionKey,
|
||||
agentId: params.agentId,
|
||||
trigger: "cron",
|
||||
jobId: params.job.id,
|
||||
@@ -248,6 +249,7 @@ export async function executeCronRun(params: {
|
||||
agentId: string;
|
||||
agentDir: string;
|
||||
agentSessionKey: string;
|
||||
runSessionKey: string;
|
||||
workspaceDir: string;
|
||||
lane?: string;
|
||||
resolvedDelivery: {
|
||||
@@ -281,7 +283,7 @@ export async function executeCronRun(params: {
|
||||
normalizeVerboseLevel(params.agentVerboseDefault) ??
|
||||
"off";
|
||||
registerAgentRunContext(params.cronSession.sessionEntry.sessionId, {
|
||||
sessionKey: params.agentSessionKey,
|
||||
sessionKey: params.runSessionKey,
|
||||
verboseLevel: resolvedVerboseLevel,
|
||||
});
|
||||
const executor = createCronPromptExecutor({
|
||||
@@ -291,6 +293,7 @@ export async function executeCronRun(params: {
|
||||
agentId: params.agentId,
|
||||
agentDir: params.agentDir,
|
||||
agentSessionKey: params.agentSessionKey,
|
||||
runSessionKey: params.runSessionKey,
|
||||
workspaceDir: params.workspaceDir,
|
||||
lane: params.lane,
|
||||
resolvedVerboseLevel,
|
||||
@@ -378,12 +381,12 @@ export async function executeCronRun(params: {
|
||||
if (shouldRetryInterimAck) {
|
||||
const { countActiveDescendantRuns, listDescendantRunsForRequester } =
|
||||
await loadCronSubagentRegistryRuntime();
|
||||
hasFreshDescendants = listDescendantRunsForRequester(params.agentSessionKey).some((entry) => {
|
||||
hasFreshDescendants = listDescendantRunsForRequester(params.runSessionKey).some((entry) => {
|
||||
const descendantStartedAt =
|
||||
typeof entry.startedAt === "number" ? entry.startedAt : entry.createdAt;
|
||||
return typeof descendantStartedAt === "number" && descendantStartedAt >= runStartedAt;
|
||||
});
|
||||
hasActiveDescendants = countActiveDescendantRuns(params.agentSessionKey) > 0;
|
||||
hasActiveDescendants = countActiveDescendantRuns(params.runSessionKey) > 0;
|
||||
}
|
||||
|
||||
if (shouldRetryInterimAck && !hasFreshDescendants && !hasActiveDescendants) {
|
||||
|
||||
@@ -23,7 +23,7 @@ function makeCronSession(entry = makeSessionEntry()): MutableCronSession {
|
||||
}
|
||||
|
||||
describe("createPersistCronSessionEntry", () => {
|
||||
it("persists a distinct run-session snapshot for isolated cron runs", async () => {
|
||||
it("persists isolated cron state only under the stable cron session key", async () => {
|
||||
const cronSession = makeCronSession(
|
||||
makeSessionEntry({
|
||||
status: "running",
|
||||
@@ -39,8 +39,7 @@ describe("createPersistCronSessionEntry", () => {
|
||||
const store: Record<string, SessionEntry> = {};
|
||||
update(store);
|
||||
expect(store["agent:main:cron:job"]).toBe(cronSession.sessionEntry);
|
||||
expect(store["agent:main:cron:job:run:run-session-id"]).not.toBe(cronSession.sessionEntry);
|
||||
expect(store["agent:main:cron:job:run:run-session-id"]).toEqual(cronSession.sessionEntry);
|
||||
expect(store["agent:main:cron:job:run:run-session-id"]).toBeUndefined();
|
||||
},
|
||||
);
|
||||
|
||||
@@ -48,26 +47,16 @@ describe("createPersistCronSessionEntry", () => {
|
||||
isFastTestEnv: false,
|
||||
cronSession,
|
||||
agentSessionKey: "agent:main:cron:job",
|
||||
runSessionKey: "agent:main:cron:job:run:run-session-id",
|
||||
updateSessionStore,
|
||||
});
|
||||
|
||||
await persist();
|
||||
|
||||
expect(cronSession.store["agent:main:cron:job"]).toBe(cronSession.sessionEntry);
|
||||
expect(cronSession.store["agent:main:cron:job:run:run-session-id"]).not.toBe(
|
||||
cronSession.sessionEntry,
|
||||
);
|
||||
|
||||
cronSession.sessionEntry.status = "done";
|
||||
cronSession.sessionEntry.skillsSnapshot!.skills[0].name = "changed";
|
||||
expect(cronSession.store["agent:main:cron:job:run:run-session-id"]?.status).toBe("running");
|
||||
expect(
|
||||
cronSession.store["agent:main:cron:job:run:run-session-id"]?.skillsSnapshot?.skills[0]?.name,
|
||||
).toBe("memory");
|
||||
expect(cronSession.store["agent:main:cron:job:run:run-session-id"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses the shared session entry when the run key is the agent session key", async () => {
|
||||
it("persists explicit session-bound cron state under the requested session key", async () => {
|
||||
const cronSession = makeCronSession();
|
||||
const updateSessionStore = vi.fn(
|
||||
async (_storePath, update: (store: Record<string, SessionEntry>) => void) => {
|
||||
@@ -81,7 +70,6 @@ describe("createPersistCronSessionEntry", () => {
|
||||
isFastTestEnv: false,
|
||||
cronSession,
|
||||
agentSessionKey: "agent:main:session",
|
||||
runSessionKey: "agent:main:session",
|
||||
updateSessionStore,
|
||||
});
|
||||
|
||||
|
||||
@@ -19,31 +19,19 @@ type UpdateSessionStore = (
|
||||
|
||||
export type PersistCronSessionEntry = () => Promise<void>;
|
||||
|
||||
function cloneSessionEntry(entry: MutableCronSessionEntry): MutableCronSessionEntry {
|
||||
return globalThis.structuredClone(entry);
|
||||
}
|
||||
|
||||
export function createPersistCronSessionEntry(params: {
|
||||
isFastTestEnv: boolean;
|
||||
cronSession: MutableCronSession;
|
||||
agentSessionKey: string;
|
||||
runSessionKey: string;
|
||||
updateSessionStore: UpdateSessionStore;
|
||||
}): PersistCronSessionEntry {
|
||||
return async () => {
|
||||
if (params.isFastTestEnv) {
|
||||
return;
|
||||
}
|
||||
const runSessionEntry = cloneSessionEntry(params.cronSession.sessionEntry);
|
||||
params.cronSession.store[params.agentSessionKey] = params.cronSession.sessionEntry;
|
||||
if (params.runSessionKey !== params.agentSessionKey) {
|
||||
params.cronSession.store[params.runSessionKey] = runSessionEntry;
|
||||
}
|
||||
await params.updateSessionStore(params.cronSession.storePath, (store) => {
|
||||
store[params.agentSessionKey] = params.cronSession.sessionEntry;
|
||||
if (params.runSessionKey !== params.agentSessionKey) {
|
||||
store[params.runSessionKey] = runSessionEntry;
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
@@ -89,5 +89,11 @@ describe("runCronIsolatedAgentTurn — interim ack retry", () => {
|
||||
|
||||
mockRunCronFallbackPassthrough();
|
||||
await runTurnAndExpectOk(1, 1);
|
||||
expect(listDescendantRunsForRequesterMock).toHaveBeenCalledWith(
|
||||
"agent:default:cron:test:run:test-session-id",
|
||||
);
|
||||
expect(countActiveDescendantRunsMock).toHaveBeenCalledWith(
|
||||
"agent:default:cron:test:run:test-session-id",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -236,6 +236,7 @@ describe("runCronIsolatedAgentTurn message tool policy", () => {
|
||||
agentId: "default",
|
||||
agentDir: "/tmp/agent-dir",
|
||||
agentSessionKey: "cron:message-tool-policy",
|
||||
runSessionKey: "cron:message-tool-policy:run:test-session-id",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
resolvedVerboseLevel: "off",
|
||||
thinkLevel: undefined,
|
||||
|
||||
119
src/cron/isolated-agent/run.session-key-isolation.test.ts
Normal file
119
src/cron/isolated-agent/run.session-key-isolation.test.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
makeIsolatedAgentTurnJob,
|
||||
makeIsolatedAgentTurnParams,
|
||||
setupRunCronIsolatedAgentTurnSuite,
|
||||
} from "./run.suite-helpers.js";
|
||||
import {
|
||||
isCliProviderMock,
|
||||
loadRunCronIsolatedAgentTurn,
|
||||
makeCronSession,
|
||||
mockRunCronFallbackPassthrough,
|
||||
resolveCronSessionMock,
|
||||
runCliAgentMock,
|
||||
runEmbeddedPiAgentMock,
|
||||
} from "./run.test-harness.js";
|
||||
|
||||
const runCronIsolatedAgentTurn = await loadRunCronIsolatedAgentTurn();
|
||||
|
||||
describe("runCronIsolatedAgentTurn isolated session identity", () => {
|
||||
setupRunCronIsolatedAgentTurnSuite();
|
||||
|
||||
it("uses a run-scoped key for embedded isolated cron execution", async () => {
|
||||
resolveCronSessionMock.mockReturnValue(
|
||||
makeCronSession({
|
||||
sessionEntry: {
|
||||
...makeCronSession().sessionEntry,
|
||||
sessionId: "isolated-run-1",
|
||||
},
|
||||
}),
|
||||
);
|
||||
mockRunCronFallbackPassthrough();
|
||||
|
||||
const result = await runCronIsolatedAgentTurn(
|
||||
makeIsolatedAgentTurnParams({
|
||||
sessionKey: "cron:daily-monitor",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result.status).toBe("ok");
|
||||
expect(result.sessionKey).toBe("agent:default:cron:daily-monitor:run:isolated-run-1");
|
||||
expect(resolveCronSessionMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
forceNew: true,
|
||||
sessionKey: "agent:default:cron:daily-monitor",
|
||||
}),
|
||||
);
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledOnce();
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({
|
||||
sessionId: "isolated-run-1",
|
||||
sessionKey: "agent:default:cron:daily-monitor:run:isolated-run-1",
|
||||
});
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]?.sessionKey).not.toBe(
|
||||
"agent:default:cron:daily-monitor",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps explicit session-bound cron execution on the requested session key", async () => {
|
||||
resolveCronSessionMock.mockReturnValue(
|
||||
makeCronSession({
|
||||
sessionEntry: {
|
||||
...makeCronSession().sessionEntry,
|
||||
sessionId: "bound-run-1",
|
||||
},
|
||||
}),
|
||||
);
|
||||
mockRunCronFallbackPassthrough();
|
||||
|
||||
const result = await runCronIsolatedAgentTurn(
|
||||
makeIsolatedAgentTurnParams({
|
||||
sessionKey: "project-alpha-monitor",
|
||||
job: makeIsolatedAgentTurnJob({
|
||||
sessionTarget: "session:project-alpha-monitor",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result.status).toBe("ok");
|
||||
expect(result.sessionKey).toBe("agent:default:project-alpha-monitor");
|
||||
expect(runEmbeddedPiAgentMock).toHaveBeenCalledOnce();
|
||||
expect(runEmbeddedPiAgentMock.mock.calls[0]?.[0]).toMatchObject({
|
||||
sessionId: "bound-run-1",
|
||||
sessionKey: "agent:default:project-alpha-monitor",
|
||||
});
|
||||
});
|
||||
|
||||
it("uses a run-scoped key for CLI isolated cron execution", async () => {
|
||||
isCliProviderMock.mockReturnValue(true);
|
||||
resolveCronSessionMock.mockReturnValue(
|
||||
makeCronSession({
|
||||
sessionEntry: {
|
||||
...makeCronSession().sessionEntry,
|
||||
sessionId: "isolated-cli-run-1",
|
||||
},
|
||||
}),
|
||||
);
|
||||
mockRunCronFallbackPassthrough();
|
||||
runCliAgentMock.mockResolvedValue({
|
||||
payloads: [{ text: "done" }],
|
||||
meta: { agentMeta: { usage: { input: 10, output: 20 } } },
|
||||
});
|
||||
|
||||
const result = await runCronIsolatedAgentTurn(
|
||||
makeIsolatedAgentTurnParams({
|
||||
sessionKey: "cron:cli-monitor",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(result.status).toBe("ok");
|
||||
expect(result.sessionKey).toBe("agent:default:cron:cli-monitor:run:isolated-cli-run-1");
|
||||
expect(runCliAgentMock).toHaveBeenCalledOnce();
|
||||
expect(runCliAgentMock.mock.calls[0]?.[0]).toMatchObject({
|
||||
sessionId: "isolated-cli-run-1",
|
||||
sessionKey: "agent:default:cron:cli-monitor:run:isolated-cli-run-1",
|
||||
});
|
||||
expect(runCliAgentMock.mock.calls[0]?.[0]?.sessionKey).not.toBe(
|
||||
"agent:default:cron:cli-monitor",
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -521,7 +521,6 @@ async function prepareCronRunContext(params: {
|
||||
isFastTestEnv: params.isFastTestEnv,
|
||||
cronSession,
|
||||
agentSessionKey,
|
||||
runSessionKey,
|
||||
updateSessionStore: async (storePath, update) => {
|
||||
const { updateSessionStore } = await loadSessionStoreRuntime();
|
||||
await updateSessionStore(storePath, update);
|
||||
@@ -894,6 +893,7 @@ async function finalizeCronRun(params: {
|
||||
job: prepared.input.job,
|
||||
agentId: prepared.agentId,
|
||||
agentSessionKey: prepared.agentSessionKey,
|
||||
runSessionKey: prepared.runSessionKey,
|
||||
sessionId: prepared.runSessionId,
|
||||
runStartedAt: execution.runStartedAt,
|
||||
runEndedAt: execution.runEndedAt,
|
||||
@@ -982,6 +982,7 @@ export async function runCronIsolatedAgentTurn(params: {
|
||||
agentId: prepared.context.agentId,
|
||||
agentDir: prepared.context.agentDir,
|
||||
agentSessionKey: prepared.context.agentSessionKey,
|
||||
runSessionKey: prepared.context.runSessionKey,
|
||||
workspaceDir: prepared.context.workspaceDir,
|
||||
lane: params.lane,
|
||||
resolvedDelivery: {
|
||||
|
||||
Reference in New Issue
Block a user