mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(session): retire stale dm main route after dmScope migration (#31010)
This commit is contained in:
@@ -101,6 +101,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Discord/DM command auth: unify DM allowlist + pairing-store authorization across message preflight and native command interactions so DM command gating is consistent for `open`/`pairing`/`allowlist` policies.
|
||||
- Sessions/Usage accounting: persist `cacheRead`/`cacheWrite` from the latest call snapshot (`lastCallUsage`) instead of accumulated multi-call totals, preventing inflated token/cost reporting in long tool/compaction runs. (#31005)
|
||||
- Sessions/Followup queue: always schedule followup drain even when unexpected runtime exceptions escape `runReplyAgent`, preventing silent stuck followup backlogs after failed turns. (#30627)
|
||||
- Sessions/DM scope migration: when `session.dmScope` is non-`main`, retire stale `agent:*:main` delivery routing metadata once the matching direct-chat peer session is active, preventing duplicate Telegram/DM announce deliveries from legacy main sessions after scope migration. (#31010)
|
||||
- Sessions/Compaction safety: add transcript-size forced pre-compaction memory flush (`agents.defaults.compaction.memoryFlush.forceFlushTranscriptBytes`, default 2MB) so long sessions recover without manual transcript deletion when token snapshots are stale. (#30655)
|
||||
- Diagnostics/Stuck session signal: add configurable stuck-session warning threshold via `diagnostics.stuckSessionWarnMs` (default 120000ms) to reduce false-positive warnings on long multi-tool turns. (#31032)
|
||||
- ACP/Harness thread spawn routing: force ACP harness thread creation through `sessions_spawn` (`runtime: "acp"`, `thread: true`) and explicitly forbid `message action=thread-create` for ACP harness requests, avoiding misrouted `Unknown channel` errors. (#30957) Thanks @dutifulbob.
|
||||
|
||||
@@ -16,6 +16,11 @@ vi.mock("openclaw/plugin-sdk", () => ({
|
||||
setAccountEnabledInConfigSection: vi.fn((_opts: any) => ({})),
|
||||
registerPluginHttpRoute: registerPluginHttpRouteMock,
|
||||
buildChannelConfigSchema: vi.fn((schema: any) => ({ schema })),
|
||||
createFixedWindowRateLimiter: vi.fn(() => ({
|
||||
isRateLimited: vi.fn(() => false),
|
||||
size: vi.fn(() => 0),
|
||||
clear: vi.fn(),
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("./runtime.js", () => ({
|
||||
|
||||
@@ -6,6 +6,11 @@ vi.mock("openclaw/plugin-sdk", () => ({
|
||||
setAccountEnabledInConfigSection: vi.fn((_opts: any) => ({})),
|
||||
registerPluginHttpRoute: vi.fn(() => vi.fn()),
|
||||
buildChannelConfigSchema: vi.fn((schema: any) => ({ schema })),
|
||||
createFixedWindowRateLimiter: vi.fn(() => ({
|
||||
isRateLimited: vi.fn(() => false),
|
||||
size: vi.fn(() => 0),
|
||||
clear: vi.fn(),
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("./client.js", () => ({
|
||||
|
||||
@@ -1359,6 +1359,98 @@ describe("initSessionState stale threadId fallback", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("initSessionState dmScope delivery migration", () => {
|
||||
it("retires stale main-session delivery route when dmScope uses per-channel DM keys", async () => {
|
||||
const storePath = await createStorePath("dm-scope-retire-main-route-");
|
||||
await saveSessionStore(storePath, {
|
||||
"agent:main:main": {
|
||||
sessionId: "legacy-main",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastTo: "6101296751",
|
||||
lastAccountId: "default",
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "6101296751",
|
||||
accountId: "default",
|
||||
},
|
||||
},
|
||||
});
|
||||
const cfg = {
|
||||
session: { store: storePath, dmScope: "per-channel-peer" },
|
||||
} as OpenClawConfig;
|
||||
|
||||
const result = await initSessionState({
|
||||
ctx: {
|
||||
Body: "hello",
|
||||
SessionKey: "agent:main:telegram:direct:6101296751",
|
||||
OriginatingChannel: "telegram",
|
||||
OriginatingTo: "6101296751",
|
||||
AccountId: "default",
|
||||
},
|
||||
cfg,
|
||||
commandAuthorized: true,
|
||||
});
|
||||
|
||||
expect(result.sessionKey).toBe("agent:main:telegram:direct:6101296751");
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
expect(persisted["agent:main:main"]?.sessionId).toBe("legacy-main");
|
||||
expect(persisted["agent:main:main"]?.deliveryContext).toBeUndefined();
|
||||
expect(persisted["agent:main:main"]?.lastChannel).toBeUndefined();
|
||||
expect(persisted["agent:main:main"]?.lastTo).toBeUndefined();
|
||||
expect(persisted["agent:main:telegram:direct:6101296751"]?.deliveryContext?.to).toBe(
|
||||
"6101296751",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps legacy main-session delivery route when current DM target does not match", async () => {
|
||||
const storePath = await createStorePath("dm-scope-keep-main-route-");
|
||||
await saveSessionStore(storePath, {
|
||||
"agent:main:main": {
|
||||
sessionId: "legacy-main",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastTo: "1111",
|
||||
lastAccountId: "default",
|
||||
deliveryContext: {
|
||||
channel: "telegram",
|
||||
to: "1111",
|
||||
accountId: "default",
|
||||
},
|
||||
},
|
||||
});
|
||||
const cfg = {
|
||||
session: { store: storePath, dmScope: "per-channel-peer" },
|
||||
} as OpenClawConfig;
|
||||
|
||||
await initSessionState({
|
||||
ctx: {
|
||||
Body: "hello",
|
||||
SessionKey: "agent:main:telegram:direct:6101296751",
|
||||
OriginatingChannel: "telegram",
|
||||
OriginatingTo: "6101296751",
|
||||
AccountId: "default",
|
||||
},
|
||||
cfg,
|
||||
commandAuthorized: true,
|
||||
});
|
||||
|
||||
const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
expect(persisted["agent:main:main"]?.deliveryContext).toEqual({
|
||||
channel: "telegram",
|
||||
to: "1111",
|
||||
accountId: "default",
|
||||
});
|
||||
expect(persisted["agent:main:main"]?.lastTo).toBe("1111");
|
||||
});
|
||||
});
|
||||
|
||||
describe("initSessionState internal channel routing preservation", () => {
|
||||
it("keeps persisted external lastChannel when OriginatingChannel is internal webchat", async () => {
|
||||
const storePath = await createStorePath("preserve-external-channel-");
|
||||
|
||||
@@ -30,9 +30,14 @@ import { archiveSessionTranscripts } from "../../gateway/session-utils.fs.js";
|
||||
import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { normalizeMainKey } from "../../routing/session-key.js";
|
||||
import { buildAgentMainSessionKey, normalizeMainKey } from "../../routing/session-key.js";
|
||||
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
|
||||
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
|
||||
import {
|
||||
deliveryContextFromSession,
|
||||
deliveryContextKey,
|
||||
normalizeDeliveryContext,
|
||||
normalizeSessionDeliveryFields,
|
||||
} from "../../utils/delivery-context.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
isDeliverableMessageChannel,
|
||||
@@ -112,6 +117,11 @@ export type SessionInitResult = {
|
||||
*/
|
||||
const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000;
|
||||
|
||||
type LegacyMainDeliveryRetirement = {
|
||||
key: string;
|
||||
entry: SessionEntry;
|
||||
};
|
||||
|
||||
function resolveParentForkMaxTokens(cfg: OpenClawConfig): number {
|
||||
const configured = cfg.session?.parentForkMaxTokens;
|
||||
if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) {
|
||||
@@ -120,6 +130,67 @@ function resolveParentForkMaxTokens(cfg: OpenClawConfig): number {
|
||||
return DEFAULT_PARENT_FORK_MAX_TOKENS;
|
||||
}
|
||||
|
||||
function maybeRetireLegacyMainDeliveryRoute(params: {
|
||||
sessionCfg: OpenClawConfig["session"] | undefined;
|
||||
sessionKey: string;
|
||||
sessionStore: Record<string, SessionEntry>;
|
||||
agentId: string;
|
||||
mainKey: string;
|
||||
isGroup: boolean;
|
||||
ctx: MsgContext;
|
||||
}): LegacyMainDeliveryRetirement | undefined {
|
||||
const dmScope = params.sessionCfg?.dmScope ?? "main";
|
||||
if (dmScope === "main" || params.isGroup) {
|
||||
return undefined;
|
||||
}
|
||||
const canonicalMainSessionKey = buildAgentMainSessionKey({
|
||||
agentId: params.agentId,
|
||||
mainKey: params.mainKey,
|
||||
}).toLowerCase();
|
||||
if (params.sessionKey === canonicalMainSessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
const legacyMain = params.sessionStore[canonicalMainSessionKey];
|
||||
if (!legacyMain) {
|
||||
return undefined;
|
||||
}
|
||||
const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain));
|
||||
if (!legacyRouteKey) {
|
||||
return undefined;
|
||||
}
|
||||
const activeDirectRouteKey = deliveryContextKey(
|
||||
normalizeDeliveryContext({
|
||||
channel: params.ctx.OriginatingChannel as string | undefined,
|
||||
to: params.ctx.OriginatingTo || params.ctx.To,
|
||||
accountId: params.ctx.AccountId,
|
||||
threadId: params.ctx.MessageThreadId,
|
||||
}),
|
||||
);
|
||||
if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) {
|
||||
return undefined;
|
||||
}
|
||||
if (
|
||||
legacyMain.deliveryContext === undefined &&
|
||||
legacyMain.lastChannel === undefined &&
|
||||
legacyMain.lastTo === undefined &&
|
||||
legacyMain.lastAccountId === undefined &&
|
||||
legacyMain.lastThreadId === undefined
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
key: canonicalMainSessionKey,
|
||||
entry: {
|
||||
...legacyMain,
|
||||
deliveryContext: undefined,
|
||||
lastChannel: undefined,
|
||||
lastTo: undefined,
|
||||
lastAccountId: undefined,
|
||||
lastThreadId: undefined,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function forkSessionFromParent(params: {
|
||||
parentEntry: SessionEntry;
|
||||
agentId: string;
|
||||
@@ -273,6 +344,18 @@ export async function initSessionState(params: {
|
||||
}
|
||||
|
||||
sessionKey = resolveSessionKey(sessionScope, sessionCtxForState, mainKey);
|
||||
const retiredLegacyMainDelivery = maybeRetireLegacyMainDeliveryRoute({
|
||||
sessionCfg,
|
||||
sessionKey,
|
||||
sessionStore,
|
||||
agentId,
|
||||
mainKey,
|
||||
isGroup,
|
||||
ctx,
|
||||
});
|
||||
if (retiredLegacyMainDelivery) {
|
||||
sessionStore[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
const entry = sessionStore[sessionKey];
|
||||
const previousSessionEntry = resetTriggered && entry ? { ...entry } : undefined;
|
||||
const now = Date.now();
|
||||
@@ -477,6 +560,9 @@ export async function initSessionState(params: {
|
||||
(store) => {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
|
||||
if (retiredLegacyMainDelivery) {
|
||||
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
},
|
||||
{
|
||||
activeSessionKey: sessionKey,
|
||||
|
||||
Reference in New Issue
Block a user