diff --git a/src/auto-reply/reply/session-delivery.ts b/src/auto-reply/reply/session-delivery.ts new file mode 100644 index 00000000000..f49ab9b0182 --- /dev/null +++ b/src/auto-reply/reply/session-delivery.ts @@ -0,0 +1,145 @@ +import type { SessionEntry } from "../../config/sessions.js"; +import { buildAgentMainSessionKey } from "../../routing/session-key.js"; +import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; +import { + deliveryContextFromSession, + deliveryContextKey, + normalizeDeliveryContext, +} from "../../utils/delivery-context.js"; +import { + INTERNAL_MESSAGE_CHANNEL, + isDeliverableMessageChannel, + normalizeMessageChannel, +} from "../../utils/message-channel.js"; +import type { MsgContext } from "../templating.js"; + +export type LegacyMainDeliveryRetirement = { + key: string; + entry: SessionEntry; +}; + +function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed?.rest) { + return undefined; + } + const head = parsed.rest.split(":")[0]?.trim().toLowerCase(); + if (!head || head === "main" || head === "cron" || head === "subagent" || head === "acp") { + return undefined; + } + return normalizeMessageChannel(head); +} + +function isExternalRoutingChannel(channel?: string): channel is string { + return Boolean( + channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel), + ); +} + +export function resolveLastChannelRaw(params: { + originatingChannelRaw?: string; + persistedLastChannel?: string; + sessionKey?: string; +}): string | undefined { + const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw); + const persistedChannel = normalizeMessageChannel(params.persistedLastChannel); + const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey); + let resolved = params.originatingChannelRaw || params.persistedLastChannel; + // Internal/non-deliverable sources should not overwrite previously known + // external delivery routes (or explicit channel hints from the session key). + if (!isExternalRoutingChannel(originatingChannel)) { + if (isExternalRoutingChannel(persistedChannel)) { + resolved = persistedChannel; + } else if (isExternalRoutingChannel(sessionKeyChannelHint)) { + resolved = sessionKeyChannelHint; + } + } + return resolved; +} + +export function resolveLastToRaw(params: { + originatingChannelRaw?: string; + originatingToRaw?: string; + toRaw?: string; + persistedLastTo?: string; + persistedLastChannel?: string; + sessionKey?: string; +}): string | undefined { + const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw); + const persistedChannel = normalizeMessageChannel(params.persistedLastChannel); + const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey); + + // When the turn originates from an internal/non-deliverable source, do not + // replace an established external destination with internal routing ids + // (e.g., session/webchat ids). + if (!isExternalRoutingChannel(originatingChannel)) { + const hasExternalFallback = + isExternalRoutingChannel(persistedChannel) || isExternalRoutingChannel(sessionKeyChannelHint); + if (hasExternalFallback && params.persistedLastTo) { + return params.persistedLastTo; + } + } + + return params.originatingToRaw || params.toRaw || params.persistedLastTo; +} + +export function maybeRetireLegacyMainDeliveryRoute(params: { + sessionCfg: { dmScope?: string } | undefined; + sessionKey: string; + sessionStore: Record; + agentId: string; + mainKey: string; + isGroup: boolean; + ctx: MsgContext; +}): LegacyMainDeliveryRetirement | undefined { + const dmScope = params.sessionCfg?.dmScope ?? "main"; + if (dmScope === "main" || params.isGroup) { + return undefined; + } + const canonicalMainSessionKey = buildAgentMainSessionKey({ + agentId: params.agentId, + mainKey: params.mainKey, + }).toLowerCase(); + if (params.sessionKey === canonicalMainSessionKey) { + return undefined; + } + const legacyMain = params.sessionStore[canonicalMainSessionKey]; + if (!legacyMain) { + return undefined; + } + const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain)); + if (!legacyRouteKey) { + return undefined; + } + const activeDirectRouteKey = deliveryContextKey( + normalizeDeliveryContext({ + channel: params.ctx.OriginatingChannel as string | undefined, + to: params.ctx.OriginatingTo || params.ctx.To, + accountId: params.ctx.AccountId, + threadId: params.ctx.MessageThreadId, + }), + ); + if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) { + return undefined; + } + if ( + legacyMain.deliveryContext === undefined && + legacyMain.lastChannel === undefined && + legacyMain.lastTo === undefined && + legacyMain.lastAccountId === undefined && + legacyMain.lastThreadId === undefined + ) { + return undefined; + } + return { + key: canonicalMainSessionKey, + entry: { + ...legacyMain, + deliveryContext: undefined, + lastChannel: undefined, + lastTo: undefined, + lastAccountId: undefined, + lastThreadId: undefined, + }, + }; +} diff --git a/src/auto-reply/reply/session-fork.ts b/src/auto-reply/reply/session-fork.ts new file mode 100644 index 00000000000..84c5eb0079d --- /dev/null +++ b/src/auto-reply/reply/session-fork.ts @@ -0,0 +1,63 @@ +import crypto from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; +import type { OpenClawConfig } from "../../config/config.js"; +import { resolveSessionFilePath, type SessionEntry } from "../../config/sessions.js"; + +/** + * Default max parent token count beyond which thread/session parent forking is skipped. + * This prevents new thread sessions from inheriting near-full parent context. + * See #26905. + */ +const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000; + +export function resolveParentForkMaxTokens(cfg: OpenClawConfig): number { + const configured = cfg.session?.parentForkMaxTokens; + if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) { + return Math.floor(configured); + } + return DEFAULT_PARENT_FORK_MAX_TOKENS; +} + +export function forkSessionFromParent(params: { + parentEntry: SessionEntry; + agentId: string; + sessionsDir: string; +}): { sessionId: string; sessionFile: string } | null { + const parentSessionFile = resolveSessionFilePath( + params.parentEntry.sessionId, + params.parentEntry, + { agentId: params.agentId, sessionsDir: params.sessionsDir }, + ); + if (!parentSessionFile || !fs.existsSync(parentSessionFile)) { + return null; + } + try { + const manager = SessionManager.open(parentSessionFile); + const leafId = manager.getLeafId(); + if (leafId) { + const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile(); + const sessionId = manager.getSessionId(); + if (sessionFile && sessionId) { + return { sessionId, sessionFile }; + } + } + const sessionId = crypto.randomUUID(); + const timestamp = new Date().toISOString(); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp, + cwd: manager.getCwd(), + parentSession: parentSessionFile, + }; + fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8"); + return { sessionId, sessionFile }; + } catch { + return null; + } +} diff --git a/src/auto-reply/reply/session-hooks.ts b/src/auto-reply/reply/session-hooks.ts new file mode 100644 index 00000000000..8e22dc247bc --- /dev/null +++ b/src/auto-reply/reply/session-hooks.ts @@ -0,0 +1,66 @@ +import { resolveSessionAgentId } from "../../agents/agent-scope.js"; +import type { OpenClawConfig } from "../../config/config.js"; + +export type SessionHookContext = { + sessionId: string; + sessionKey: string; + agentId: string; +}; + +function buildSessionHookContext(params: { + sessionId: string; + sessionKey: string; + cfg: OpenClawConfig; +}): SessionHookContext { + return { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }), + }; +} + +export function buildSessionStartHookPayload(params: { + sessionId: string; + sessionKey: string; + cfg: OpenClawConfig; + resumedFrom?: string; +}): { + event: { sessionId: string; sessionKey: string; resumedFrom?: string }; + context: SessionHookContext; +} { + return { + event: { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + resumedFrom: params.resumedFrom, + }, + context: buildSessionHookContext({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + cfg: params.cfg, + }), + }; +} + +export function buildSessionEndHookPayload(params: { + sessionId: string; + sessionKey: string; + cfg: OpenClawConfig; + messageCount?: number; +}): { + event: { sessionId: string; sessionKey: string; messageCount: number }; + context: SessionHookContext; +} { + return { + event: { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + messageCount: params.messageCount ?? 0, + }, + context: buildSessionHookContext({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + cfg: params.cfg, + }), + }; +} diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 0af56ec6118..4cb4b986e72 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -1,7 +1,5 @@ import crypto from "node:crypto"; -import fs from "node:fs"; import path from "node:path"; -import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { normalizeChatType } from "../../channels/chat-type.js"; import type { OpenClawConfig } from "../../config/config.js"; @@ -17,7 +15,6 @@ import { resolveSessionResetPolicy, resolveSessionResetType, resolveGroupSessionKey, - resolveSessionFilePath, resolveSessionKey, resolveSessionTranscriptPath, resolveStorePath, @@ -30,91 +27,22 @@ import { archiveSessionTranscripts } from "../../gateway/session-utils.fs.js"; import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; -import { buildAgentMainSessionKey, normalizeMainKey } from "../../routing/session-key.js"; -import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; -import { - deliveryContextFromSession, - deliveryContextKey, - normalizeDeliveryContext, - normalizeSessionDeliveryFields, -} from "../../utils/delivery-context.js"; -import { - INTERNAL_MESSAGE_CHANNEL, - isDeliverableMessageChannel, - normalizeMessageChannel, -} from "../../utils/message-channel.js"; +import { normalizeMainKey } from "../../routing/session-key.js"; +import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { resolveCommandAuthorization } from "../command-auth.js"; -import type { MsgContext, TemplateContext } from "../templating.js"; +import type { TemplateContext } from "../templating.js"; import { normalizeInboundTextNewlines } from "./inbound-text.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; +import { + maybeRetireLegacyMainDeliveryRoute, + resolveLastChannelRaw, + resolveLastToRaw, +} from "./session-delivery.js"; +import { forkSessionFromParent, resolveParentForkMaxTokens } from "./session-fork.js"; +import { buildSessionEndHookPayload, buildSessionStartHookPayload } from "./session-hooks.js"; const log = createSubsystemLogger("session-init"); -function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined { - const parsed = parseAgentSessionKey(sessionKey); - if (!parsed?.rest) { - return undefined; - } - const head = parsed.rest.split(":")[0]?.trim().toLowerCase(); - if (!head || head === "main" || head === "cron" || head === "subagent" || head === "acp") { - return undefined; - } - return normalizeMessageChannel(head); -} - -function isExternalRoutingChannel(channel?: string): channel is string { - return Boolean( - channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel), - ); -} - -function resolveLastChannelRaw(params: { - originatingChannelRaw?: string; - persistedLastChannel?: string; - sessionKey?: string; -}): string | undefined { - const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw); - const persistedChannel = normalizeMessageChannel(params.persistedLastChannel); - const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey); - let resolved = params.originatingChannelRaw || params.persistedLastChannel; - // Internal/non-deliverable sources should not overwrite previously known - // external delivery routes (or explicit channel hints from the session key). - if (!isExternalRoutingChannel(originatingChannel)) { - if (isExternalRoutingChannel(persistedChannel)) { - resolved = persistedChannel; - } else if (isExternalRoutingChannel(sessionKeyChannelHint)) { - resolved = sessionKeyChannelHint; - } - } - return resolved; -} - -function resolveLastToRaw(params: { - originatingChannelRaw?: string; - originatingToRaw?: string; - toRaw?: string; - persistedLastTo?: string; - persistedLastChannel?: string; - sessionKey?: string; -}): string | undefined { - const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw); - const persistedChannel = normalizeMessageChannel(params.persistedLastChannel); - const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey); - - // When the turn originates from an internal/non-deliverable source, do not - // replace an established external destination with internal routing ids - // (e.g., session/webchat ids). - if (!isExternalRoutingChannel(originatingChannel)) { - const hasExternalFallback = - isExternalRoutingChannel(persistedChannel) || isExternalRoutingChannel(sessionKeyChannelHint); - if (hasExternalFallback && params.persistedLastTo) { - return params.persistedLastTo; - } - } - - return params.originatingToRaw || params.toRaw || params.persistedLastTo; -} - export type SessionInitResult = { sessionCtx: TemplateContext; sessionEntry: SessionEntry; @@ -134,193 +62,6 @@ export type SessionInitResult = { triggerBodyNormalized: string; }; -/** - * Default max parent token count beyond which thread/session parent forking is skipped. - * This prevents new thread sessions from inheriting near-full parent context. - * See #26905. - */ -const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000; - -type LegacyMainDeliveryRetirement = { - key: string; - entry: SessionEntry; -}; - -type SessionHookContext = { - sessionId: string; - sessionKey: string; - agentId: string; -}; - -function buildSessionHookContext(params: { - sessionId: string; - sessionKey: string; - cfg: OpenClawConfig; -}): SessionHookContext { - return { - sessionId: params.sessionId, - sessionKey: params.sessionKey, - agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }), - }; -} - -function buildSessionStartHookPayload(params: { - sessionId: string; - sessionKey: string; - cfg: OpenClawConfig; - resumedFrom?: string; -}): { - event: { sessionId: string; sessionKey: string; resumedFrom?: string }; - context: SessionHookContext; -} { - return { - event: { - sessionId: params.sessionId, - sessionKey: params.sessionKey, - resumedFrom: params.resumedFrom, - }, - context: buildSessionHookContext({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - cfg: params.cfg, - }), - }; -} - -function buildSessionEndHookPayload(params: { - sessionId: string; - sessionKey: string; - cfg: OpenClawConfig; - messageCount?: number; -}): { - event: { sessionId: string; sessionKey: string; messageCount: number }; - context: SessionHookContext; -} { - return { - event: { - sessionId: params.sessionId, - sessionKey: params.sessionKey, - messageCount: params.messageCount ?? 0, - }, - context: buildSessionHookContext({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - cfg: params.cfg, - }), - }; -} - -function resolveParentForkMaxTokens(cfg: OpenClawConfig): number { - const configured = cfg.session?.parentForkMaxTokens; - if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) { - return Math.floor(configured); - } - return DEFAULT_PARENT_FORK_MAX_TOKENS; -} - -function maybeRetireLegacyMainDeliveryRoute(params: { - sessionCfg: OpenClawConfig["session"] | undefined; - sessionKey: string; - sessionStore: Record; - agentId: string; - mainKey: string; - isGroup: boolean; - ctx: MsgContext; -}): LegacyMainDeliveryRetirement | undefined { - const dmScope = params.sessionCfg?.dmScope ?? "main"; - if (dmScope === "main" || params.isGroup) { - return undefined; - } - const canonicalMainSessionKey = buildAgentMainSessionKey({ - agentId: params.agentId, - mainKey: params.mainKey, - }).toLowerCase(); - if (params.sessionKey === canonicalMainSessionKey) { - return undefined; - } - const legacyMain = params.sessionStore[canonicalMainSessionKey]; - if (!legacyMain) { - return undefined; - } - const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain)); - if (!legacyRouteKey) { - return undefined; - } - const activeDirectRouteKey = deliveryContextKey( - normalizeDeliveryContext({ - channel: params.ctx.OriginatingChannel as string | undefined, - to: params.ctx.OriginatingTo || params.ctx.To, - accountId: params.ctx.AccountId, - threadId: params.ctx.MessageThreadId, - }), - ); - if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) { - return undefined; - } - if ( - legacyMain.deliveryContext === undefined && - legacyMain.lastChannel === undefined && - legacyMain.lastTo === undefined && - legacyMain.lastAccountId === undefined && - legacyMain.lastThreadId === undefined - ) { - return undefined; - } - return { - key: canonicalMainSessionKey, - entry: { - ...legacyMain, - deliveryContext: undefined, - lastChannel: undefined, - lastTo: undefined, - lastAccountId: undefined, - lastThreadId: undefined, - }, - }; -} - -function forkSessionFromParent(params: { - parentEntry: SessionEntry; - agentId: string; - sessionsDir: string; -}): { sessionId: string; sessionFile: string } | null { - const parentSessionFile = resolveSessionFilePath( - params.parentEntry.sessionId, - params.parentEntry, - { agentId: params.agentId, sessionsDir: params.sessionsDir }, - ); - if (!parentSessionFile || !fs.existsSync(parentSessionFile)) { - return null; - } - try { - const manager = SessionManager.open(parentSessionFile); - const leafId = manager.getLeafId(); - if (leafId) { - const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile(); - const sessionId = manager.getSessionId(); - if (sessionFile && sessionId) { - return { sessionId, sessionFile }; - } - } - const sessionId = crypto.randomUUID(); - const timestamp = new Date().toISOString(); - const fileTimestamp = timestamp.replace(/[:.]/g, "-"); - const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`); - const header = { - type: "session", - version: CURRENT_SESSION_VERSION, - id: sessionId, - timestamp, - cwd: manager.getCwd(), - parentSession: parentSessionFile, - }; - fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8"); - return { sessionId, sessionFile }; - } catch { - return null; - } -} - export async function initSessionState(params: { ctx: MsgContext; cfg: OpenClawConfig;