From 412eabc42b5502f57bf0def5bbee71242ec52012 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 00:33:40 +0000 Subject: [PATCH] fix(session): retire stale dm main route after dmScope migration (#31010) --- CHANGELOG.md | 1 + .../src/channel.integration.test.ts | 5 + extensions/synology-chat/src/channel.test.ts | 5 + src/auto-reply/reply/session.test.ts | 92 +++++++++++++++++++ src/auto-reply/reply/session.ts | 90 +++++++++++++++++- 5 files changed, 191 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a7d7552a33..ee0617b6ad8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ Docs: https://docs.openclaw.ai - Discord/DM command auth: unify DM allowlist + pairing-store authorization across message preflight and native command interactions so DM command gating is consistent for `open`/`pairing`/`allowlist` policies. - Sessions/Usage accounting: persist `cacheRead`/`cacheWrite` from the latest call snapshot (`lastCallUsage`) instead of accumulated multi-call totals, preventing inflated token/cost reporting in long tool/compaction runs. (#31005) - Sessions/Followup queue: always schedule followup drain even when unexpected runtime exceptions escape `runReplyAgent`, preventing silent stuck followup backlogs after failed turns. (#30627) +- Sessions/DM scope migration: when `session.dmScope` is non-`main`, retire stale `agent:*:main` delivery routing metadata once the matching direct-chat peer session is active, preventing duplicate Telegram/DM announce deliveries from legacy main sessions after scope migration. (#31010) - Sessions/Compaction safety: add transcript-size forced pre-compaction memory flush (`agents.defaults.compaction.memoryFlush.forceFlushTranscriptBytes`, default 2MB) so long sessions recover without manual transcript deletion when token snapshots are stale. (#30655) - Diagnostics/Stuck session signal: add configurable stuck-session warning threshold via `diagnostics.stuckSessionWarnMs` (default 120000ms) to reduce false-positive warnings on long multi-tool turns. (#31032) - ACP/Harness thread spawn routing: force ACP harness thread creation through `sessions_spawn` (`runtime: "acp"`, `thread: true`) and explicitly forbid `message action=thread-create` for ACP harness requests, avoiding misrouted `Unknown channel` errors. (#30957) Thanks @dutifulbob. diff --git a/extensions/synology-chat/src/channel.integration.test.ts b/extensions/synology-chat/src/channel.integration.test.ts index 6005cbd923b..2032a83512a 100644 --- a/extensions/synology-chat/src/channel.integration.test.ts +++ b/extensions/synology-chat/src/channel.integration.test.ts @@ -16,6 +16,11 @@ vi.mock("openclaw/plugin-sdk", () => ({ setAccountEnabledInConfigSection: vi.fn((_opts: any) => ({})), registerPluginHttpRoute: registerPluginHttpRouteMock, buildChannelConfigSchema: vi.fn((schema: any) => ({ schema })), + createFixedWindowRateLimiter: vi.fn(() => ({ + isRateLimited: vi.fn(() => false), + size: vi.fn(() => 0), + clear: vi.fn(), + })), })); vi.mock("./runtime.js", () => ({ diff --git a/extensions/synology-chat/src/channel.test.ts b/extensions/synology-chat/src/channel.test.ts index bc6c00a4712..89a96013200 100644 --- a/extensions/synology-chat/src/channel.test.ts +++ b/extensions/synology-chat/src/channel.test.ts @@ -6,6 +6,11 @@ vi.mock("openclaw/plugin-sdk", () => ({ setAccountEnabledInConfigSection: vi.fn((_opts: any) => ({})), registerPluginHttpRoute: vi.fn(() => vi.fn()), buildChannelConfigSchema: vi.fn((schema: any) => ({ schema })), + createFixedWindowRateLimiter: vi.fn(() => ({ + isRateLimited: vi.fn(() => false), + size: vi.fn(() => 0), + clear: vi.fn(), + })), })); vi.mock("./client.js", () => ({ diff --git a/src/auto-reply/reply/session.test.ts b/src/auto-reply/reply/session.test.ts index 1594f0ac662..3a25c5d1552 100644 --- a/src/auto-reply/reply/session.test.ts +++ b/src/auto-reply/reply/session.test.ts @@ -1359,6 +1359,98 @@ describe("initSessionState stale threadId fallback", () => { }); }); +describe("initSessionState dmScope delivery migration", () => { + it("retires stale main-session delivery route when dmScope uses per-channel DM keys", async () => { + const storePath = await createStorePath("dm-scope-retire-main-route-"); + await saveSessionStore(storePath, { + "agent:main:main": { + sessionId: "legacy-main", + updatedAt: Date.now(), + lastChannel: "telegram", + lastTo: "6101296751", + lastAccountId: "default", + deliveryContext: { + channel: "telegram", + to: "6101296751", + accountId: "default", + }, + }, + }); + const cfg = { + session: { store: storePath, dmScope: "per-channel-peer" }, + } as OpenClawConfig; + + const result = await initSessionState({ + ctx: { + Body: "hello", + SessionKey: "agent:main:telegram:direct:6101296751", + OriginatingChannel: "telegram", + OriginatingTo: "6101296751", + AccountId: "default", + }, + cfg, + commandAuthorized: true, + }); + + expect(result.sessionKey).toBe("agent:main:telegram:direct:6101296751"); + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + SessionEntry + >; + expect(persisted["agent:main:main"]?.sessionId).toBe("legacy-main"); + expect(persisted["agent:main:main"]?.deliveryContext).toBeUndefined(); + expect(persisted["agent:main:main"]?.lastChannel).toBeUndefined(); + expect(persisted["agent:main:main"]?.lastTo).toBeUndefined(); + expect(persisted["agent:main:telegram:direct:6101296751"]?.deliveryContext?.to).toBe( + "6101296751", + ); + }); + + it("keeps legacy main-session delivery route when current DM target does not match", async () => { + const storePath = await createStorePath("dm-scope-keep-main-route-"); + await saveSessionStore(storePath, { + "agent:main:main": { + sessionId: "legacy-main", + updatedAt: Date.now(), + lastChannel: "telegram", + lastTo: "1111", + lastAccountId: "default", + deliveryContext: { + channel: "telegram", + to: "1111", + accountId: "default", + }, + }, + }); + const cfg = { + session: { store: storePath, dmScope: "per-channel-peer" }, + } as OpenClawConfig; + + await initSessionState({ + ctx: { + Body: "hello", + SessionKey: "agent:main:telegram:direct:6101296751", + OriginatingChannel: "telegram", + OriginatingTo: "6101296751", + AccountId: "default", + }, + cfg, + commandAuthorized: true, + }); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + SessionEntry + >; + expect(persisted["agent:main:main"]?.deliveryContext).toEqual({ + channel: "telegram", + to: "1111", + accountId: "default", + }); + expect(persisted["agent:main:main"]?.lastTo).toBe("1111"); + }); +}); + describe("initSessionState internal channel routing preservation", () => { it("keeps persisted external lastChannel when OriginatingChannel is internal webchat", async () => { const storePath = await createStorePath("preserve-external-channel-"); diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 59b0c7ba379..ed789ece8da 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -30,9 +30,14 @@ import { archiveSessionTranscripts } from "../../gateway/session-utils.fs.js"; import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; -import { normalizeMainKey } from "../../routing/session-key.js"; +import { buildAgentMainSessionKey, normalizeMainKey } from "../../routing/session-key.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; -import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; +import { + deliveryContextFromSession, + deliveryContextKey, + normalizeDeliveryContext, + normalizeSessionDeliveryFields, +} from "../../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, isDeliverableMessageChannel, @@ -112,6 +117,11 @@ export type SessionInitResult = { */ const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000; +type LegacyMainDeliveryRetirement = { + key: string; + entry: SessionEntry; +}; + function resolveParentForkMaxTokens(cfg: OpenClawConfig): number { const configured = cfg.session?.parentForkMaxTokens; if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) { @@ -120,6 +130,67 @@ function resolveParentForkMaxTokens(cfg: OpenClawConfig): number { return DEFAULT_PARENT_FORK_MAX_TOKENS; } +function maybeRetireLegacyMainDeliveryRoute(params: { + sessionCfg: OpenClawConfig["session"] | undefined; + sessionKey: string; + sessionStore: Record; + agentId: string; + mainKey: string; + isGroup: boolean; + ctx: MsgContext; +}): LegacyMainDeliveryRetirement | undefined { + const dmScope = params.sessionCfg?.dmScope ?? "main"; + if (dmScope === "main" || params.isGroup) { + return undefined; + } + const canonicalMainSessionKey = buildAgentMainSessionKey({ + agentId: params.agentId, + mainKey: params.mainKey, + }).toLowerCase(); + if (params.sessionKey === canonicalMainSessionKey) { + return undefined; + } + const legacyMain = params.sessionStore[canonicalMainSessionKey]; + if (!legacyMain) { + return undefined; + } + const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain)); + if (!legacyRouteKey) { + return undefined; + } + const activeDirectRouteKey = deliveryContextKey( + normalizeDeliveryContext({ + channel: params.ctx.OriginatingChannel as string | undefined, + to: params.ctx.OriginatingTo || params.ctx.To, + accountId: params.ctx.AccountId, + threadId: params.ctx.MessageThreadId, + }), + ); + if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) { + return undefined; + } + if ( + legacyMain.deliveryContext === undefined && + legacyMain.lastChannel === undefined && + legacyMain.lastTo === undefined && + legacyMain.lastAccountId === undefined && + legacyMain.lastThreadId === undefined + ) { + return undefined; + } + return { + key: canonicalMainSessionKey, + entry: { + ...legacyMain, + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + lastThreadId: undefined, + }, + }; +} + function forkSessionFromParent(params: { parentEntry: SessionEntry; agentId: string; @@ -273,6 +344,18 @@ export async function initSessionState(params: { } sessionKey = resolveSessionKey(sessionScope, sessionCtxForState, mainKey); + const retiredLegacyMainDelivery = maybeRetireLegacyMainDeliveryRoute({ + sessionCfg, + sessionKey, + sessionStore, + agentId, + mainKey, + isGroup, + ctx, + }); + if (retiredLegacyMainDelivery) { + sessionStore[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry; + } const entry = sessionStore[sessionKey]; const previousSessionEntry = resetTriggered && entry ? { ...entry } : undefined; const now = Date.now(); @@ -477,6 +560,9 @@ export async function initSessionState(params: { (store) => { // Preserve per-session overrides while resetting compaction state on /new. store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; + if (retiredLegacyMainDelivery) { + store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry; + } }, { activeSessionKey: sessionKey,