fix heartbeat event routing for main-scoped DMs

This commit is contained in:
Kaspre
2026-05-18 16:02:04 -04:00
committed by Peter Steinberger
parent d24cfcfa21
commit e53612a639
15 changed files with 604 additions and 35 deletions

View File

@@ -3,10 +3,14 @@ import path from "node:path";
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js";
import { onAgentEvent } from "../infra/agent-events.js";
import {
type EventSessionRoutingPolicy,
resolveEventSessionKeyForPolicy,
scopedHeartbeatWakeOptionsForPolicy,
} from "../infra/event-session-routing.js";
import { requestHeartbeat } from "../infra/heartbeat-wake.js";
import { appendRegularFile } from "../infra/regular-file.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { normalizeAssistantPhase } from "../shared/chat-message-content.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { recordTaskRunProgressByRunId } from "../tasks/detached-task-runtime.js";
@@ -104,6 +108,7 @@ export function startAcpSpawnParentStreamRelay(params: {
* Snapshotted with `mainKey` for the same start-time routing reason.
*/
sessionScope?: "per-sender" | "global";
eventRouting?: EventSessionRoutingPolicy;
logPath?: string;
deliveryContext?: DeliveryContext;
surfaceUpdates?: boolean;
@@ -204,20 +209,23 @@ export function startAcpSpawnParentStreamRelay(params: {
});
};
const shouldSurfaceUpdates = params.surfaceUpdates !== false;
const eventRouting = params.eventRouting ?? {
mainKey: params.mainKey,
sessionScope: params.sessionScope,
};
const wake = () => {
if (!shouldSurfaceUpdates) {
return;
}
requestHeartbeat(
scopedHeartbeatWakeOptions(
scopedHeartbeatWakeOptionsForPolicy(
parentSessionKey,
{
source: "acp-spawn",
intent: "event",
reason: "acp:spawn:stream",
},
params.mainKey,
params.sessionScope,
eventRouting,
),
);
};
@@ -231,7 +239,7 @@ export function startAcpSpawnParentStreamRelay(params: {
return;
}
enqueueSystemEvent(cleaned, {
sessionKey: resolveEventSessionKey(parentSessionKey, params.mainKey, params.sessionScope),
sessionKey: resolveEventSessionKeyForPolicy(parentSessionKey, eventRouting),
contextKey,
deliveryContext: params.deliveryContext,
});

View File

@@ -41,6 +41,7 @@ import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { callGateway } from "../gateway/call.js";
import { formatErrorMessage } from "../infra/errors.js";
import { resolveEventSessionRoutingPolicy } from "../infra/event-session-routing.js";
import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js";
import {
getSessionBindingService,
@@ -1426,6 +1427,9 @@ export async function spawnAcpDirect(
: undefined;
let parentRelay: AcpSpawnParentRelayHandle | undefined;
const parentEventRouting = parentSessionKey
? resolveEventSessionRoutingPolicy({ cfg, sessionKey: parentSessionKey })
: undefined;
if (effectiveStreamToParent && parentSessionKey) {
// Register relay before dispatch so fast lifecycle failures are not missed.
parentRelay = startAcpSpawnParentStreamRelay({
@@ -1435,6 +1439,7 @@ export async function spawnAcpDirect(
agentId: targetAgentId,
mainKey: cfg.session?.mainKey,
sessionScope: cfg.session?.scope,
eventRouting: parentEventRouting,
logPath: streamLogPath,
deliveryContext: parentDeliveryCtx,
emitStartNotice: false,
@@ -1491,6 +1496,7 @@ export async function spawnAcpDirect(
agentId: targetAgentId,
mainKey: cfg.session?.mainKey,
sessionScope: cfg.session?.scope,
eventRouting: parentEventRouting,
logPath: streamLogPath,
deliveryContext: parentDeliveryCtx,
emitStartNotice: false,

View File

@@ -1,4 +1,5 @@
import type { ChildProcessWithoutNullStreams } from "node:child_process";
import type { EventSessionRoutingPolicy } from "../infra/event-session-routing.js";
import type { TerminationReason } from "../process/supervisor/types.js";
import type { DeliveryContext } from "../utils/delivery-context.js";
import { createSessionSlug as createSessionSlugId } from "./session-slug.js";
@@ -46,6 +47,8 @@ export interface ProcessSession {
* of an agent-main queue the heartbeat never drains. Snapshotted with
* `mainKey` for the same start-time routing reason. */
sessionScope?: "per-sender" | "global";
/** Start-time routing policy for detached exec system events. */
eventRouting?: EventSessionRoutingPolicy;
notifyDeliveryContext?: DeliveryContext;
notifyOnExit?: boolean;
notifyOnExitEmptySuccess?: boolean;

View File

@@ -537,6 +537,39 @@ describe("emitExecSystemEvent", () => {
expect(requireHeartbeatCall()).not.toHaveProperty("sessionKey");
});
it("routes single-owner dmScope=main direct exec events to the agent main session", () => {
emitExecSystemEvent("Exec finished", {
sessionKey: "agent:main:telegram:default:direct:123",
contextKey: "exec:run-dm",
deliveryContext: {
channel: "telegram",
to: "123",
},
eventRouting: {
dmScope: "main",
allowFrom: ["123"],
channel: "telegram",
accountId: "default",
},
});
expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", {
sessionKey: "agent:main:main",
contextKey: "exec:run-dm",
deliveryContext: {
channel: "telegram",
to: "123",
},
forceSenderIsOwnerFalse: true,
trusted: false,
});
expect(requestHeartbeatMock).toHaveBeenCalledTimes(1);
const heartbeat = requireHeartbeatCall();
expect(heartbeat.coalesceMs).toBe(0);
expect(heartbeat.reason).toBe("exec-event");
expect(heartbeat.sessionKey).toBe("agent:main:main");
});
it("keeps wake unscoped for non-agent session keys", () => {
emitExecSystemEvent("Exec finished", {
sessionKey: "global",

View File

@@ -1,6 +1,11 @@
import path from "node:path";
import type { AgentToolResult } from "@earendil-works/pi-agent-core";
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import {
type EventSessionRoutingPolicy,
resolveEventSessionKeyForPolicy,
scopedHeartbeatWakeOptionsForPolicy,
} from "../infra/event-session-routing.js";
import {
DEFAULT_EXEC_APPROVAL_TIMEOUT_MS,
resolveExecApprovalAllowedDecisions,
@@ -12,7 +17,6 @@ import { requestHeartbeat } from "../infra/heartbeat-wake.js";
import { isDangerousHostInheritedEnvVarName } from "../infra/host-env-security.js";
import { findPathKey, mergePathPrepend, removePathPrepend } from "../infra/path-prepend.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { isSubagentSessionKey } from "../sessions/session-key-utils.js";
import type { ProcessSession } from "./bash-process-registry.js";
import type { ExecToolDetails } from "./bash-tools.exec-types.js";
@@ -340,15 +344,19 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile
const summary = output
? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}`
: `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`;
const eventRouting = session.eventRouting ?? {
mainKey: session.mainKey,
sessionScope: session.sessionScope,
};
enqueueSystemEvent(summary, {
sessionKey: resolveEventSessionKey(sessionKey, session.mainKey, session.sessionScope),
sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting),
deliveryContext: session.notifyDeliveryContext,
});
// Subagent sessions receive exec results via process poll and announce flow;
// the heartbeat would fall back to the main session and cause spurious wakes.
if (!isSubagentSessionKey(sessionKey)) {
requestHeartbeat(
scopedHeartbeatWakeOptions(
scopedHeartbeatWakeOptionsForPolicy(
sessionKey,
{
source: "exec-event",
@@ -356,8 +364,7 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile
reason: "exec-event",
coalesceMs: 0,
},
session.mainKey,
session.sessionScope,
eventRouting,
),
);
}
@@ -435,14 +442,19 @@ export function emitExecSystemEvent(
/** `session.scope` from the runtime config; needed so global-scope
* agents route cron-run events to the "global" queue. */
sessionScope?: "per-sender" | "global";
eventRouting?: EventSessionRoutingPolicy;
},
) {
const sessionKey = opts.sessionKey?.trim();
if (!sessionKey) {
return;
}
const eventRouting = opts.eventRouting ?? {
mainKey: opts.mainKey,
sessionScope: opts.sessionScope,
};
enqueueSystemEvent(text, {
sessionKey: resolveEventSessionKey(sessionKey, opts.mainKey, opts.sessionScope),
sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting),
contextKey: opts.contextKey,
deliveryContext: opts.deliveryContext,
});
@@ -450,7 +462,7 @@ export function emitExecSystemEvent(
// the heartbeat would fall back to the main session and cause spurious wakes.
if (!isSubagentSessionKey(sessionKey)) {
requestHeartbeat(
scopedHeartbeatWakeOptions(
scopedHeartbeatWakeOptionsForPolicy(
sessionKey,
{
source: "exec-event",
@@ -458,8 +470,7 @@ export function emitExecSystemEvent(
reason: "exec-event",
coalesceMs: 0,
},
opts.mainKey,
opts.sessionScope,
eventRouting,
),
);
}
@@ -636,6 +647,8 @@ export async function runExecProcess(opts: {
* `mainKey` so the cron-run remap can route global-scope agents to
* the "global" queue instead of agent-main. */
sessionScope?: "per-sender" | "global";
/** Start-time routing policy for detached exec system events. */
eventRouting?: EventSessionRoutingPolicy;
notifyDeliveryContext?: DeliveryContext;
timeoutSec: number | null;
onUpdate?: (partialResult: AgentToolResult<ExecToolDetails>) => void;
@@ -657,6 +670,7 @@ export async function runExecProcess(opts: {
sessionKey: opts.sessionKey,
mainKey: opts.mainKey,
sessionScope: opts.sessionScope,
eventRouting: opts.eventRouting,
notifyDeliveryContext: normalizeDeliveryContext(opts.notifyDeliveryContext),
notifyOnExit: opts.notifyOnExit,
notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true,

View File

@@ -1,3 +1,4 @@
import type { EventSessionRoutingPolicy } from "../infra/event-session-routing.js";
import type { ExecApprovalDecision } from "../infra/exec-approvals.js";
import type { ExecAsk, ExecHost, ExecSecurity, ExecTarget } from "../infra/exec-approvals.js";
import type { SafeBinProfileFixture } from "../infra/exec-safe-bin-policy.js";
@@ -38,6 +39,8 @@ export type ExecToolDefaults = {
* so the cron-run remap can route global-scope agents to the "global"
* queue instead of agent-main. */
sessionScope?: "per-sender" | "global";
/** Start-time routing policy for detached exec system events. */
eventRouting?: EventSessionRoutingPolicy;
messageProvider?: string;
currentChannelId?: string;
currentThreadTs?: string;

View File

@@ -1631,6 +1631,7 @@ export function createExecTool(
sessionKey: notifySessionKey,
mainKey: defaults?.mainKey,
sessionScope: defaults?.sessionScope,
eventRouting: defaults?.eventRouting,
notifyDeliveryContext,
timeoutSec: effectiveTimeout,
onUpdate,

View File

@@ -2,11 +2,15 @@ import crypto from "node:crypto";
import { shouldLogVerbose } from "../../globals.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { isTruthyEnvValue } from "../../infra/env.js";
import {
resolveEventSessionKeyForPolicy,
resolveEventSessionRoutingPolicy,
scopedHeartbeatWakeOptionsForPolicy,
} from "../../infra/event-session-routing.js";
import { requestHeartbeat as requestHeartbeatImpl } from "../../infra/heartbeat-wake.js";
import { sanitizeHostExecEnv } from "../../infra/host-env-security.js";
import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system-events.js";
import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js";
import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../../routing/session-key.js";
import { appendBootstrapPromptWarning } from "../bootstrap-budget.js";
import {
createCliJsonlStreamingParser,
@@ -635,25 +639,24 @@ export async function executePreparedCliRun(
"It may have been waiting for interactive input or an approval prompt.",
"For Claude Code, prefer --permission-mode bypassPermissions --print.",
].join(" ");
const watchdogMainKey = params.config?.session?.mainKey;
const watchdogScope = params.config?.session?.scope;
const eventRouting = resolveEventSessionRoutingPolicy({
cfg: params.config,
sessionKey: params.sessionKey,
channel: params.messageProvider,
accountId: params.agentAccountId,
});
executeDeps.enqueueSystemEvent(stallNotice, {
sessionKey: resolveEventSessionKey(
params.sessionKey,
watchdogMainKey,
watchdogScope,
),
sessionKey: resolveEventSessionKeyForPolicy(params.sessionKey, eventRouting),
});
executeDeps.requestHeartbeat(
scopedHeartbeatWakeOptions(
scopedHeartbeatWakeOptionsForPolicy(
params.sessionKey,
{
source: "cli-watchdog",
intent: "event",
reason: "cli:watchdog:stall",
},
watchdogMainKey,
watchdogScope,
eventRouting,
),
);
}

View File

@@ -7,6 +7,7 @@ import { resolveExecCommandHighlighting } from "../config/exec-command-highlight
import type { ModelCompatConfig } from "../config/types.models.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { DiagnosticTraceContext } from "../infra/diagnostic-trace-context.js";
import { resolveEventSessionRoutingPolicy } from "../infra/event-session-routing.js";
import { resolveMergedSafeBinProfileFixtures } from "../infra/exec-safe-bin-runtime-policy.js";
import { logWarn } from "../logger.js";
import { getPluginToolMeta } from "../plugins/tools.js";
@@ -743,6 +744,12 @@ export function createOpenClawCodingTools(options?: {
sessionKey: options?.sessionKey,
mainKey: options?.config?.session?.mainKey,
sessionScope: options?.config?.session?.scope,
eventRouting: resolveEventSessionRoutingPolicy({
cfg: options?.config,
sessionKey: options?.sessionKey,
channel: options?.messageProvider,
accountId: options?.agentAccountId,
}),
messageProvider: options?.messageProvider,
currentChannelId: options?.currentChannelId,
currentThreadTs: options?.currentThreadTs,

View File

@@ -23,6 +23,7 @@ import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
import { resolveCronStorePath } from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { resolveMainScopedEventSessionKey } from "../infra/event-session-routing.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeat } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
@@ -204,7 +205,13 @@ export function buildGatewayCronService(params: {
});
}
}
return canonical;
return (
resolveMainScopedEventSessionKey({
cfg: params.runtimeConfig,
sessionKey: canonical,
agentId: params.agentId,
}) ?? canonical
);
};
const resolveCronTarget = (opts?: {

View File

@@ -2,9 +2,13 @@ import { randomUUID } from "node:crypto";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { updatePairedDeviceMetadata } from "../infra/device-pairing.js";
import { formatErrorMessage } from "../infra/errors.js";
import {
resolveEventSessionKeyForPolicy,
resolveEventSessionRoutingPolicy,
scopedHeartbeatWakeOptionsForPolicy,
} from "../infra/event-session-routing.js";
import { updatePairedNodeMetadata } from "../infra/node-pairing.js";
import type { PromptImageOrderEntry } from "../media/prompt-image-order.js";
import { resolveEventSessionKey } from "../routing/session-key.js";
import {
NODE_PRESENCE_ALIVE_EVENT,
normalizeNodePresenceAliveReason,
@@ -38,7 +42,6 @@ import {
resolveSessionAgentId,
resolveSessionModelRef,
sanitizeInboundSystemTags,
scopedHeartbeatWakeOptions,
sendDurableMessageBatch,
updateSessionStore,
} from "./server-node-events.runtime.js";
@@ -766,8 +769,9 @@ export const handleNodeEvent = async (
}
}
const eventRouting = resolveEventSessionRoutingPolicy({ cfg, sessionKey });
const queued = enqueueSystemEvent(text, {
sessionKey: resolveEventSessionKey(sessionKey, cfg.session?.mainKey, cfg.session?.scope),
sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting),
contextKey: runId ? `exec:${runId}` : "exec",
});
if (queued) {
@@ -775,7 +779,7 @@ export const handleNodeEvent = async (
// keys should keep legacy unscoped behavior so enabled non-main heartbeat
// agents still run when no explicit agent session is provided.
requestHeartbeat(
scopedHeartbeatWakeOptions(
scopedHeartbeatWakeOptionsForPolicy(
sessionKey,
{
source: "exec-event",
@@ -783,8 +787,7 @@ export const handleNodeEvent = async (
reason: "exec-event",
coalesceMs: 0,
},
cfg.session?.mainKey,
cfg.session?.scope,
eventRouting,
),
);
}

View File

@@ -0,0 +1,127 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import {
parseDirectAgentSessionTarget,
resolveEventSessionKeyForPolicy,
resolveEventSessionRoutingPolicy,
resolveMainScopedEventSessionKey,
scopedHeartbeatWakeOptionsForPolicy,
} from "./event-session-routing.js";
describe("event session routing", () => {
it("parses per-peer, per-channel, and per-account direct session keys", () => {
expect(parseDirectAgentSessionTarget("agent:main:direct:123")).toEqual({
agentId: "main",
peerId: "123",
});
expect(parseDirectAgentSessionTarget("agent:main:telegram:direct:123")).toEqual({
agentId: "main",
channel: "telegram",
peerId: "123",
});
expect(parseDirectAgentSessionTarget("agent:main:telegram:work:direct:123")).toEqual({
agentId: "main",
channel: "telegram",
accountId: "work",
peerId: "123",
});
});
it("routes single-owner dmScope=main direct event keys to the agent main session", () => {
const cfg: OpenClawConfig = {
session: { dmScope: "main" },
channels: {
telegram: {
accounts: {
work: { allowFrom: ["123"] },
},
},
},
} as unknown as OpenClawConfig;
const policy = resolveEventSessionRoutingPolicy({
cfg,
sessionKey: "agent:main:telegram:work:direct:123",
});
expect(resolveEventSessionKeyForPolicy("agent:main:telegram:work:direct:123", policy)).toBe(
"agent:main:main",
);
expect(
scopedHeartbeatWakeOptionsForPolicy(
"agent:main:telegram:work:direct:123",
{ reason: "exec-event" },
policy,
),
).toEqual({ reason: "exec-event", sessionKey: "agent:main:main" });
});
it("does not route multi-owner or wildcard direct sessions to main", () => {
const baseCfg: OpenClawConfig = {
session: { dmScope: "main" },
channels: {
telegram: { allowFrom: ["123", "456"] },
},
} as unknown as OpenClawConfig;
expect(
resolveMainScopedEventSessionKey({
cfg: baseCfg,
sessionKey: "agent:main:telegram:default:direct:123",
}),
).toBeNull();
expect(
resolveMainScopedEventSessionKey({
cfg: {
...baseCfg,
channels: { telegram: { allowFrom: ["*"] } },
} as unknown as OpenClawConfig,
sessionKey: "agent:main:telegram:default:direct:123",
}),
).toBeNull();
});
it("preserves route-binding direct session overrides under global dmScope=main", () => {
const cfg: OpenClawConfig = {
session: { dmScope: "main" },
channels: {
telegram: {
accounts: {
work: { allowFrom: ["123"] },
},
},
},
bindings: [
{
type: "route",
agentId: "main",
match: {
channel: "telegram",
accountId: "work",
peer: { kind: "direct", id: "123" },
},
session: { dmScope: "per-account-channel-peer" },
},
],
} as unknown as OpenClawConfig;
const sessionKey = "agent:main:telegram:work:direct:123";
const policy = resolveEventSessionRoutingPolicy({ cfg, sessionKey });
expect(policy.preserveSessionKey).toBe(true);
expect(resolveEventSessionKeyForPolicy(sessionKey, policy)).toBe(sessionKey);
});
it("keeps cron-run remapping behavior unchanged", () => {
const policy = { mainKey: "primary", sessionScope: "per-sender" as const };
expect(resolveEventSessionKeyForPolicy("agent:ops:cron:nightly:run:abc", policy)).toBe(
"agent:ops:primary",
);
expect(
scopedHeartbeatWakeOptionsForPolicy(
"agent:ops:cron:nightly:run:abc",
{ reason: "exec-event" },
policy,
),
).toEqual({ reason: "exec-event", sessionKey: "agent:ops:primary" });
});
});

View File

@@ -0,0 +1,271 @@
import type { SessionScope } from "../config/types.base.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import {
buildAgentMainSessionKey,
normalizeAgentId,
parseAgentSessionKey,
} from "../routing/session-key.js";
import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js";
import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js";
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
type UnknownRecord = Record<string, unknown>;
export type EventSessionRoutingPolicy = {
mainKey?: string;
sessionScope?: SessionScope;
dmScope?: string | null;
allowFrom?: ReadonlyArray<string | number> | null;
channel?: string | null;
accountId?: string | null;
preserveSessionKey?: boolean;
};
type DirectSessionTarget = {
agentId: string;
channel?: string;
accountId?: string;
peerId: string;
};
function isRecord(value: unknown): value is UnknownRecord {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
function readAllowFrom(value: unknown): Array<string | number> | undefined {
if (!isRecord(value)) {
return undefined;
}
const allowFrom = value.allowFrom;
return Array.isArray(allowFrom) ? allowFrom : undefined;
}
function readDmAllowFrom(value: unknown): Array<string | number> | undefined {
if (!isRecord(value)) {
return undefined;
}
return readAllowFrom(value.dm);
}
function readAccountConfig(value: unknown): UnknownRecord | undefined {
return isRecord(value) && isRecord(value.config) ? value.config : undefined;
}
function firstConfiguredAllowFrom(
...candidates: Array<Array<string | number> | undefined>
): Array<string | number> | undefined {
return candidates.find((candidate) => candidate !== undefined);
}
function normalizeEntry(value: string): string | undefined {
return normalizeLowercaseStringOrEmpty(value) || undefined;
}
export function parseDirectAgentSessionTarget(
sessionKey: string | undefined | null,
): DirectSessionTarget | null {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed || deriveSessionChatTypeFromKey(sessionKey) !== "direct") {
return null;
}
const parts = parsed.rest.split(":");
const directIndex = parts.findIndex((part) => part === "direct" || part === "dm");
if (directIndex < 0 || directIndex > 2 || directIndex >= parts.length - 1) {
return null;
}
const peerId = normalizeLowercaseStringOrEmpty(parts.slice(directIndex + 1).join(":"));
if (!peerId) {
return null;
}
return {
agentId: parsed.agentId,
...(directIndex >= 1 ? { channel: normalizeLowercaseStringOrEmpty(parts[0]) } : {}),
...(directIndex >= 2 ? { accountId: normalizeLowercaseStringOrEmpty(parts[1]) } : {}),
peerId,
};
}
export function resolveEventSessionAllowFrom(params: {
cfg?: OpenClawConfig;
sessionKey?: string | null;
channel?: string | null;
accountId?: string | null;
}): Array<string | number> | undefined {
const cfg = params.cfg;
if (!cfg?.channels) {
return undefined;
}
const target = parseDirectAgentSessionTarget(params.sessionKey);
const channelKey = normalizeLowercaseStringOrEmpty(params.channel ?? target?.channel);
if (!channelKey) {
return undefined;
}
const channelConfig = isRecord(cfg.channels) ? cfg.channels[channelKey] : undefined;
if (!isRecord(channelConfig)) {
return undefined;
}
const accountId = normalizeLowercaseStringOrEmpty(params.accountId ?? target?.accountId);
const accountConfig =
accountId && isRecord(channelConfig.accounts) ? channelConfig.accounts[accountId] : undefined;
const accountNestedConfig = readAccountConfig(accountConfig);
return firstConfiguredAllowFrom(
readDmAllowFrom(accountConfig),
readDmAllowFrom(accountNestedConfig),
readAllowFrom(accountConfig),
readAllowFrom(accountNestedConfig),
readDmAllowFrom(channelConfig),
readAllowFrom(channelConfig),
);
}
function shouldPreserveDirectSessionKeyFromRoute(params: {
cfg?: OpenClawConfig;
sessionKey: string;
target: DirectSessionTarget | null;
}): boolean {
if (!params.cfg || !params.target?.channel) {
return false;
}
try {
const route = resolveAgentRoute({
cfg: params.cfg,
channel: params.target.channel,
accountId: params.target.accountId,
peer: { kind: "direct", id: params.target.peerId },
});
return (
route.lastRoutePolicy === "session" &&
normalizeLowercaseStringOrEmpty(route.sessionKey) ===
normalizeLowercaseStringOrEmpty(params.sessionKey)
);
} catch {
return false;
}
}
export function resolveEventSessionRoutingPolicy(params: {
cfg?: OpenClawConfig;
sessionKey?: string | null;
channel?: string | null;
accountId?: string | null;
dmScope?: string | null;
allowFrom?: ReadonlyArray<string | number> | null;
}): EventSessionRoutingPolicy {
const target = parseDirectAgentSessionTarget(params.sessionKey);
const channel = normalizeLowercaseStringOrEmpty(params.channel ?? target?.channel) || undefined;
const accountId =
normalizeLowercaseStringOrEmpty(params.accountId ?? target?.accountId) || undefined;
const allowFrom =
params.allowFrom ??
resolveEventSessionAllowFrom({
cfg: params.cfg,
sessionKey: params.sessionKey,
channel,
accountId,
});
return {
mainKey: params.cfg?.session?.mainKey,
sessionScope: params.cfg?.session?.scope,
dmScope: params.dmScope ?? params.cfg?.session?.dmScope,
allowFrom,
channel,
accountId,
preserveSessionKey: params.sessionKey
? shouldPreserveDirectSessionKeyFromRoute({
cfg: params.cfg,
sessionKey: params.sessionKey,
target,
})
: false,
};
}
export function resolveMainScopedEventSessionKey(params: {
cfg?: OpenClawConfig;
sessionKey: string;
agentId?: string | null;
policy?: EventSessionRoutingPolicy;
}): string | null {
const sessionKey = params.sessionKey.trim();
if (!sessionKey || params.policy?.preserveSessionKey === true) {
return null;
}
const parsed = parseAgentSessionKey(sessionKey);
const target = parseDirectAgentSessionTarget(sessionKey);
if (!parsed || !target) {
return null;
}
const resolvedAgentId = normalizeAgentId(params.agentId ?? target.agentId);
if (normalizeAgentId(target.agentId) !== resolvedAgentId) {
return null;
}
const policy =
params.policy ??
resolveEventSessionRoutingPolicy({
cfg: params.cfg,
sessionKey,
});
const allowFrom = Array.from(policy.allowFrom ?? []);
const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({
dmScope: policy.dmScope ?? params.cfg?.session?.dmScope,
allowFrom,
normalizeEntry,
});
if (!pinnedOwner || normalizeEntry(target.peerId) !== pinnedOwner) {
return null;
}
if (
shouldPreserveDirectSessionKeyFromRoute({
cfg: params.cfg,
sessionKey,
target,
})
) {
return null;
}
if (policy.sessionScope === "global") {
return "global";
}
return buildAgentMainSessionKey({
agentId: resolvedAgentId,
mainKey: policy.mainKey ?? params.cfg?.session?.mainKey,
});
}
export function resolveEventSessionKeyForPolicy(
sessionKey: string,
policy?: EventSessionRoutingPolicy,
): string {
const cronScoped = resolveEventSessionKey(sessionKey, policy?.mainKey, policy?.sessionScope);
if (cronScoped !== sessionKey) {
return cronScoped;
}
return resolveMainScopedEventSessionKey({ sessionKey, policy }) ?? sessionKey;
}
export function scopedHeartbeatWakeOptionsForPolicy<T extends object>(
sessionKey: string,
wakeOptions: T,
policy?: EventSessionRoutingPolicy,
): T | (T & { sessionKey: string }) | (T & { agentId: string }) {
const cronScoped = resolveEventSessionKey(sessionKey, policy?.mainKey, policy?.sessionScope);
if (cronScoped !== sessionKey) {
return scopedHeartbeatWakeOptions(
sessionKey,
wakeOptions,
policy?.mainKey,
policy?.sessionScope,
);
}
const mainScoped = resolveMainScopedEventSessionKey({ sessionKey, policy });
if (mainScoped) {
if (mainScoped === "global") {
const agentId = parseAgentSessionKey(sessionKey)?.agentId;
return agentId ? { ...wakeOptions, agentId } : wakeOptions;
}
return { ...wakeOptions, sessionKey: mainScoped };
}
return scopedHeartbeatWakeOptions(sessionKey, wakeOptions, policy?.mainKey, policy?.sessionScope);
}

View File

@@ -1,13 +1,22 @@
import fs from "node:fs/promises";
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { resolveMainSessionKey } from "../config/sessions.js";
import { runHeartbeatOnce } from "./heartbeat-runner.js";
import { installHeartbeatRunnerTestRuntime } from "./heartbeat-runner.test-harness.js";
import { withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js";
import {
enqueueSystemEvent,
peekSystemEventEntries,
resetSystemEventsForTest,
} from "./system-events.js";
installHeartbeatRunnerTestRuntime();
afterEach(() => {
resetSystemEventsForTest();
});
function requireFirstMockCall<T>(mock: { mock: { calls: T[][] } }, label: string): T[] {
const call = mock.mock.calls[0];
if (!call) {
@@ -86,4 +95,71 @@ describe("runHeartbeatOnce", () => {
expect(replyConfig).toBe(cfg);
});
});
it("routes single-owner dmScope=main direct event wakes to the main session", async () => {
await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
const cfg: OpenClawConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: {
every: "5m",
target: "telegram",
},
},
},
channels: {
telegram: {
allowFrom: ["123"],
},
},
session: { store: storePath, dmScope: "main" },
};
const mainSessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify({
[mainSessionKey]: {
sessionId: "sid-main",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123",
},
"agent:main:telegram:default:direct:123": {
sessionId: "sid-orphan",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "456",
},
}),
);
enqueueSystemEvent("Exec completed (run-dm, code 0)", {
sessionKey: mainSessionKey,
forceSenderIsOwnerFalse: true,
trusted: false,
});
replySpy.mockResolvedValue({ text: "NO_REPLY" });
await runHeartbeatOnce({
cfg,
sessionKey: "agent:main:telegram:default:direct:123",
source: "exec-event",
deps: {
getReplyFromConfig: replySpy,
telegram: vi.fn().mockResolvedValue({ messageId: "m1", chatId: "123" }),
getQueueSize: () => 0,
nowMs: () => 0,
},
});
expect(replySpy).toHaveBeenCalledTimes(1);
const [replyParams] = requireFirstMockCall(replySpy, "reply") as Parameters<typeof replySpy>;
expect(replyParams?.SessionKey).toBe(mainSessionKey);
expect(replyParams?.Body).toContain("async command completion event");
expect(peekSystemEventEntries(mainSessionKey)).toStrictEqual([]);
});
});
});

View File

@@ -91,6 +91,7 @@ import { escapeRegExp } from "../utils.js";
import { MAX_SAFE_TIMEOUT_DELAY_MS, resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js";
import { loadOrCreateDeviceIdentity } from "./device-identity.js";
import { formatErrorMessage, hasErrnoCode } from "./errors.js";
import { resolveMainScopedEventSessionKey } from "./event-session-routing.js";
import { isWithinActiveHours, resolveActiveHoursTimezone } from "./heartbeat-active-hours.js";
import { recordRunStart, shouldDeferWake, type DeferDecision } from "./heartbeat-cooldown.js";
import {
@@ -556,11 +557,17 @@ function resolveHeartbeatSession(
if (forcedCanonical !== "global" && !isSubagentSessionKey(forcedCanonical)) {
const sessionAgentId = resolveAgentIdFromSessionKey(forcedCanonical);
if (sessionAgentId === normalizeAgentId(resolvedAgentId)) {
const routedSessionKey =
resolveMainScopedEventSessionKey({
cfg,
sessionKey: forcedCanonical,
agentId: resolvedAgentId,
}) ?? forcedCanonical;
return {
sessionKey: forcedCanonical,
sessionKey: routedSessionKey,
storePath,
store,
entry: store[forcedCanonical],
entry: store[routedSessionKey],
suppressOriginatingContext: false,
};
}