mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-15 11:11:09 +00:00
fix: prevent isolated heartbeat session key :heartbeat suffix accumulation (#59606)
Merged via squash.
Prepared head SHA: c276211a8b
Co-authored-by: rogerdigital <13251150+rogerdigital@users.noreply.github.com>
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
Reviewed-by: @jalehman
This commit is contained in:
@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/tailscale: start Tailscale exposure and the gateway update check before awaiting channel and plugin sidecar startup so remote operators are not locked out when startup sidecars stall.
|
||||
- QQBot/streaming: make block streaming configurable per QQ bot account via `streaming.mode` (`"partial"` | `"off"`, default `"partial"`) instead of hardcoding it off, so responses can be delivered incrementally. (#63746)
|
||||
- Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky.
|
||||
<<<<<<< HEAD
|
||||
- Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras.
|
||||
- Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman.
|
||||
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
|
||||
@@ -50,6 +51,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) thanks @gumadeiras
|
||||
- Gateway/agents: fix stale run-context TTL cleanup so the new maintenance sweep compiles and resets orphaned run sequence state correctly. (#52731) thanks @artwalker
|
||||
- Memory/lancedb: accept `dreaming` config when `memory-lancedb` owns the memory slot so Dreaming surfaces can read slot-owner settings without schema rejection. (#63874) Thanks @mbelinky.
|
||||
- Heartbeats/sessions: remove stale accumulated isolated heartbeat session keys when the next tick converges them back to the canonical sibling, so repaired sessions stop showing orphaned `:heartbeat:heartbeat` variants in session listings. (#59606) Thanks @rogerdigital.
|
||||
|
||||
## 2026.4.9
|
||||
|
||||
|
||||
@@ -116,6 +116,12 @@ export type SessionEntry = {
|
||||
lastHeartbeatText?: string;
|
||||
/** Timestamp (ms) when lastHeartbeatText was delivered. */
|
||||
lastHeartbeatSentAt?: number;
|
||||
/**
|
||||
* Base session key for heartbeat-created isolated sessions.
|
||||
* When present, `<base>:heartbeat` is a synthetic isolated session rather than
|
||||
* a real user/session-scoped key that merely happens to end with `:heartbeat`.
|
||||
*/
|
||||
heartbeatIsolatedBaseSessionKey?: string;
|
||||
/** Heartbeat task state (task name -> last run timestamp ms). */
|
||||
heartbeatTaskState?: Record<string, number>;
|
||||
sessionId: string;
|
||||
|
||||
392
src/infra/heartbeat-runner.isolated-key-stability.test.ts
Normal file
392
src/infra/heartbeat-runner.isolated-key-stability.test.ts
Normal file
@@ -0,0 +1,392 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import * as replyModule from "../auto-reply/reply.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resolveMainSessionKey } from "../config/sessions.js";
|
||||
import { runHeartbeatOnce } from "./heartbeat-runner.js";
|
||||
import { seedSessionStore, withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js";
|
||||
|
||||
vi.mock("./outbound/deliver.js", () => ({
|
||||
deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined),
|
||||
}));
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("runHeartbeatOnce – isolated session key stability (#59493)", () => {
|
||||
/**
|
||||
* Simulates the wake-request feedback loop:
|
||||
* 1. Normal heartbeat tick produces sessionKey "agent:main:main:heartbeat"
|
||||
* 2. An exec/subagent event during that tick calls requestHeartbeatNow()
|
||||
* with the already-suffixed key "agent:main:main:heartbeat"
|
||||
* 3. The wake handler passes that key back into runHeartbeatOnce(sessionKey: ...)
|
||||
*
|
||||
* Before the fix, step 3 would append another ":heartbeat" producing
|
||||
* "agent:main:main:heartbeat:heartbeat". After the fix, the key remains
|
||||
* stable at "agent:main:main:heartbeat".
|
||||
*/
|
||||
async function runIsolatedHeartbeat(params: {
|
||||
tmpDir: string;
|
||||
storePath: string;
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey: string;
|
||||
}) {
|
||||
await seedSessionStore(params.storePath, params.sessionKey, {
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
});
|
||||
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
return replySpy.mock.calls[0]?.[0];
|
||||
}
|
||||
|
||||
function makeIsolatedHeartbeatConfig(tmpDir: string, storePath: string): OpenClawConfig {
|
||||
return {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
isolatedSession: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
}
|
||||
|
||||
function makeNamedIsolatedHeartbeatConfig(
|
||||
tmpDir: string,
|
||||
storePath: string,
|
||||
heartbeatSession: string,
|
||||
): OpenClawConfig {
|
||||
return {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: {
|
||||
every: "5m",
|
||||
target: "whatsapp",
|
||||
isolatedSession: true,
|
||||
session: heartbeatSession,
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
}
|
||||
|
||||
it("does not accumulate :heartbeat suffix when wake passes an already-suffixed key", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const baseSessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
// Simulate wake-request path: key already has :heartbeat from a previous tick.
|
||||
const alreadySuffixedKey = `${baseSessionKey}:heartbeat`;
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
[alreadySuffixedKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: 1,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
heartbeatIsolatedBaseSessionKey: baseSessionKey,
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
sessionKey: alreadySuffixedKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// Key must remain stable — no double :heartbeat suffix.
|
||||
expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(`${baseSessionKey}:heartbeat`);
|
||||
});
|
||||
});
|
||||
|
||||
it("appends :heartbeat exactly once from a clean base key", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const baseSessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
const ctx = await runIsolatedHeartbeat({
|
||||
tmpDir,
|
||||
storePath,
|
||||
cfg,
|
||||
sessionKey: baseSessionKey,
|
||||
});
|
||||
|
||||
expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`);
|
||||
});
|
||||
});
|
||||
|
||||
it("stays stable even with multiply-accumulated suffixes", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const baseSessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
// Simulate a key that already accumulated several :heartbeat suffixes
|
||||
// (from an unpatched gateway running for many ticks).
|
||||
const deeplyAccumulatedKey = `${baseSessionKey}:heartbeat:heartbeat:heartbeat`;
|
||||
|
||||
const ctx = await runIsolatedHeartbeat({
|
||||
tmpDir,
|
||||
storePath,
|
||||
cfg,
|
||||
sessionKey: deeplyAccumulatedKey,
|
||||
});
|
||||
|
||||
// After the fix, ALL trailing :heartbeat suffixes are stripped by the
|
||||
// (:heartbeat)+$ regex in a single pass, then exactly one is re-appended.
|
||||
// A deeply accumulated key converges to "<base>:heartbeat" in one call.
|
||||
expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`);
|
||||
|
||||
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
|
||||
string,
|
||||
{ heartbeatIsolatedBaseSessionKey?: string }
|
||||
>;
|
||||
expect(store[deeplyAccumulatedKey]).toBeUndefined();
|
||||
expect(store[`${baseSessionKey}:heartbeat`]).toMatchObject({
|
||||
heartbeatIsolatedBaseSessionKey: baseSessionKey,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps isolated keys distinct when the configured base key already ends with :heartbeat", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeNamedIsolatedHeartbeatConfig(tmpDir, storePath, "alerts:heartbeat");
|
||||
const baseSessionKey = "agent:main:alerts:heartbeat";
|
||||
|
||||
const ctx = await runIsolatedHeartbeat({
|
||||
tmpDir,
|
||||
storePath,
|
||||
cfg,
|
||||
sessionKey: baseSessionKey,
|
||||
});
|
||||
|
||||
expect(ctx?.SessionKey).toBe(`${baseSessionKey}:heartbeat`);
|
||||
});
|
||||
});
|
||||
|
||||
it("stays stable for wake re-entry when the configured base key already ends with :heartbeat", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeNamedIsolatedHeartbeatConfig(tmpDir, storePath, "alerts:heartbeat");
|
||||
const baseSessionKey = "agent:main:alerts:heartbeat";
|
||||
const alreadyIsolatedKey = `${baseSessionKey}:heartbeat`;
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
[alreadyIsolatedKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: 1,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
heartbeatIsolatedBaseSessionKey: baseSessionKey,
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
sessionKey: alreadyIsolatedKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(alreadyIsolatedKey);
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps a forced real :heartbeat session distinct from the heartbeat-isolated sibling", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const realSessionKey = "agent:main:alerts:heartbeat";
|
||||
|
||||
const ctx = await runIsolatedHeartbeat({
|
||||
tmpDir,
|
||||
storePath,
|
||||
cfg,
|
||||
sessionKey: realSessionKey,
|
||||
});
|
||||
|
||||
expect(ctx?.SessionKey).toBe(`${realSessionKey}:heartbeat`);
|
||||
});
|
||||
});
|
||||
|
||||
it("stays stable when a forced real :heartbeat session re-enters through its isolated sibling", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const realSessionKey = "agent:main:alerts:heartbeat";
|
||||
const isolatedSessionKey = `${realSessionKey}:heartbeat`;
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
[isolatedSessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: 1,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
heartbeatIsolatedBaseSessionKey: realSessionKey,
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
sessionKey: isolatedSessionKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
expect(replySpy).toHaveBeenCalledTimes(1);
|
||||
expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(isolatedSessionKey);
|
||||
});
|
||||
});
|
||||
|
||||
it("does not create an isolated session when task-based heartbeat skips for no-tasks-due", async () => {
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg: OpenClawConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: {
|
||||
isolatedSession: true,
|
||||
target: "whatsapp",
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const baseSessionKey = resolveMainSessionKey(cfg);
|
||||
const isolatedSessionKey = `${baseSessionKey}:heartbeat`;
|
||||
await fs.writeFile(
|
||||
`${tmpDir}/HEARTBEAT.md`,
|
||||
`tasks:
|
||||
- name: daily-check
|
||||
interval: 1d
|
||||
prompt: "Check status"
|
||||
`,
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
[baseSessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: 1,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
heartbeatTaskState: {
|
||||
"daily-check": 1,
|
||||
},
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
const result = await runHeartbeatOnce({
|
||||
cfg,
|
||||
sessionKey: baseSessionKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 2,
|
||||
},
|
||||
});
|
||||
|
||||
expect(result).toEqual({ status: "skipped", reason: "no-tasks-due" });
|
||||
expect(replySpy).not.toHaveBeenCalled();
|
||||
|
||||
const store = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<string, unknown>;
|
||||
expect(store[isolatedSessionKey]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("converges a legacy isolated key that lacks the stored marker (single :heartbeat suffix)", async () => {
|
||||
// Regression for: when an isolated session was created before
|
||||
// heartbeatIsolatedBaseSessionKey was introduced, sessionKey already equals
|
||||
// "<base>:heartbeat" but the stored entry has no marker. The fallback used to
|
||||
// treat "<base>:heartbeat" as the new base and persist it as the marker, so
|
||||
// the next wake re-entry would stabilise at "<base>:heartbeat:heartbeat"
|
||||
// instead of converging back to "<base>:heartbeat".
|
||||
await withTempHeartbeatSandbox(async ({ tmpDir, storePath }) => {
|
||||
const cfg = makeIsolatedHeartbeatConfig(tmpDir, storePath);
|
||||
const baseSessionKey = resolveMainSessionKey(cfg);
|
||||
const legacyIsolatedKey = `${baseSessionKey}:heartbeat`;
|
||||
|
||||
// Legacy entry: has :heartbeat suffix but no heartbeatIsolatedBaseSessionKey marker.
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify({
|
||||
[legacyIsolatedKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: 1,
|
||||
lastChannel: "whatsapp",
|
||||
lastProvider: "whatsapp",
|
||||
lastTo: "+1555",
|
||||
},
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
sessionKey: legacyIsolatedKey,
|
||||
deps: {
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// Must converge to the same canonical isolated key, not produce :heartbeat:heartbeat.
|
||||
expect(replySpy.mock.calls[0]?.[0]?.SessionKey).toBe(legacyIsolatedKey);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -35,7 +35,11 @@ import {
|
||||
} from "../config/sessions/main-session.js";
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import { loadSessionStore } from "../config/sessions/store-load.js";
|
||||
import { saveSessionStore, updateSessionStore } from "../config/sessions/store.js";
|
||||
import {
|
||||
archiveRemovedSessionTranscripts,
|
||||
saveSessionStore,
|
||||
updateSessionStore,
|
||||
} from "../config/sessions/store.js";
|
||||
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
|
||||
import { resolveCronSession } from "../cron/isolated-agent/session.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
@@ -312,6 +316,68 @@ function resolveHeartbeatSession(
|
||||
};
|
||||
}
|
||||
|
||||
function resolveIsolatedHeartbeatSessionKey(params: {
|
||||
sessionKey: string;
|
||||
configuredSessionKey: string;
|
||||
sessionEntry?: { heartbeatIsolatedBaseSessionKey?: string };
|
||||
}) {
|
||||
const storedBaseSessionKey = params.sessionEntry?.heartbeatIsolatedBaseSessionKey?.trim();
|
||||
if (storedBaseSessionKey) {
|
||||
const suffix = params.sessionKey.slice(storedBaseSessionKey.length);
|
||||
if (
|
||||
params.sessionKey.startsWith(storedBaseSessionKey) &&
|
||||
suffix.length > 0 &&
|
||||
/^(:heartbeat)+$/.test(suffix)
|
||||
) {
|
||||
return {
|
||||
isolatedSessionKey: `${storedBaseSessionKey}:heartbeat`,
|
||||
isolatedBaseSessionKey: storedBaseSessionKey,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Collapse repeated `:heartbeat` suffixes introduced by wake-triggered re-entry.
|
||||
// The guard on configuredSessionKey ensures we do not strip a legitimate single
|
||||
// `:heartbeat` suffix that is part of the user-configured base key itself
|
||||
// (e.g. heartbeat.session: "alerts:heartbeat"). When the configured key already
|
||||
// ends with `:heartbeat`, a forced wake passes `configuredKey:heartbeat` which
|
||||
// must be treated as a new base rather than an existing isolated key.
|
||||
const configuredSuffix = params.sessionKey.slice(params.configuredSessionKey.length);
|
||||
if (
|
||||
params.sessionKey.startsWith(params.configuredSessionKey) &&
|
||||
/^(:heartbeat)+$/.test(configuredSuffix) &&
|
||||
!params.configuredSessionKey.endsWith(":heartbeat")
|
||||
) {
|
||||
return {
|
||||
isolatedSessionKey: `${params.configuredSessionKey}:heartbeat`,
|
||||
isolatedBaseSessionKey: params.configuredSessionKey,
|
||||
};
|
||||
}
|
||||
return {
|
||||
isolatedSessionKey: `${params.sessionKey}:heartbeat`,
|
||||
isolatedBaseSessionKey: params.sessionKey,
|
||||
};
|
||||
}
|
||||
|
||||
function resolveStaleHeartbeatIsolatedSessionKey(params: {
|
||||
sessionKey: string;
|
||||
isolatedSessionKey: string;
|
||||
isolatedBaseSessionKey: string;
|
||||
}) {
|
||||
if (params.sessionKey === params.isolatedSessionKey) {
|
||||
return undefined;
|
||||
}
|
||||
const suffix = params.sessionKey.slice(params.isolatedBaseSessionKey.length);
|
||||
if (
|
||||
params.sessionKey.startsWith(params.isolatedBaseSessionKey) &&
|
||||
suffix.length > 0 &&
|
||||
/^(:heartbeat)+$/.test(suffix)
|
||||
) {
|
||||
return params.sessionKey;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveHeartbeatReasoningPayloads(
|
||||
replyResult: ReplyPayload | ReplyPayload[] | undefined,
|
||||
): ReplyPayload[] {
|
||||
@@ -717,17 +783,60 @@ export async function runHeartbeatOnce(opts: {
|
||||
|
||||
let runSessionKey = sessionKey;
|
||||
if (useIsolatedSession) {
|
||||
const isolatedKey = `${sessionKey}:heartbeat`;
|
||||
const configuredSession = resolveHeartbeatSession(cfg, agentId, heartbeat);
|
||||
// Collapse only the repeated `:heartbeat` suffixes introduced by wake-triggered
|
||||
// re-entry for heartbeat-created isolated sessions. Real session keys that
|
||||
// happen to end with `:heartbeat` still get a distinct isolated sibling.
|
||||
const { isolatedSessionKey, isolatedBaseSessionKey } = resolveIsolatedHeartbeatSessionKey({
|
||||
sessionKey,
|
||||
configuredSessionKey: configuredSession.sessionKey,
|
||||
sessionEntry: entry,
|
||||
});
|
||||
const cronSession = resolveCronSession({
|
||||
cfg,
|
||||
sessionKey: isolatedKey,
|
||||
sessionKey: isolatedSessionKey,
|
||||
agentId,
|
||||
nowMs: startedAt,
|
||||
forceNew: true,
|
||||
});
|
||||
cronSession.store[isolatedKey] = cronSession.sessionEntry;
|
||||
const staleIsolatedSessionKey = resolveStaleHeartbeatIsolatedSessionKey({
|
||||
sessionKey,
|
||||
isolatedSessionKey,
|
||||
isolatedBaseSessionKey,
|
||||
});
|
||||
const removedSessionFiles = new Map<string, string | undefined>();
|
||||
if (staleIsolatedSessionKey) {
|
||||
const staleEntry = cronSession.store[staleIsolatedSessionKey];
|
||||
if (staleEntry?.sessionId) {
|
||||
removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile);
|
||||
}
|
||||
delete cronSession.store[staleIsolatedSessionKey];
|
||||
}
|
||||
cronSession.sessionEntry.heartbeatIsolatedBaseSessionKey = isolatedBaseSessionKey;
|
||||
cronSession.store[isolatedSessionKey] = cronSession.sessionEntry;
|
||||
await saveSessionStore(cronSession.storePath, cronSession.store);
|
||||
runSessionKey = isolatedKey;
|
||||
if (removedSessionFiles.size > 0) {
|
||||
try {
|
||||
const referencedSessionIds = new Set(
|
||||
Object.values(cronSession.store)
|
||||
.map((sessionEntry) => sessionEntry?.sessionId)
|
||||
.filter((sessionId): sessionId is string => Boolean(sessionId)),
|
||||
);
|
||||
await archiveRemovedSessionTranscripts({
|
||||
removedSessionFiles,
|
||||
referencedSessionIds,
|
||||
storePath: cronSession.storePath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn("heartbeat: failed to archive stale isolated session transcript", {
|
||||
err: String(err),
|
||||
sessionKey: staleIsolatedSessionKey,
|
||||
});
|
||||
}
|
||||
}
|
||||
runSessionKey = isolatedSessionKey;
|
||||
}
|
||||
|
||||
// Update task last run times AFTER successful heartbeat completion
|
||||
|
||||
Reference in New Issue
Block a user