diff --git a/CHANGELOG.md b/CHANGELOG.md index 01abad8d741..c38e76c5783 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,6 @@ Docs: https://docs.openclaw.ai - Gateway/tailscale: start Tailscale exposure and the gateway update check before awaiting channel and plugin sidecar startup so remote operators are not locked out when startup sidecars stall. - QQBot/streaming: make block streaming configurable per QQ bot account via `streaming.mode` (`"partial"` | `"off"`, default `"partial"`) instead of hardcoding it off, so responses can be delivered incrementally. (#63746) - Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky. - Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras. - Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman. - Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode. @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. 
(#59266) thanks @gumadeiras - Gateway/agents: fix stale run-context TTL cleanup so the new maintenance sweep compiles and resets orphaned run sequence state correctly. (#52731) thanks @artwalker - Memory/lancedb: accept `dreaming` config when `memory-lancedb` owns the memory slot so Dreaming surfaces can read slot-owner settings without schema rejection. (#63874) Thanks @mbelinky. +- Heartbeats/sessions: remove stale accumulated isolated heartbeat session keys when the next tick converges them back to the canonical sibling, so repaired sessions stop showing orphaned `:heartbeat:heartbeat` variants in session listings. (#59606) Thanks @rogerdigital. ## 2026.4.9 diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 91e6de6b740..17c4199b7a5 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -116,6 +116,12 @@ export type SessionEntry = { lastHeartbeatText?: string; /** Timestamp (ms) when lastHeartbeatText was delivered. */ lastHeartbeatSentAt?: number; + /** + * Base session key for heartbeat-created isolated sessions. + * When present, `<base>:heartbeat` is a synthetic isolated session rather than + * a real user/session-scoped key that merely happens to end with `:heartbeat`. + */ + heartbeatIsolatedBaseSessionKey?: string; /** Heartbeat task state (task name -> last run timestamp ms). 
*/ heartbeatTaskState?: Record; sessionId: string; diff --git a/src/infra/heartbeat-runner.isolated-key-stability.test.ts b/src/infra/heartbeat-runner.isolated-key-stability.test.ts new file mode 100644 index 00000000000..7f951b53d6c --- /dev/null +++ b/src/infra/heartbeat-runner.isolated-key-stability.test.ts @@ -0,0 +1,392 @@ +import fs from "node:fs/promises"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import * as replyModule from "../auto-reply/reply.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; +import { runHeartbeatOnce } from "./heartbeat-runner.js"; +import { seedSessionStore, withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js"; + +vi.mock("./outbound/deliver.js", () => ({ + deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined), +})); + +afterEach(() => { + vi.restoreAllMocks(); +}); + +describe("runHeartbeatOnce – isolated session key stability (#59493)", () => { + /** + * Simulates the wake-request feedback loop: + * 1. Normal heartbeat tick produces sessionKey "agent:main:main:heartbeat" + * 2. An exec/subagent event during that tick calls requestHeartbeatNow() + * with the already-suffixed key "agent:main:main:heartbeat" + * 3. The wake handler passes that key back into runHeartbeatOnce(sessionKey: ...) + * + * Before the fix, step 3 would append another ":heartbeat" producing + * "agent:main:main:heartbeat:heartbeat". After the fix, the key remains + * stable at "agent:main:main:heartbeat". 
+ */ + async function runIsolatedHeartbeat(params: { + tmpDir: string; + storePath: string; + cfg: OpenClawConfig; + sessionKey: string; + }) { + await seedSessionStore(params.storePath, params.sessionKey, { + lastChannel: "whatsapp", + lastProvider: "whatsapp", + lastTo: "+1555", + }); + + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg: params.cfg, + sessionKey: params.sessionKey, + deps: { + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + return replySpy.mock.calls[0]?.[0]; + } + + function makeIsolatedHeartbeatConfig(tmpDir: string, storePath: string): OpenClawConfig { + return { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "whatsapp", + isolatedSession: true, + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + } + + function makeNamedIsolatedHeartbeatConfig( + tmpDir: string, + storePath: string, + heartbeatSession: string, + ): OpenClawConfig { + return { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "whatsapp", + isolatedSession: true, + session: heartbeatSession, + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + } + + it("does not accumulate :heartbeat suffix when wake passes an already-suffixed key", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const baseSessionKey = resolveMainSessionKey(cfg); + + // Simulate wake-request path: key already has :heartbeat from a previous tick. 
+ const alreadySuffixedKey = `${baseSessionKey}:heartbeat`; + await fs.writeFile( + storePath, + JSON.stringify({ + [alreadySuffixedKey]: { + sessionId: "sid", + updatedAt: 1, + lastChannel: "whatsapp", + lastProvider: "whatsapp", + lastTo: "+1555", + heartbeatIsolatedBaseSessionKey: baseSessionKey, + }, + }), + "utf-8", + ); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + sessionKey: alreadySuffixedKey, + deps: { + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + // Key must remain stable — no double :heartbeat suffix. + expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(`${baseSessionKey}:heartbeat`); + }); + }); + + it("appends :heartbeat exactly once from a clean base key", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const baseSessionKey = resolveMainSessionKey(cfg); + + const ctx = await runIsolatedHeartbeat({ + tmpDir, + storePath, + cfg, + sessionKey: baseSessionKey, + }); + + expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`); + }); + }); + + it("stays stable even with multiply-accumulated suffixes", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const baseSessionKey = resolveMainSessionKey(cfg); + + // Simulate a key that already accumulated several :heartbeat suffixes + // (from an unpatched gateway running for many ticks). + const deeplyAccumulatedKey = `${baseSessionKey}:heartbeat:heartbeat:heartbeat`; + + const ctx = await runIsolatedHeartbeat({ + tmpDir, + storePath, + cfg, + sessionKey: deeplyAccumulatedKey, + }); + + // After the fix, ALL trailing :heartbeat suffixes are stripped by the + // (:heartbeat)+$ regex in a single pass, then exactly one is re-appended. 
+ // A deeply accumulated key converges to ":heartbeat" in one call. + expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`); + + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { heartbeatIsolatedBaseSessionKey?: string } + >; + expect(store[deeplyAccumulatedKey]).toBeUndefined(); + expect(store[`${baseSessionKey}:heartbeat`]).toMatchObject({ + heartbeatIsolatedBaseSessionKey: baseSessionKey, + }); + }); + }); + + it("keeps isolated keys distinct when the configured base key already ends with :heartbeat", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeNamedIsolatedHeartbeatConfig(tmpDir, storePath, "alerts:heartbeat"); + const baseSessionKey = "agent:main:alerts:heartbeat"; + + const ctx = await runIsolatedHeartbeat({ + tmpDir, + storePath, + cfg, + sessionKey: baseSessionKey, + }); + + expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`); + }); + }); + + it("stays stable for wake re-entry when the configured base key already ends with :heartbeat", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeNamedIsolatedHeartbeatConfig(tmpDir, storePath, "alerts:heartbeat"); + const baseSessionKey = "agent:main:alerts:heartbeat"; + const alreadyIsolatedKey = `${baseSessionKey}:heartbeat`; + await fs.writeFile( + storePath, + JSON.stringify({ + [alreadyIsolatedKey]: { + sessionId: "sid", + updatedAt: 1, + lastChannel: "whatsapp", + lastProvider: "whatsapp", + lastTo: "+1555", + heartbeatIsolatedBaseSessionKey: baseSessionKey, + }, + }), + "utf-8", + ); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + sessionKey: alreadyIsolatedKey, + deps: { + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(alreadyIsolatedKey); + }); + }); + + it("keeps a forced real 
:heartbeat session distinct from the heartbeat-isolated sibling", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const realSessionKey = "agent:main:alerts:heartbeat"; + + const ctx = await runIsolatedHeartbeat({ + tmpDir, + storePath, + cfg, + sessionKey: realSessionKey, + }); + + expect(ctx?.SessionKey).toBe(`${realSessionKey}:heartbeat`); + }); + }); + + it("stays stable when a forced real :heartbeat session re-enters through its isolated sibling", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const realSessionKey = "agent:main:alerts:heartbeat"; + const isolatedSessionKey = `${realSessionKey}:heartbeat`; + + await fs.writeFile( + storePath, + JSON.stringify({ + [isolatedSessionKey]: { + sessionId: "sid", + updatedAt: 1, + lastChannel: "whatsapp", + lastProvider: "whatsapp", + lastTo: "+1555", + heartbeatIsolatedBaseSessionKey: realSessionKey, + }, + }), + "utf-8", + ); + + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + sessionKey: isolatedSessionKey, + deps: { + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(isolatedSessionKey); + }); + }); + + it("does not create an isolated session when task-based heartbeat skips for no-tasks-due", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + isolatedSession: true, + target: "whatsapp", + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const baseSessionKey = resolveMainSessionKey(cfg); + const isolatedSessionKey = 
`${baseSessionKey}:heartbeat`; + await fs.writeFile( + `${tmpDir}/HEARTBEAT.md`, + `tasks: + - name: daily-check + interval: 1d + prompt: "Check status" +`, + "utf-8", + ); + + await fs.writeFile( + storePath, + JSON.stringify({ + [baseSessionKey]: { + sessionId: "sid", + updatedAt: 1, + lastChannel: "whatsapp", + lastProvider: "whatsapp", + lastTo: "+1555", + heartbeatTaskState: { + "daily-check": 1, + }, + }, + }), + "utf-8", + ); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + const result = await runHeartbeatOnce({ + cfg, + sessionKey: baseSessionKey, + deps: { + getQueueSize: () => 0, + nowMs: () => 2, + }, + }); + + expect(result).toEqual({ status: "skipped", reason: "no-tasks-due" }); + expect(replySpy).not.toHaveBeenCalled(); + + const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record; + expect(store[isolatedSessionKey]).toBeUndefined(); + }); + }); + + it("converges a legacy isolated key that lacks the stored marker (single :heartbeat suffix)", async () => { + // Regression for: when an isolated session was created before + // heartbeatIsolatedBaseSessionKey was introduced, sessionKey already equals + // "<base>:heartbeat" but the stored entry has no marker. The fallback used to + // treat "<base>:heartbeat" as the new base and persist it as the marker, so + // the next wake re-entry would stabilise at "<base>:heartbeat:heartbeat" + // instead of converging back to "<base>:heartbeat". + await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => { + const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath); + const baseSessionKey = resolveMainSessionKey(cfg); + const legacyIsolatedKey = `${baseSessionKey}:heartbeat`; + + // Legacy entry: has :heartbeat suffix but no heartbeatIsolatedBaseSessionKey marker. 
/**
 * Matches a string made up solely of one or more `:heartbeat` segments.
 * No `g`/`y` flag, so `.test()` keeps no state between calls.
 */
const HEARTBEAT_SUFFIX_CHAIN = /^(:heartbeat)+$/;

/**
 * Reports whether `sessionKey` is exactly `baseSessionKey` followed by one or
 * more `:heartbeat` segments and nothing else.
 *
 * @param sessionKey - Candidate key to inspect.
 * @param baseSessionKey - Prefix the candidate is expected to start with.
 * @returns `true` only when the remainder after the prefix is a non-empty
 *   run of `:heartbeat` segments.
 */
function isHeartbeatSuffixChainOf(sessionKey: string, baseSessionKey: string): boolean {
  return (
    sessionKey.startsWith(baseSessionKey) &&
    HEARTBEAT_SUFFIX_CHAIN.test(sessionKey.slice(baseSessionKey.length))
  );
}

/**
 * Resolves the isolated heartbeat session key for a run, together with the base
 * key it is derived from.
 *
 * Resolution order:
 * 1. If the session entry carries a (non-blank) `heartbeatIsolatedBaseSessionKey`
 *    and `sessionKey` is that base plus one or more `:heartbeat` segments, the
 *    result collapses to `<storedBase>:heartbeat`.
 * 2. Otherwise, if `sessionKey` is `configuredSessionKey` plus one or more
 *    `:heartbeat` segments and the configured key does not itself end with
 *    `:heartbeat`, the result collapses to `<configured>:heartbeat`.
 * 3. Otherwise `sessionKey` is treated as the base and `:heartbeat` is appended once.
 *
 * @param params.sessionKey - Key the heartbeat run was invoked with.
 * @param params.configuredSessionKey - Session key resolved from heartbeat config.
 * @param params.sessionEntry - Stored entry for `sessionKey`, if any; only its
 *   `heartbeatIsolatedBaseSessionKey` marker is read.
 * @returns The isolated key to run under and the base key to record as its marker.
 */
function resolveIsolatedHeartbeatSessionKey(params: {
  sessionKey: string;
  configuredSessionKey: string;
  sessionEntry?: { heartbeatIsolatedBaseSessionKey?: string };
}): { isolatedSessionKey: string; isolatedBaseSessionKey: string } {
  // A whitespace-only marker trims to "" and is ignored like a missing one.
  const storedBaseSessionKey = params.sessionEntry?.heartbeatIsolatedBaseSessionKey?.trim();
  if (storedBaseSessionKey && isHeartbeatSuffixChainOf(params.sessionKey, storedBaseSessionKey)) {
    return {
      isolatedSessionKey: `${storedBaseSessionKey}:heartbeat`,
      isolatedBaseSessionKey: storedBaseSessionKey,
    };
  }

  // Collapse repeated `:heartbeat` suffixes introduced by wake-triggered re-entry.
  // The `endsWith` guard keeps a legitimate trailing `:heartbeat` that is part of
  // the user-configured key itself (e.g. heartbeat.session: "alerts:heartbeat"):
  // without a stored marker such a key cannot be told apart from an isolated one,
  // so it falls through and is treated as a new base.
  if (
    !params.configuredSessionKey.endsWith(":heartbeat") &&
    isHeartbeatSuffixChainOf(params.sessionKey, params.configuredSessionKey)
  ) {
    return {
      isolatedSessionKey: `${params.configuredSessionKey}:heartbeat`,
      isolatedBaseSessionKey: params.configuredSessionKey,
    };
  }

  return {
    isolatedSessionKey: `${params.sessionKey}:heartbeat`,
    isolatedBaseSessionKey: params.sessionKey,
  };
}

/**
 * Identifies an accumulated heartbeat key that the current run supersedes.
 *
 * @param params.sessionKey - Key the heartbeat run was invoked with.
 * @param params.isolatedSessionKey - Canonical isolated key chosen for this run.
 * @param params.isolatedBaseSessionKey - Base key the canonical key derives from.
 * @returns `sessionKey` when it differs from the canonical isolated key yet is
 *   the base plus one or more `:heartbeat` segments; otherwise `undefined`.
 */
function resolveStaleHeartbeatIsolatedSessionKey(params: {
  sessionKey: string;
  isolatedSessionKey: string;
  isolatedBaseSessionKey: string;
}): string | undefined {
  // Already running under the canonical key: nothing to clean up.
  if (params.sessionKey === params.isolatedSessionKey) {
    return undefined;
  }
  return isHeartbeatSuffixChainOf(params.sessionKey, params.isolatedBaseSessionKey)
    ? params.sessionKey
    : undefined;
}
configuredSession = resolveHeartbeatSession(cfg, agentId, heartbeat); + // Collapse only the repeated `:heartbeat` suffixes introduced by wake-triggered + // re-entry for heartbeat-created isolated sessions. Real session keys that + // happen to end with `:heartbeat` still get a distinct isolated sibling. + const { isolatedSessionKey, isolatedBaseSessionKey } = resolveIsolatedHeartbeatSessionKey({ + sessionKey, + configuredSessionKey: configuredSession.sessionKey, + sessionEntry: entry, + }); const cronSession = resolveCronSession({ cfg, - sessionKey: isolatedKey, + sessionKey: isolatedSessionKey, agentId, nowMs: startedAt, forceNew: true, }); - cronSession.store[isolatedKey] = cronSession.sessionEntry; + const staleIsolatedSessionKey = resolveStaleHeartbeatIsolatedSessionKey({ + sessionKey, + isolatedSessionKey, + isolatedBaseSessionKey, + }); + const removedSessionFiles = new Map(); + if (staleIsolatedSessionKey) { + const staleEntry = cronSession.store[staleIsolatedSessionKey]; + if (staleEntry?.sessionId) { + removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile); + } + delete cronSession.store[staleIsolatedSessionKey]; + } + cronSession.sessionEntry.heartbeatIsolatedBaseSessionKey = isolatedBaseSessionKey; + cronSession.store[isolatedSessionKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); - runSessionKey = isolatedKey; + if (removedSessionFiles.size > 0) { + try { + const referencedSessionIds = new Set( + Object.values(cronSession.store) + .map((sessionEntry) => sessionEntry?.sessionId) + .filter((sessionId): sessionId is string => Boolean(sessionId)), + ); + await archiveRemovedSessionTranscripts({ + removedSessionFiles, + referencedSessionIds, + storePath: cronSession.storePath, + reason: "deleted", + restrictToStoreDir: true, + }); + } catch (err) { + log.warn("heartbeat: failed to archive stale isolated session transcript", { + err: String(err), + sessionKey: staleIsolatedSessionKey, + }); + 
} + } + runSessionKey = isolatedSessionKey; } // Update task last run times AFTER successful heartbeat completion