diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index 06bbd25d9de..1503bc7e7e1 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -3,10 +3,14 @@ import path from "node:path"; import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { + type EventSessionRoutingPolicy, + resolveEventSessionKeyForPolicy, + scopedHeartbeatWakeOptionsForPolicy, +} from "../infra/event-session-routing.js"; import { requestHeartbeat } from "../infra/heartbeat-wake.js"; import { appendRegularFile } from "../infra/regular-file.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js"; import { normalizeAssistantPhase } from "../shared/chat-message-content.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { recordTaskRunProgressByRunId } from "../tasks/detached-task-runtime.js"; @@ -104,6 +108,7 @@ export function startAcpSpawnParentStreamRelay(params: { * Snapshotted with `mainKey` for the same start-time routing reason. */ sessionScope?: "per-sender" | "global"; + eventRouting?: EventSessionRoutingPolicy; logPath?: string; deliveryContext?: DeliveryContext; surfaceUpdates?: boolean; @@ -204,20 +209,23 @@ export function startAcpSpawnParentStreamRelay(params: { }); }; const shouldSurfaceUpdates = params.surfaceUpdates !== false; + const eventRouting = params.eventRouting ?? { + mainKey: params.mainKey, + sessionScope: params.sessionScope, + }; const wake = () => { if (!shouldSurfaceUpdates) { return; } requestHeartbeat( - scopedHeartbeatWakeOptions( + scopedHeartbeatWakeOptionsForPolicy( parentSessionKey, { source: "acp-spawn", intent: "event", reason: "acp:spawn:stream", }, - params.mainKey, - params.sessionScope, + eventRouting, ), ); }; @@ -231,7 +239,7 @@ export function startAcpSpawnParentStreamRelay(params: { return; } enqueueSystemEvent(cleaned, { - sessionKey: resolveEventSessionKey(parentSessionKey, params.mainKey, params.sessionScope), + sessionKey: resolveEventSessionKeyForPolicy(parentSessionKey, eventRouting), contextKey, deliveryContext: params.deliveryContext, }); diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index cd7747ff043..35df02cbf47 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -41,6 +41,7 @@ import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { callGateway } from "../gateway/call.js"; import { formatErrorMessage } from "../infra/errors.js"; +import { resolveEventSessionRoutingPolicy } from "../infra/event-session-routing.js"; import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js"; import { getSessionBindingService, @@ -1426,6 +1427,9 @@ export async function spawnAcpDirect( : undefined; let parentRelay: AcpSpawnParentRelayHandle | undefined; + const parentEventRouting = parentSessionKey + ? resolveEventSessionRoutingPolicy({ cfg, sessionKey: parentSessionKey }) + : undefined; if (effectiveStreamToParent && parentSessionKey) { // Register relay before dispatch so fast lifecycle failures are not missed. parentRelay = startAcpSpawnParentStreamRelay({ @@ -1435,6 +1439,7 @@ export async function spawnAcpDirect( agentId: targetAgentId, mainKey: cfg.session?.mainKey, sessionScope: cfg.session?.scope, + eventRouting: parentEventRouting, logPath: streamLogPath, deliveryContext: parentDeliveryCtx, emitStartNotice: false, @@ -1491,6 +1496,7 @@ export async function spawnAcpDirect( agentId: targetAgentId, mainKey: cfg.session?.mainKey, sessionScope: cfg.session?.scope, + eventRouting: parentEventRouting, logPath: streamLogPath, deliveryContext: parentDeliveryCtx, emitStartNotice: false, diff --git a/src/agents/bash-process-registry.ts b/src/agents/bash-process-registry.ts index 51cf73bc204..798ad66944f 100644 --- a/src/agents/bash-process-registry.ts +++ b/src/agents/bash-process-registry.ts @@ -1,4 +1,5 @@ import type { ChildProcessWithoutNullStreams } from "node:child_process"; +import type { EventSessionRoutingPolicy } from "../infra/event-session-routing.js"; import type { TerminationReason } from "../process/supervisor/types.js"; import type { DeliveryContext } from "../utils/delivery-context.js"; import { createSessionSlug as createSessionSlugId } from "./session-slug.js"; @@ -46,6 +47,8 @@ export interface ProcessSession { * of an agent-main queue the heartbeat never drains. Snapshotted with * `mainKey` for the same start-time routing reason. */ sessionScope?: "per-sender" | "global"; + /** Start-time routing policy for detached exec system events. */ + eventRouting?: EventSessionRoutingPolicy; notifyDeliveryContext?: DeliveryContext; notifyOnExit?: boolean; notifyOnExitEmptySuccess?: boolean; diff --git a/src/agents/bash-tools.exec-runtime.test.ts b/src/agents/bash-tools.exec-runtime.test.ts index 6b14d2de783..cc4e11173ca 100644 --- a/src/agents/bash-tools.exec-runtime.test.ts +++ b/src/agents/bash-tools.exec-runtime.test.ts @@ -537,6 +537,39 @@ describe("emitExecSystemEvent", () => { expect(requireHeartbeatCall()).not.toHaveProperty("sessionKey"); }); + it("routes single-owner dmScope=main direct exec events to the agent main session", () => { + emitExecSystemEvent("Exec finished", { + sessionKey: "agent:main:telegram:default:direct:123", + contextKey: "exec:run-dm", + deliveryContext: { + channel: "telegram", + to: "123", + }, + eventRouting: { + dmScope: "main", + allowFrom: ["123"], + channel: "telegram", + accountId: "default", + }, + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", { + sessionKey: "agent:main:main", + contextKey: "exec:run-dm", + deliveryContext: { + channel: "telegram", + to: "123", + }, + forceSenderIsOwnerFalse: true, + trusted: false, + }); + expect(requestHeartbeatMock).toHaveBeenCalledTimes(1); + const heartbeat = requireHeartbeatCall(); + expect(heartbeat.coalesceMs).toBe(0); + expect(heartbeat.reason).toBe("exec-event"); + expect(heartbeat.sessionKey).toBe("agent:main:main"); + }); + it("keeps wake unscoped for non-agent session keys", () => { emitExecSystemEvent("Exec finished", { sessionKey: "global", diff --git a/src/agents/bash-tools.exec-runtime.ts b/src/agents/bash-tools.exec-runtime.ts index e6a84134b73..ae22f5061ef 100644 --- a/src/agents/bash-tools.exec-runtime.ts +++ b/src/agents/bash-tools.exec-runtime.ts @@ -1,6 +1,11 @@ import path from "node:path"; import type { AgentToolResult } from "@earendil-works/pi-agent-core"; import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; +import { + type EventSessionRoutingPolicy, + resolveEventSessionKeyForPolicy, + scopedHeartbeatWakeOptionsForPolicy, +} from "../infra/event-session-routing.js"; import { DEFAULT_EXEC_APPROVAL_TIMEOUT_MS, resolveExecApprovalAllowedDecisions, @@ -12,7 +17,6 @@ import { requestHeartbeat } from "../infra/heartbeat-wake.js"; import { isDangerousHostInheritedEnvVarName } from "../infra/host-env-security.js"; import { findPathKey, mergePathPrepend, removePathPrepend } from "../infra/path-prepend.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js"; import { isSubagentSessionKey } from "../sessions/session-key-utils.js"; import type { ProcessSession } from "./bash-process-registry.js"; import type { ExecToolDetails } from "./bash-tools.exec-types.js"; @@ -340,15 +344,19 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile const summary = output ? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}` : `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`; + const eventRouting = session.eventRouting ?? { + mainKey: session.mainKey, + sessionScope: session.sessionScope, + }; enqueueSystemEvent(summary, { - sessionKey: resolveEventSessionKey(sessionKey, session.mainKey, session.sessionScope), + sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting), deliveryContext: session.notifyDeliveryContext, }); // Subagent sessions receive exec results via process poll and announce flow; // the heartbeat would fall back to the main session and cause spurious wakes. if (!isSubagentSessionKey(sessionKey)) { requestHeartbeat( - scopedHeartbeatWakeOptions( + scopedHeartbeatWakeOptionsForPolicy( sessionKey, { source: "exec-event", @@ -356,8 +364,7 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile reason: "exec-event", coalesceMs: 0, }, - session.mainKey, - session.sessionScope, + eventRouting, ), ); } @@ -435,14 +442,19 @@ export function emitExecSystemEvent( /** `session.scope` from the runtime config; needed so global-scope * agents route cron-run events to the "global" queue. */ sessionScope?: "per-sender" | "global"; + eventRouting?: EventSessionRoutingPolicy; }, ) { const sessionKey = opts.sessionKey?.trim(); if (!sessionKey) { return; } + const eventRouting = opts.eventRouting ?? { + mainKey: opts.mainKey, + sessionScope: opts.sessionScope, + }; enqueueSystemEvent(text, { - sessionKey: resolveEventSessionKey(sessionKey, opts.mainKey, opts.sessionScope), + sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting), contextKey: opts.contextKey, deliveryContext: opts.deliveryContext, }); @@ -450,7 +462,7 @@ export function emitExecSystemEvent( // the heartbeat would fall back to the main session and cause spurious wakes. if (!isSubagentSessionKey(sessionKey)) { requestHeartbeat( - scopedHeartbeatWakeOptions( + scopedHeartbeatWakeOptionsForPolicy( sessionKey, { source: "exec-event", @@ -458,8 +470,7 @@ export function emitExecSystemEvent( reason: "exec-event", coalesceMs: 0, }, - opts.mainKey, - opts.sessionScope, + eventRouting, ), ); } @@ -636,6 +647,8 @@ export async function runExecProcess(opts: { * `mainKey` so the cron-run remap can route global-scope agents to * the "global" queue instead of agent-main. */ sessionScope?: "per-sender" | "global"; + /** Start-time routing policy for detached exec system events. */ + eventRouting?: EventSessionRoutingPolicy; notifyDeliveryContext?: DeliveryContext; timeoutSec: number | null; onUpdate?: (partialResult: AgentToolResult) => void; @@ -657,6 +670,7 @@ export async function runExecProcess(opts: { sessionKey: opts.sessionKey, mainKey: opts.mainKey, sessionScope: opts.sessionScope, + eventRouting: opts.eventRouting, notifyDeliveryContext: normalizeDeliveryContext(opts.notifyDeliveryContext), notifyOnExit: opts.notifyOnExit, notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true, diff --git a/src/agents/bash-tools.exec-types.ts b/src/agents/bash-tools.exec-types.ts index a540a1cbdf6..dc24a0acc4d 100644 --- a/src/agents/bash-tools.exec-types.ts +++ b/src/agents/bash-tools.exec-types.ts @@ -1,3 +1,4 @@ +import type { EventSessionRoutingPolicy } from "../infra/event-session-routing.js"; import type { ExecApprovalDecision } from "../infra/exec-approvals.js"; import type { ExecAsk, ExecHost, ExecSecurity, ExecTarget } from "../infra/exec-approvals.js"; import type { SafeBinProfileFixture } from "../infra/exec-safe-bin-policy.js"; @@ -38,6 +39,8 @@ export type ExecToolDefaults = { * so the cron-run remap can route global-scope agents to the "global" * queue instead of agent-main. */ sessionScope?: "per-sender" | "global"; + /** Start-time routing policy for detached exec system events. */ + eventRouting?: EventSessionRoutingPolicy; messageProvider?: string; currentChannelId?: string; currentThreadTs?: string; diff --git a/src/agents/bash-tools.exec.ts b/src/agents/bash-tools.exec.ts index 486887b8c96..08828f68903 100644 --- a/src/agents/bash-tools.exec.ts +++ b/src/agents/bash-tools.exec.ts @@ -1631,6 +1631,7 @@ export function createExecTool( sessionKey: notifySessionKey, mainKey: defaults?.mainKey, sessionScope: defaults?.sessionScope, + eventRouting: defaults?.eventRouting, notifyDeliveryContext, timeoutSec: effectiveTimeout, onUpdate, diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index 52e227f69a3..82610a43010 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -2,11 +2,15 @@ import crypto from "node:crypto"; import { shouldLogVerbose } from "../../globals.js"; import { emitAgentEvent } from "../../infra/agent-events.js"; import { isTruthyEnvValue } from "../../infra/env.js"; +import { + resolveEventSessionKeyForPolicy, + resolveEventSessionRoutingPolicy, + scopedHeartbeatWakeOptionsForPolicy, +} from "../../infra/event-session-routing.js"; import { requestHeartbeat as requestHeartbeatImpl } from "../../infra/heartbeat-wake.js"; import { sanitizeHostExecEnv } from "../../infra/host-env-security.js"; import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system-events.js"; import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js"; -import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../../routing/session-key.js"; import { appendBootstrapPromptWarning } from "../bootstrap-budget.js"; import { createCliJsonlStreamingParser, @@ -635,25 +639,24 @@ export async function executePreparedCliRun( "It may have been waiting for interactive input or an approval prompt.", "For Claude Code, prefer --permission-mode bypassPermissions --print.", ].join(" "); - const watchdogMainKey = params.config?.session?.mainKey; - const watchdogScope = params.config?.session?.scope; + const eventRouting = resolveEventSessionRoutingPolicy({ + cfg: params.config, + sessionKey: params.sessionKey, + channel: params.messageProvider, + accountId: params.agentAccountId, + }); executeDeps.enqueueSystemEvent(stallNotice, { - sessionKey: resolveEventSessionKey( - params.sessionKey, - watchdogMainKey, - watchdogScope, - ), + sessionKey: resolveEventSessionKeyForPolicy(params.sessionKey, eventRouting), }); executeDeps.requestHeartbeat( - scopedHeartbeatWakeOptions( + scopedHeartbeatWakeOptionsForPolicy( params.sessionKey, { source: "cli-watchdog", intent: "event", reason: "cli:watchdog:stall", }, - watchdogMainKey, - watchdogScope, + eventRouting, ), ); } diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 62ae17650ca..b46f60e6735 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -7,6 +7,7 @@ import { resolveExecCommandHighlighting } from "../config/exec-command-highlight import type { ModelCompatConfig } from "../config/types.models.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { DiagnosticTraceContext } from "../infra/diagnostic-trace-context.js"; +import { resolveEventSessionRoutingPolicy } from "../infra/event-session-routing.js"; import { resolveMergedSafeBinProfileFixtures } from "../infra/exec-safe-bin-runtime-policy.js"; import { logWarn } from "../logger.js"; import { getPluginToolMeta } from "../plugins/tools.js"; @@ -743,6 +744,12 @@ export function createOpenClawCodingTools(options?: { sessionKey: options?.sessionKey, mainKey: options?.config?.session?.mainKey, sessionScope: options?.config?.session?.scope, + eventRouting: resolveEventSessionRoutingPolicy({ + cfg: options?.config, + sessionKey: options?.sessionKey, + channel: options?.messageProvider, + accountId: options?.agentAccountId, + }), messageProvider: options?.messageProvider, currentChannelId: options?.currentChannelId, currentThreadTs: options?.currentThreadTs, diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index ac59f71eb74..695f4e2086b 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -23,6 +23,7 @@ import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js"; import { resolveCronStorePath } from "../cron/store.js"; import type { CronJob } from "../cron/types.js"; import { formatErrorMessage } from "../infra/errors.js"; +import { resolveMainScopedEventSessionKey } from "../infra/event-session-routing.js"; import { runHeartbeatOnce } from "../infra/heartbeat-runner.js"; import { requestHeartbeat } from "../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; @@ -204,7 +205,13 @@ export function buildGatewayCronService(params: { }); } } - return canonical; + return ( + resolveMainScopedEventSessionKey({ + cfg: params.runtimeConfig, + sessionKey: canonical, + agentId: params.agentId, + }) ?? canonical + ); }; const resolveCronTarget = (opts?: { diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 6035c80ecc9..460fb020681 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -2,9 +2,13 @@ import { randomUUID } from "node:crypto"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { updatePairedDeviceMetadata } from "../infra/device-pairing.js"; import { formatErrorMessage } from "../infra/errors.js"; +import { + resolveEventSessionKeyForPolicy, + resolveEventSessionRoutingPolicy, + scopedHeartbeatWakeOptionsForPolicy, +} from "../infra/event-session-routing.js"; import { updatePairedNodeMetadata } from "../infra/node-pairing.js"; import type { PromptImageOrderEntry } from "../media/prompt-image-order.js"; -import { resolveEventSessionKey } from "../routing/session-key.js"; import { NODE_PRESENCE_ALIVE_EVENT, normalizeNodePresenceAliveReason, @@ -38,7 +42,6 @@ import { resolveSessionAgentId, resolveSessionModelRef, sanitizeInboundSystemTags, - scopedHeartbeatWakeOptions, sendDurableMessageBatch, updateSessionStore, } from "./server-node-events.runtime.js"; @@ -766,8 +769,9 @@ export const handleNodeEvent = async ( } } + const eventRouting = resolveEventSessionRoutingPolicy({ cfg, sessionKey }); const queued = enqueueSystemEvent(text, { - sessionKey: resolveEventSessionKey(sessionKey, cfg.session?.mainKey, cfg.session?.scope), + sessionKey: resolveEventSessionKeyForPolicy(sessionKey, eventRouting), contextKey: runId ? `exec:${runId}` : "exec", }); if (queued) { @@ -775,7 +779,7 @@ export const handleNodeEvent = async ( // keys should keep legacy unscoped behavior so enabled non-main heartbeat // agents still run when no explicit agent session is provided. requestHeartbeat( - scopedHeartbeatWakeOptions( + scopedHeartbeatWakeOptionsForPolicy( sessionKey, { source: "exec-event", @@ -783,8 +787,7 @@ export const handleNodeEvent = async ( reason: "exec-event", coalesceMs: 0, }, - cfg.session?.mainKey, - cfg.session?.scope, + eventRouting, ), ); } diff --git a/src/infra/event-session-routing.test.ts b/src/infra/event-session-routing.test.ts new file mode 100644 index 00000000000..f2f9f4c6d4b --- /dev/null +++ b/src/infra/event-session-routing.test.ts @@ -0,0 +1,127 @@ +import { describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import { + parseDirectAgentSessionTarget, + resolveEventSessionKeyForPolicy, + resolveEventSessionRoutingPolicy, + resolveMainScopedEventSessionKey, + scopedHeartbeatWakeOptionsForPolicy, +} from "./event-session-routing.js"; + +describe("event session routing", () => { + it("parses per-peer, per-channel, and per-account direct session keys", () => { + expect(parseDirectAgentSessionTarget("agent:main:direct:123")).toEqual({ + agentId: "main", + peerId: "123", + }); + expect(parseDirectAgentSessionTarget("agent:main:telegram:direct:123")).toEqual({ + agentId: "main", + channel: "telegram", + peerId: "123", + }); + expect(parseDirectAgentSessionTarget("agent:main:telegram:work:direct:123")).toEqual({ + agentId: "main", + channel: "telegram", + accountId: "work", + peerId: "123", + }); + }); + + it("routes single-owner dmScope=main direct event keys to the agent main session", () => { + const cfg: OpenClawConfig = { + session: { dmScope: "main" }, + channels: { + telegram: { + accounts: { + work: { allowFrom: ["123"] }, + }, + }, + }, + } as unknown as OpenClawConfig; + const policy = resolveEventSessionRoutingPolicy({ + cfg, + sessionKey: "agent:main:telegram:work:direct:123", + }); + + expect(resolveEventSessionKeyForPolicy("agent:main:telegram:work:direct:123", policy)).toBe( + "agent:main:main", + ); + expect( + scopedHeartbeatWakeOptionsForPolicy( + "agent:main:telegram:work:direct:123", + { reason: "exec-event" }, + policy, + ), + ).toEqual({ reason: "exec-event", sessionKey: "agent:main:main" }); + }); + + it("does not route multi-owner or wildcard direct sessions to main", () => { + const baseCfg: OpenClawConfig = { + session: { dmScope: "main" }, + channels: { + telegram: { allowFrom: ["123", "456"] }, + }, + } as unknown as OpenClawConfig; + + expect( + resolveMainScopedEventSessionKey({ + cfg: baseCfg, + sessionKey: "agent:main:telegram:default:direct:123", + }), + ).toBeNull(); + expect( + resolveMainScopedEventSessionKey({ + cfg: { + ...baseCfg, + channels: { telegram: { allowFrom: ["*"] } }, + } as unknown as OpenClawConfig, + sessionKey: "agent:main:telegram:default:direct:123", + }), + ).toBeNull(); + }); + + it("preserves route-binding direct session overrides under global dmScope=main", () => { + const cfg: OpenClawConfig = { + session: { dmScope: "main" }, + channels: { + telegram: { + accounts: { + work: { allowFrom: ["123"] }, + }, + }, + }, + bindings: [ + { + type: "route", + agentId: "main", + match: { + channel: "telegram", + accountId: "work", + peer: { kind: "direct", id: "123" }, + }, + session: { dmScope: "per-account-channel-peer" }, + }, + ], + } as unknown as OpenClawConfig; + const sessionKey = "agent:main:telegram:work:direct:123"; + const policy = resolveEventSessionRoutingPolicy({ cfg, sessionKey }); + + expect(policy.preserveSessionKey).toBe(true); + expect(resolveEventSessionKeyForPolicy(sessionKey, policy)).toBe(sessionKey); + }); + + it("keeps cron-run remapping behavior unchanged", () => { + const policy = { mainKey: "primary", sessionScope: "per-sender" as const }; + + expect(resolveEventSessionKeyForPolicy("agent:ops:cron:nightly:run:abc", policy)).toBe( + "agent:ops:primary", + ); + expect( + scopedHeartbeatWakeOptionsForPolicy( + "agent:ops:cron:nightly:run:abc", + { reason: "exec-event" }, + policy, + ), + ).toEqual({ reason: "exec-event", sessionKey: "agent:ops:primary" }); + }); +}); diff --git a/src/infra/event-session-routing.ts b/src/infra/event-session-routing.ts new file mode 100644 index 00000000000..3e0e9bc1358 --- /dev/null +++ b/src/infra/event-session-routing.ts @@ -0,0 +1,271 @@ +import type { SessionScope } from "../config/types.base.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { + buildAgentMainSessionKey, + normalizeAgentId, + parseAgentSessionKey, +} from "../routing/session-key.js"; +import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js"; +import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js"; +import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js"; +import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; + +type UnknownRecord = Record; + +export type EventSessionRoutingPolicy = { + mainKey?: string; + sessionScope?: SessionScope; + dmScope?: string | null; + allowFrom?: ReadonlyArray | null; + channel?: string | null; + accountId?: string | null; + preserveSessionKey?: boolean; +}; + +type DirectSessionTarget = { + agentId: string; + channel?: string; + accountId?: string; + peerId: string; +}; + +function isRecord(value: unknown): value is UnknownRecord { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function readAllowFrom(value: unknown): Array | undefined { + if (!isRecord(value)) { + return undefined; + } + const allowFrom = value.allowFrom; + return Array.isArray(allowFrom) ? allowFrom : undefined; +} + +function readDmAllowFrom(value: unknown): Array | undefined { + if (!isRecord(value)) { + return undefined; + } + return readAllowFrom(value.dm); +} + +function readAccountConfig(value: unknown): UnknownRecord | undefined { + return isRecord(value) && isRecord(value.config) ? value.config : undefined; +} + +function firstConfiguredAllowFrom( + ...candidates: Array | undefined> +): Array | undefined { + return candidates.find((candidate) => candidate !== undefined); +} + +function normalizeEntry(value: string): string | undefined { + return normalizeLowercaseStringOrEmpty(value) || undefined; +} + +export function parseDirectAgentSessionTarget( + sessionKey: string | undefined | null, +): DirectSessionTarget | null { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed || deriveSessionChatTypeFromKey(sessionKey) !== "direct") { + return null; + } + const parts = parsed.rest.split(":"); + const directIndex = parts.findIndex((part) => part === "direct" || part === "dm"); + if (directIndex < 0 || directIndex > 2 || directIndex >= parts.length - 1) { + return null; + } + const peerId = normalizeLowercaseStringOrEmpty(parts.slice(directIndex + 1).join(":")); + if (!peerId) { + return null; + } + return { + agentId: parsed.agentId, + ...(directIndex >= 1 ? { channel: normalizeLowercaseStringOrEmpty(parts[0]) } : {}), + ...(directIndex >= 2 ? { accountId: normalizeLowercaseStringOrEmpty(parts[1]) } : {}), + peerId, + }; +} + +export function resolveEventSessionAllowFrom(params: { + cfg?: OpenClawConfig; + sessionKey?: string | null; + channel?: string | null; + accountId?: string | null; +}): Array | undefined { + const cfg = params.cfg; + if (!cfg?.channels) { + return undefined; + } + const target = parseDirectAgentSessionTarget(params.sessionKey); + const channelKey = normalizeLowercaseStringOrEmpty(params.channel ?? target?.channel); + if (!channelKey) { + return undefined; + } + const channelConfig = isRecord(cfg.channels) ? cfg.channels[channelKey] : undefined; + if (!isRecord(channelConfig)) { + return undefined; + } + const accountId = normalizeLowercaseStringOrEmpty(params.accountId ?? target?.accountId); + const accountConfig = + accountId && isRecord(channelConfig.accounts) ? channelConfig.accounts[accountId] : undefined; + const accountNestedConfig = readAccountConfig(accountConfig); + return firstConfiguredAllowFrom( + readDmAllowFrom(accountConfig), + readDmAllowFrom(accountNestedConfig), + readAllowFrom(accountConfig), + readAllowFrom(accountNestedConfig), + readDmAllowFrom(channelConfig), + readAllowFrom(channelConfig), + ); +} + +function shouldPreserveDirectSessionKeyFromRoute(params: { + cfg?: OpenClawConfig; + sessionKey: string; + target: DirectSessionTarget | null; +}): boolean { + if (!params.cfg || !params.target?.channel) { + return false; + } + try { + const route = resolveAgentRoute({ + cfg: params.cfg, + channel: params.target.channel, + accountId: params.target.accountId, + peer: { kind: "direct", id: params.target.peerId }, + }); + return ( + route.lastRoutePolicy === "session" && + normalizeLowercaseStringOrEmpty(route.sessionKey) === + normalizeLowercaseStringOrEmpty(params.sessionKey) + ); + } catch { + return false; + } +} + +export function resolveEventSessionRoutingPolicy(params: { + cfg?: OpenClawConfig; + sessionKey?: string | null; + channel?: string | null; + accountId?: string | null; + dmScope?: string | null; + allowFrom?: ReadonlyArray | null; +}): EventSessionRoutingPolicy { + const target = parseDirectAgentSessionTarget(params.sessionKey); + const channel = normalizeLowercaseStringOrEmpty(params.channel ?? target?.channel) || undefined; + const accountId = + normalizeLowercaseStringOrEmpty(params.accountId ?? target?.accountId) || undefined; + const allowFrom = + params.allowFrom ?? + resolveEventSessionAllowFrom({ + cfg: params.cfg, + sessionKey: params.sessionKey, + channel, + accountId, + }); + return { + mainKey: params.cfg?.session?.mainKey, + sessionScope: params.cfg?.session?.scope, + dmScope: params.dmScope ?? params.cfg?.session?.dmScope, + allowFrom, + channel, + accountId, + preserveSessionKey: params.sessionKey + ? shouldPreserveDirectSessionKeyFromRoute({ + cfg: params.cfg, + sessionKey: params.sessionKey, + target, + }) + : false, + }; +} + +export function resolveMainScopedEventSessionKey(params: { + cfg?: OpenClawConfig; + sessionKey: string; + agentId?: string | null; + policy?: EventSessionRoutingPolicy; +}): string | null { + const sessionKey = params.sessionKey.trim(); + if (!sessionKey || params.policy?.preserveSessionKey === true) { + return null; + } + const parsed = parseAgentSessionKey(sessionKey); + const target = parseDirectAgentSessionTarget(sessionKey); + if (!parsed || !target) { + return null; + } + const resolvedAgentId = normalizeAgentId(params.agentId ?? target.agentId); + if (normalizeAgentId(target.agentId) !== resolvedAgentId) { + return null; + } + const policy = + params.policy ?? + resolveEventSessionRoutingPolicy({ + cfg: params.cfg, + sessionKey, + }); + const allowFrom = Array.from(policy.allowFrom ?? []); + const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({ + dmScope: policy.dmScope ?? params.cfg?.session?.dmScope, + allowFrom, + normalizeEntry, + }); + if (!pinnedOwner || normalizeEntry(target.peerId) !== pinnedOwner) { + return null; + } + if ( + shouldPreserveDirectSessionKeyFromRoute({ + cfg: params.cfg, + sessionKey, + target, + }) + ) { + return null; + } + if (policy.sessionScope === "global") { + return "global"; + } + return buildAgentMainSessionKey({ + agentId: resolvedAgentId, + mainKey: policy.mainKey ?? params.cfg?.session?.mainKey, + }); +} + +export function resolveEventSessionKeyForPolicy( + sessionKey: string, + policy?: EventSessionRoutingPolicy, +): string { + const cronScoped = resolveEventSessionKey(sessionKey, policy?.mainKey, policy?.sessionScope); + if (cronScoped !== sessionKey) { + return cronScoped; + } + return resolveMainScopedEventSessionKey({ sessionKey, policy }) ?? sessionKey; +} + +export function scopedHeartbeatWakeOptionsForPolicy( + sessionKey: string, + wakeOptions: T, + policy?: EventSessionRoutingPolicy, +): T | (T & { sessionKey: string }) | (T & { agentId: string }) { + const cronScoped = resolveEventSessionKey(sessionKey, policy?.mainKey, policy?.sessionScope); + if (cronScoped !== sessionKey) { + return scopedHeartbeatWakeOptions( + sessionKey, + wakeOptions, + policy?.mainKey, + policy?.sessionScope, + ); + } + const mainScoped = resolveMainScopedEventSessionKey({ sessionKey, policy }); + if (mainScoped) { + if (mainScoped === "global") { + const agentId = parseAgentSessionKey(sessionKey)?.agentId; + return agentId ? { ...wakeOptions, agentId } : wakeOptions; + } + return { ...wakeOptions, sessionKey: mainScoped }; + } + return scopedHeartbeatWakeOptions(sessionKey, wakeOptions, policy?.mainKey, policy?.sessionScope); +} diff --git a/src/infra/heartbeat-runner.subagent-session-guard.test.ts b/src/infra/heartbeat-runner.subagent-session-guard.test.ts index 93dfe57e9ba..9c15adaf2b2 100644 --- a/src/infra/heartbeat-runner.subagent-session-guard.test.ts +++ b/src/infra/heartbeat-runner.subagent-session-guard.test.ts @@ -1,13 +1,22 @@ import fs from "node:fs/promises"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { resolveMainSessionKey } from "../config/sessions.js"; import { runHeartbeatOnce } from "./heartbeat-runner.js"; import { installHeartbeatRunnerTestRuntime } from "./heartbeat-runner.test-harness.js"; import { withTempHeartbeatSandbox } from "./heartbeat-runner.test-utils.js"; +import { + enqueueSystemEvent, + peekSystemEventEntries, + resetSystemEventsForTest, +} from "./system-events.js"; installHeartbeatRunnerTestRuntime(); +afterEach(() => { + resetSystemEventsForTest(); +}); + function requireFirstMockCall(mock: { mock: { calls: T[][] } }, label: string): T[] { const call = mock.mock.calls[0]; if (!call) { @@ -86,4 +95,71 @@ describe("runHeartbeatOnce", () => { expect(replyConfig).toBe(cfg); }); }); + + it("routes single-owner dmScope=main direct event wakes to the main session", async () => { + await withTempHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "telegram", + }, + }, + }, + channels: { + telegram: { + allowFrom: ["123"], + }, + }, + session: { store: storePath, dmScope: "main" }, + }; + + const mainSessionKey = resolveMainSessionKey(cfg); + await fs.writeFile( + storePath, + JSON.stringify({ + [mainSessionKey]: { + sessionId: "sid-main", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123", + }, + "agent:main:telegram:default:direct:123": { + sessionId: "sid-orphan", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "456", + }, + }), + ); + enqueueSystemEvent("Exec completed (run-dm, code 0)", { + sessionKey: mainSessionKey, + forceSenderIsOwnerFalse: true, + trusted: false, + }); + replySpy.mockResolvedValue({ text: "NO_REPLY" }); + + await runHeartbeatOnce({ + cfg, + sessionKey: "agent:main:telegram:default:direct:123", + source: "exec-event", + deps: { + getReplyFromConfig: replySpy, + telegram: vi.fn().mockResolvedValue({ messageId: "m1", chatId: "123" }), + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const [replyParams] = requireFirstMockCall(replySpy, "reply") as Parameters; + expect(replyParams?.SessionKey).toBe(mainSessionKey); + expect(replyParams?.Body).toContain("async command completion event"); + expect(peekSystemEventEntries(mainSessionKey)).toStrictEqual([]); + }); + }); }); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index ef24f56f92d..3867a78896f 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -91,6 +91,7 @@ import { escapeRegExp } from "../utils.js"; import { MAX_SAFE_TIMEOUT_DELAY_MS, resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js"; import { loadOrCreateDeviceIdentity } from "./device-identity.js"; import { formatErrorMessage, hasErrnoCode } from "./errors.js"; +import { resolveMainScopedEventSessionKey } from "./event-session-routing.js"; import { isWithinActiveHours, resolveActiveHoursTimezone } from "./heartbeat-active-hours.js"; import { recordRunStart, shouldDeferWake, type DeferDecision } from "./heartbeat-cooldown.js"; import { @@ -556,11 +557,17 @@ function resolveHeartbeatSession( if (forcedCanonical !== "global" && !isSubagentSessionKey(forcedCanonical)) { const sessionAgentId = resolveAgentIdFromSessionKey(forcedCanonical); if (sessionAgentId === normalizeAgentId(resolvedAgentId)) { + const routedSessionKey = + resolveMainScopedEventSessionKey({ + cfg, + sessionKey: forcedCanonical, + agentId: resolvedAgentId, + }) ?? forcedCanonical; return { - sessionKey: forcedCanonical, + sessionKey: routedSessionKey, storePath, store, - entry: store[forcedCanonical], + entry: store[routedSessionKey], suppressOriginatingContext: false, }; }