mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:00:42 +00:00
fix: clear system events on session reset
This commit is contained in:
@@ -101,6 +101,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/plugins: keep `message` startup, `channels logs`, `agents delete`, and `agents set-identity` off broad plugin preloading; message delivery still loads plugins when the action actually runs.
|
||||
- Image understanding: resolve configured image models such as local LM Studio vision entries before reporting `Unknown model` when the discovery registry has not registered that provider. Fixes #66486. Thanks @zhanggpcsu.
|
||||
- Sessions: separate reset freshness from session-store `updatedAt`, so heartbeat, cron, exec, and gateway bookkeeping no longer prevent configured daily/idle resets from rolling long-running channel sessions. Fixes #68315, #63732, #63820, and #69083. Thanks @maxatv, @longhairedsi, @bradfreels, and @akessel56.
|
||||
- Sessions: clear queued system-event notices during `/new`, `/reset`, gateway `sessions.reset`, and daily/idle rollover so stale background updates cannot leak into the first prompt of the fresh session. Fixes #66864. Thanks @opeyio, @Magicray1217, and @cedillarack.
|
||||
- CLI/agents: keep `agents bind`, `agents unbind`, and `agents bindings` on setup-safe channel metadata paths so they do not preload bundled plugin runtimes or stage runtime dependencies. Fixes #71743.
|
||||
- Plugins/registry: preserve explicit disabled plugin records during registry migration without persisting every unused bundled plugin discovered on disk. Thanks @shakkernerd.
|
||||
- Windows/native: keep CLI startup and bundled provider plugin loading off Windows ESM raw-path failure paths, fixing native onboarding/install smoke on Node 24. Thanks @steipete.
|
||||
|
||||
@@ -71,7 +71,10 @@ Sessions are reused until they expire:
|
||||
|
||||
When both daily and idle resets are configured, whichever expires first wins.
|
||||
Heartbeat, cron, exec, and other system-event turns may write session metadata,
|
||||
but those writes do not extend daily or idle reset freshness.
|
||||
but those writes do not extend daily or idle reset freshness. When a reset
|
||||
rolls the session, queued system-event notices for the old session are
|
||||
discarded so stale background updates are not prepended to the first prompt in
|
||||
the new session.
|
||||
|
||||
Sessions with an active provider-owned CLI session are not cut by the implicit
|
||||
daily default. Use `/reset` or configure `session.reset` explicitly when those
|
||||
|
||||
@@ -136,7 +136,7 @@ Rules of thumb:
|
||||
- **Reset** (`/new`, `/reset`) creates a new `sessionId` for that `sessionKey`.
|
||||
- **Daily reset** (default 4:00 AM local time on the gateway host) creates a new `sessionId` on the next message after the reset boundary.
|
||||
- **Idle expiry** (`session.reset.idleMinutes` or legacy `session.idleMinutes`) creates a new `sessionId` when a message arrives after the idle window. When daily + idle are both configured, whichever expires first wins.
|
||||
- **System events** (heartbeat, cron wakeups, exec notifications, gateway bookkeeping) may mutate the session row but do not extend daily/idle reset freshness.
|
||||
- **System events** (heartbeat, cron wakeups, exec notifications, gateway bookkeeping) may mutate the session row but do not extend daily/idle reset freshness. Reset rollover discards queued system-event notices for the previous session before the fresh prompt is built.
|
||||
- **Thread parent fork guard** (`session.parentForkMaxTokens`, default `100000`) skips parent transcript forking when the parent session is already too large; the new thread starts fresh. Set `0` to disable.
|
||||
|
||||
Implementation detail: the decision happens in `initSessionState()` in `src/auto-reply/reply/session.ts`.
|
||||
|
||||
27
src/auto-reply/reply/session-reset-cleanup.test.ts
Normal file
27
src/auto-reply/reply/session-reset-cleanup.test.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
enqueueSystemEvent,
|
||||
peekSystemEvents,
|
||||
resetSystemEventsForTest,
|
||||
} from "../../infra/system-events.js";
|
||||
import { clearSessionResetRuntimeState } from "./session-reset-cleanup.js";
|
||||
|
||||
afterEach(() => {
|
||||
resetSystemEventsForTest();
|
||||
});
|
||||
|
||||
describe("clearSessionResetRuntimeState", () => {
|
||||
it("clears reset queues and drains system events for normalized keys", () => {
|
||||
enqueueSystemEvent("stale alpha", { sessionKey: "alpha" });
|
||||
enqueueSystemEvent("stale beta", { sessionKey: "beta" });
|
||||
enqueueSystemEvent("fresh gamma", { sessionKey: "gamma" });
|
||||
|
||||
const result = clearSessionResetRuntimeState([" alpha ", undefined, " ", "alpha", "beta"]);
|
||||
|
||||
expect(result.keys).toEqual(["alpha", "beta"]);
|
||||
expect(result.systemEventsCleared).toBe(2);
|
||||
expect(peekSystemEvents("alpha")).toEqual([]);
|
||||
expect(peekSystemEvents("beta")).toEqual([]);
|
||||
expect(peekSystemEvents("gamma")).toEqual(["fresh gamma"]);
|
||||
});
|
||||
});
|
||||
22
src/auto-reply/reply/session-reset-cleanup.ts
Normal file
22
src/auto-reply/reply/session-reset-cleanup.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { drainSystemEventEntries } from "../../infra/system-events.js";
|
||||
import { clearSessionQueues, type ClearSessionQueueResult } from "./queue.js";
|
||||
|
||||
export type ClearSessionResetRuntimeStateResult = ClearSessionQueueResult & {
|
||||
systemEventsCleared: number;
|
||||
};
|
||||
|
||||
export function clearSessionResetRuntimeState(
|
||||
keys: Array<string | undefined>,
|
||||
): ClearSessionResetRuntimeStateResult {
|
||||
const cleared = clearSessionQueues(keys);
|
||||
let systemEventsCleared = 0;
|
||||
|
||||
for (const key of cleared.keys) {
|
||||
systemEventsCleared += drainSystemEventEntries(key).length;
|
||||
}
|
||||
|
||||
return {
|
||||
...cleared,
|
||||
systemEventsCleared,
|
||||
};
|
||||
}
|
||||
@@ -15,7 +15,11 @@ import {
|
||||
getSessionBindingService,
|
||||
registerSessionBindingAdapter,
|
||||
} from "../../infra/outbound/session-binding-service.js";
|
||||
import { enqueueSystemEvent, resetSystemEventsForTest } from "../../infra/system-events.js";
|
||||
import {
|
||||
enqueueSystemEvent,
|
||||
peekSystemEvents,
|
||||
resetSystemEventsForTest,
|
||||
} from "../../infra/system-events.js";
|
||||
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js";
|
||||
import {
|
||||
createChannelTestPluginBase,
|
||||
@@ -292,6 +296,7 @@ beforeEach(() => {
|
||||
});
|
||||
});
|
||||
afterEach(async () => {
|
||||
resetSystemEventsForTest();
|
||||
await sessionMcpTesting.resetSessionMcpRuntimeManager();
|
||||
});
|
||||
describe("initSessionState thread forking", () => {
|
||||
@@ -787,6 +792,53 @@ describe("initSessionState RawBody", () => {
|
||||
expect(result.triggerBodyNormalized).toBe("/NEW KeepThisCase");
|
||||
});
|
||||
|
||||
it("drains stale system events when /new rotates an existing session", async () => {
|
||||
const root = await makeCaseDir("openclaw-rawbody-reset-system-events-");
|
||||
const storePath = path.join(root, "sessions.json");
|
||||
const sessionKey = "agent:main:whatsapp:dm:system-events";
|
||||
const existingSessionId = "session-with-stale-events";
|
||||
|
||||
await writeSessionStoreFast(storePath, {
|
||||
[sessionKey]: {
|
||||
sessionId: existingSessionId,
|
||||
updatedAt: Date.now(),
|
||||
systemSent: true,
|
||||
},
|
||||
});
|
||||
enqueueSystemEvent("stale session-key event", { sessionKey });
|
||||
enqueueSystemEvent("stale session-id event", { sessionKey: existingSessionId });
|
||||
|
||||
const cfg = {
|
||||
session: {
|
||||
store: storePath,
|
||||
resetTriggers: ["/new"],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
const result = await initSessionState({
|
||||
ctx: {
|
||||
RawBody: "/new continue",
|
||||
ChatType: "direct",
|
||||
SessionKey: sessionKey,
|
||||
},
|
||||
cfg,
|
||||
commandAuthorized: true,
|
||||
});
|
||||
|
||||
expect(result.isNewSession).toBe(true);
|
||||
expect(result.resetTriggered).toBe(true);
|
||||
expect(result.sessionId).not.toBe(existingSessionId);
|
||||
await expect(
|
||||
drainFormattedSystemEvents({
|
||||
cfg,
|
||||
sessionKey,
|
||||
isMainSession: false,
|
||||
isNewSession: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(peekSystemEvents(existingSessionId)).toEqual([]);
|
||||
});
|
||||
|
||||
it("rotates local session state for /new on bound ACP sessions", async () => {
|
||||
const root = await makeCaseDir("openclaw-rawbody-acp-reset-");
|
||||
const storePath = path.join(root, "sessions.json");
|
||||
@@ -1334,6 +1386,10 @@ describe("initSessionState reset policy", () => {
|
||||
updatedAt: new Date(2026, 0, 18, 3, 0, 0).getTime(),
|
||||
},
|
||||
});
|
||||
enqueueSystemEvent("stale daily rollover event", { sessionKey });
|
||||
enqueueSystemEvent("stale daily rollover session-id event", {
|
||||
sessionKey: existingSessionId,
|
||||
});
|
||||
|
||||
const cfg = { session: { store: storePath } } as OpenClawConfig;
|
||||
const result = await initSessionState({
|
||||
@@ -1348,6 +1404,15 @@ describe("initSessionState reset policy", () => {
|
||||
sessionKey,
|
||||
previousSessionId: existingSessionId,
|
||||
});
|
||||
await expect(
|
||||
drainFormattedSystemEvents({
|
||||
cfg,
|
||||
sessionKey,
|
||||
isMainSession: false,
|
||||
isNewSession: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(peekSystemEvents(existingSessionId)).toEqual([]);
|
||||
});
|
||||
|
||||
it("treats sessions as stale before the daily reset when updated before yesterday's boundary", async () => {
|
||||
@@ -1405,6 +1470,50 @@ describe("initSessionState reset policy", () => {
|
||||
expect(result.sessionId).not.toBe(existingSessionId);
|
||||
});
|
||||
|
||||
it("drains stale system events when idle rollover creates a new session", async () => {
|
||||
vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0));
|
||||
const root = await makeCaseDir("openclaw-reset-idle-system-events-");
|
||||
const storePath = path.join(root, "sessions.json");
|
||||
const sessionKey = "agent:main:whatsapp:dm:idle-system-events";
|
||||
const existingSessionId = "idle-system-events-session";
|
||||
|
||||
await writeSessionStoreFast(storePath, {
|
||||
[sessionKey]: {
|
||||
sessionId: existingSessionId,
|
||||
updatedAt: new Date(2026, 0, 18, 4, 45, 0).getTime(),
|
||||
},
|
||||
});
|
||||
enqueueSystemEvent("stale idle rollover event", { sessionKey });
|
||||
enqueueSystemEvent("stale idle rollover session-id event", {
|
||||
sessionKey: existingSessionId,
|
||||
});
|
||||
|
||||
const cfg = {
|
||||
session: {
|
||||
store: storePath,
|
||||
reset: { mode: "idle", idleMinutes: 30 },
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
const result = await initSessionState({
|
||||
ctx: { Body: "hello", SessionKey: sessionKey },
|
||||
cfg,
|
||||
commandAuthorized: true,
|
||||
});
|
||||
|
||||
expect(result.isNewSession).toBe(true);
|
||||
expect(result.resetTriggered).toBe(false);
|
||||
expect(result.sessionId).not.toBe(existingSessionId);
|
||||
await expect(
|
||||
drainFormattedSystemEvents({
|
||||
cfg,
|
||||
sessionKey,
|
||||
isMainSession: false,
|
||||
isNewSession: true,
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
expect(peekSystemEvents(existingSessionId)).toEqual([]);
|
||||
});
|
||||
|
||||
it("keeps the existing stale session for /reset soft", async () => {
|
||||
vi.setSystemTime(new Date(2026, 0, 18, 5, 30, 0));
|
||||
const root = await makeCaseDir("openclaw-reset-soft-stale-");
|
||||
|
||||
@@ -66,6 +66,7 @@ import {
|
||||
resolveParentForkTokenCount,
|
||||
} from "./session-fork.js";
|
||||
import { buildSessionEndHookPayload, buildSessionStartHookPayload } from "./session-hooks.js";
|
||||
import { clearSessionResetRuntimeState } from "./session-reset-cleanup.js";
|
||||
|
||||
const log = createSubsystemLogger("session-init");
|
||||
let sessionArchiveRuntimePromise: Promise<
|
||||
@@ -486,6 +487,9 @@ export async function initSessionState(params: {
|
||||
sessionKey,
|
||||
previousSessionId: previousSessionEntry?.sessionId,
|
||||
});
|
||||
if (previousSessionEntry) {
|
||||
clearSessionResetRuntimeState([sessionKey, previousSessionEntry.sessionId]);
|
||||
}
|
||||
|
||||
if (!isNewSession && freshEntry && canReuseExistingEntry) {
|
||||
sessionId = entry.sessionId;
|
||||
|
||||
@@ -6,6 +6,11 @@ import type { AssistantMessage, UserMessage } from "@mariozechner/pi-ai";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { isSessionPatchEvent, type InternalHookEvent } from "../hooks/internal-hooks.js";
|
||||
import {
|
||||
enqueueSystemEvent,
|
||||
peekSystemEvents,
|
||||
resetSystemEventsForTest,
|
||||
} from "../infra/system-events.js";
|
||||
import { withEnvAsync } from "../test-utils/env.js";
|
||||
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
|
||||
import { startGatewayServerHarness, type GatewayServerHarness } from "./server.e2e-ws-harness.js";
|
||||
@@ -42,7 +47,16 @@ async function getSessionsHandlers() {
|
||||
}
|
||||
|
||||
const sessionCleanupMocks = vi.hoisted(() => ({
|
||||
clearSessionQueues: vi.fn(() => ({ followupCleared: 0, laneCleared: 0, keys: [] })),
|
||||
clearSessionQueues: vi.fn((keys: Array<string | undefined>) => {
|
||||
const clearedKeys = Array.from(
|
||||
new Set(
|
||||
keys
|
||||
.map((key) => (typeof key === "string" ? key.trim() : ""))
|
||||
.filter((key) => key.length > 0),
|
||||
),
|
||||
);
|
||||
return { followupCleared: 0, laneCleared: 0, keys: clearedKeys };
|
||||
}),
|
||||
stopSubagentsForRequester: vi.fn(() => ({ stopped: 0 })),
|
||||
}));
|
||||
|
||||
@@ -437,6 +451,7 @@ describe("gateway server sessions", () => {
|
||||
subagentLifecycleHookMocks.runSubagentEnded.mockClear();
|
||||
subagentLifecycleHookState.hasSubagentEndedHook = true;
|
||||
threadBindingMocks.unbindThreadBindingsBySessionKey.mockClear();
|
||||
resetSystemEventsForTest();
|
||||
acpRuntimeMocks.cancel.mockClear();
|
||||
acpRuntimeMocks.close.mockClear();
|
||||
acpRuntimeMocks.getAcpRuntimeBackend.mockReset();
|
||||
@@ -2688,6 +2703,9 @@ describe("gateway server sessions", () => {
|
||||
|
||||
test("sessions.reset aborts active runs and clears queues", async () => {
|
||||
await seedActiveMainSession();
|
||||
enqueueSystemEvent("stale event via alias", { sessionKey: "main" });
|
||||
enqueueSystemEvent("stale event via canonical key", { sessionKey: "agent:main:main" });
|
||||
enqueueSystemEvent("stale event via session id", { sessionKey: "sess-main" });
|
||||
const waitCallCountAtSnapshotClear: number[] = [];
|
||||
bootstrapCacheMocks.clearBootstrapSnapshot.mockImplementation(() => {
|
||||
waitCallCountAtSnapshotClear.push(embeddedRunMock.waitCalls.length);
|
||||
@@ -2710,6 +2728,9 @@ describe("gateway server sessions", () => {
|
||||
["main", "agent:main:main", "sess-main"],
|
||||
"sess-main",
|
||||
);
|
||||
expect(peekSystemEvents("main")).toEqual([]);
|
||||
expect(peekSystemEvents("agent:main:main")).toEqual([]);
|
||||
expect(peekSystemEvents("sess-main")).toEqual([]);
|
||||
expect(bundleMcpRuntimeMocks.disposeSessionMcpRuntime).toHaveBeenCalledWith("sess-main");
|
||||
expect(waitCallCountAtSnapshotClear).toEqual([1]);
|
||||
expect(browserSessionTabMocks.closeTrackedBrowserTabsForSessions).toHaveBeenCalledTimes(1);
|
||||
|
||||
@@ -10,11 +10,11 @@ import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js";
|
||||
import { retireSessionMcpRuntime } from "../agents/pi-bundle-mcp-tools.js";
|
||||
import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../agents/pi-embedded.js";
|
||||
import { stopSubagentsForRequester } from "../auto-reply/reply/abort.js";
|
||||
import { clearSessionQueues } from "../auto-reply/reply/queue.js";
|
||||
import {
|
||||
buildSessionEndHookPayload,
|
||||
buildSessionStartHookPayload,
|
||||
} from "../auto-reply/reply/session-hooks.js";
|
||||
import { clearSessionResetRuntimeState } from "../auto-reply/reply/session-reset-cleanup.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import {
|
||||
snapshotSessionOrigin,
|
||||
@@ -215,7 +215,7 @@ async function ensureSessionRuntimeCleanup(params: {
|
||||
if (params.sessionId) {
|
||||
queueKeys.add(params.sessionId);
|
||||
}
|
||||
clearSessionQueues([...queueKeys]);
|
||||
clearSessionResetRuntimeState([...queueKeys]);
|
||||
stopSubagentsForRequester({ cfg: params.cfg, requesterSessionKey: params.target.canonicalKey });
|
||||
if (!params.sessionId) {
|
||||
clearBootstrapSnapshot(params.target.canonicalKey);
|
||||
|
||||
Reference in New Issue
Block a user