refactor: extract session init helpers

This commit is contained in:
Peter Steinberger
2026-03-03 02:36:57 +00:00
parent 9702d94196
commit 8ee357fc76
4 changed files with 284 additions and 269 deletions

View File

@@ -0,0 +1,145 @@
import type { SessionEntry } from "../../config/sessions.js";
import { buildAgentMainSessionKey } from "../../routing/session-key.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import {
deliveryContextFromSession,
deliveryContextKey,
normalizeDeliveryContext,
} from "../../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
normalizeMessageChannel,
} from "../../utils/message-channel.js";
import type { MsgContext } from "../templating.js";
export type LegacyMainDeliveryRetirement = {
key: string;
entry: SessionEntry;
};
function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed?.rest) {
return undefined;
}
const head = parsed.rest.split(":")[0]?.trim().toLowerCase();
if (!head || head === "main" || head === "cron" || head === "subagent" || head === "acp") {
return undefined;
}
return normalizeMessageChannel(head);
}
function isExternalRoutingChannel(channel?: string): channel is string {
return Boolean(
channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel),
);
}
export function resolveLastChannelRaw(params: {
originatingChannelRaw?: string;
persistedLastChannel?: string;
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
let resolved = params.originatingChannelRaw || params.persistedLastChannel;
// Internal/non-deliverable sources should not overwrite previously known
// external delivery routes (or explicit channel hints from the session key).
if (!isExternalRoutingChannel(originatingChannel)) {
if (isExternalRoutingChannel(persistedChannel)) {
resolved = persistedChannel;
} else if (isExternalRoutingChannel(sessionKeyChannelHint)) {
resolved = sessionKeyChannelHint;
}
}
return resolved;
}
export function resolveLastToRaw(params: {
originatingChannelRaw?: string;
originatingToRaw?: string;
toRaw?: string;
persistedLastTo?: string;
persistedLastChannel?: string;
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
// When the turn originates from an internal/non-deliverable source, do not
// replace an established external destination with internal routing ids
// (e.g., session/webchat ids).
if (!isExternalRoutingChannel(originatingChannel)) {
const hasExternalFallback =
isExternalRoutingChannel(persistedChannel) || isExternalRoutingChannel(sessionKeyChannelHint);
if (hasExternalFallback && params.persistedLastTo) {
return params.persistedLastTo;
}
}
return params.originatingToRaw || params.toRaw || params.persistedLastTo;
}
export function maybeRetireLegacyMainDeliveryRoute(params: {
sessionCfg: { dmScope?: string } | undefined;
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
agentId: string;
mainKey: string;
isGroup: boolean;
ctx: MsgContext;
}): LegacyMainDeliveryRetirement | undefined {
const dmScope = params.sessionCfg?.dmScope ?? "main";
if (dmScope === "main" || params.isGroup) {
return undefined;
}
const canonicalMainSessionKey = buildAgentMainSessionKey({
agentId: params.agentId,
mainKey: params.mainKey,
}).toLowerCase();
if (params.sessionKey === canonicalMainSessionKey) {
return undefined;
}
const legacyMain = params.sessionStore[canonicalMainSessionKey];
if (!legacyMain) {
return undefined;
}
const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain));
if (!legacyRouteKey) {
return undefined;
}
const activeDirectRouteKey = deliveryContextKey(
normalizeDeliveryContext({
channel: params.ctx.OriginatingChannel as string | undefined,
to: params.ctx.OriginatingTo || params.ctx.To,
accountId: params.ctx.AccountId,
threadId: params.ctx.MessageThreadId,
}),
);
if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) {
return undefined;
}
if (
legacyMain.deliveryContext === undefined &&
legacyMain.lastChannel === undefined &&
legacyMain.lastTo === undefined &&
legacyMain.lastAccountId === undefined &&
legacyMain.lastThreadId === undefined
) {
return undefined;
}
return {
key: canonicalMainSessionKey,
entry: {
...legacyMain,
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
},
};
}

View File

@@ -0,0 +1,63 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveSessionFilePath, type SessionEntry } from "../../config/sessions.js";
/**
* Default max parent token count beyond which thread/session parent forking is skipped.
* This prevents new thread sessions from inheriting near-full parent context.
* See #26905.
*/
const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000;
export function resolveParentForkMaxTokens(cfg: OpenClawConfig): number {
const configured = cfg.session?.parentForkMaxTokens;
if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) {
return Math.floor(configured);
}
return DEFAULT_PARENT_FORK_MAX_TOKENS;
}
export function forkSessionFromParent(params: {
parentEntry: SessionEntry;
agentId: string;
sessionsDir: string;
}): { sessionId: string; sessionFile: string } | null {
const parentSessionFile = resolveSessionFilePath(
params.parentEntry.sessionId,
params.parentEntry,
{ agentId: params.agentId, sessionsDir: params.sessionsDir },
);
if (!parentSessionFile || !fs.existsSync(parentSessionFile)) {
return null;
}
try {
const manager = SessionManager.open(parentSessionFile);
const leafId = manager.getLeafId();
if (leafId) {
const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile();
const sessionId = manager.getSessionId();
if (sessionFile && sessionId) {
return { sessionId, sessionFile };
}
}
const sessionId = crypto.randomUUID();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: manager.getCwd(),
parentSession: parentSessionFile,
};
fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8");
return { sessionId, sessionFile };
} catch {
return null;
}
}

View File

@@ -0,0 +1,66 @@
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import type { OpenClawConfig } from "../../config/config.js";
export type SessionHookContext = {
sessionId: string;
sessionKey: string;
agentId: string;
};
function buildSessionHookContext(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
}): SessionHookContext {
return {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }),
};
}
export function buildSessionStartHookPayload(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
resumedFrom?: string;
}): {
event: { sessionId: string; sessionKey: string; resumedFrom?: string };
context: SessionHookContext;
} {
return {
event: {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
resumedFrom: params.resumedFrom,
},
context: buildSessionHookContext({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cfg: params.cfg,
}),
};
}
export function buildSessionEndHookPayload(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
messageCount?: number;
}): {
event: { sessionId: string; sessionKey: string; messageCount: number };
context: SessionHookContext;
} {
return {
event: {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
messageCount: params.messageCount ?? 0,
},
context: buildSessionHookContext({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cfg: params.cfg,
}),
};
}

View File

@@ -1,7 +1,5 @@
import crypto from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { normalizeChatType } from "../../channels/chat-type.js";
import type { OpenClawConfig } from "../../config/config.js";
@@ -17,7 +15,6 @@ import {
resolveSessionResetPolicy,
resolveSessionResetType,
resolveGroupSessionKey,
resolveSessionFilePath,
resolveSessionKey,
resolveSessionTranscriptPath,
resolveStorePath,
@@ -30,91 +27,22 @@ import { archiveSessionTranscripts } from "../../gateway/session-utils.fs.js";
import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { buildAgentMainSessionKey, normalizeMainKey } from "../../routing/session-key.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import {
deliveryContextFromSession,
deliveryContextKey,
normalizeDeliveryContext,
normalizeSessionDeliveryFields,
} from "../../utils/delivery-context.js";
import {
INTERNAL_MESSAGE_CHANNEL,
isDeliverableMessageChannel,
normalizeMessageChannel,
} from "../../utils/message-channel.js";
import { normalizeMainKey } from "../../routing/session-key.js";
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
import { resolveCommandAuthorization } from "../command-auth.js";
import type { MsgContext, TemplateContext } from "../templating.js";
import type { TemplateContext } from "../templating.js";
import { normalizeInboundTextNewlines } from "./inbound-text.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import {
maybeRetireLegacyMainDeliveryRoute,
resolveLastChannelRaw,
resolveLastToRaw,
} from "./session-delivery.js";
import { forkSessionFromParent, resolveParentForkMaxTokens } from "./session-fork.js";
import { buildSessionEndHookPayload, buildSessionStartHookPayload } from "./session-hooks.js";
const log = createSubsystemLogger("session-init");
function resolveSessionKeyChannelHint(sessionKey?: string): string | undefined {
const parsed = parseAgentSessionKey(sessionKey);
if (!parsed?.rest) {
return undefined;
}
const head = parsed.rest.split(":")[0]?.trim().toLowerCase();
if (!head || head === "main" || head === "cron" || head === "subagent" || head === "acp") {
return undefined;
}
return normalizeMessageChannel(head);
}
function isExternalRoutingChannel(channel?: string): channel is string {
return Boolean(
channel && channel !== INTERNAL_MESSAGE_CHANNEL && isDeliverableMessageChannel(channel),
);
}
function resolveLastChannelRaw(params: {
originatingChannelRaw?: string;
persistedLastChannel?: string;
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
let resolved = params.originatingChannelRaw || params.persistedLastChannel;
// Internal/non-deliverable sources should not overwrite previously known
// external delivery routes (or explicit channel hints from the session key).
if (!isExternalRoutingChannel(originatingChannel)) {
if (isExternalRoutingChannel(persistedChannel)) {
resolved = persistedChannel;
} else if (isExternalRoutingChannel(sessionKeyChannelHint)) {
resolved = sessionKeyChannelHint;
}
}
return resolved;
}
function resolveLastToRaw(params: {
originatingChannelRaw?: string;
originatingToRaw?: string;
toRaw?: string;
persistedLastTo?: string;
persistedLastChannel?: string;
sessionKey?: string;
}): string | undefined {
const originatingChannel = normalizeMessageChannel(params.originatingChannelRaw);
const persistedChannel = normalizeMessageChannel(params.persistedLastChannel);
const sessionKeyChannelHint = resolveSessionKeyChannelHint(params.sessionKey);
// When the turn originates from an internal/non-deliverable source, do not
// replace an established external destination with internal routing ids
// (e.g., session/webchat ids).
if (!isExternalRoutingChannel(originatingChannel)) {
const hasExternalFallback =
isExternalRoutingChannel(persistedChannel) || isExternalRoutingChannel(sessionKeyChannelHint);
if (hasExternalFallback && params.persistedLastTo) {
return params.persistedLastTo;
}
}
return params.originatingToRaw || params.toRaw || params.persistedLastTo;
}
export type SessionInitResult = {
sessionCtx: TemplateContext;
sessionEntry: SessionEntry;
@@ -134,193 +62,6 @@ export type SessionInitResult = {
triggerBodyNormalized: string;
};
/**
* Default max parent token count beyond which thread/session parent forking is skipped.
* This prevents new thread sessions from inheriting near-full parent context.
* See #26905.
*/
const DEFAULT_PARENT_FORK_MAX_TOKENS = 100_000;
type LegacyMainDeliveryRetirement = {
key: string;
entry: SessionEntry;
};
type SessionHookContext = {
sessionId: string;
sessionKey: string;
agentId: string;
};
function buildSessionHookContext(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
}): SessionHookContext {
return {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
agentId: resolveSessionAgentId({ sessionKey: params.sessionKey, config: params.cfg }),
};
}
function buildSessionStartHookPayload(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
resumedFrom?: string;
}): {
event: { sessionId: string; sessionKey: string; resumedFrom?: string };
context: SessionHookContext;
} {
return {
event: {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
resumedFrom: params.resumedFrom,
},
context: buildSessionHookContext({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cfg: params.cfg,
}),
};
}
function buildSessionEndHookPayload(params: {
sessionId: string;
sessionKey: string;
cfg: OpenClawConfig;
messageCount?: number;
}): {
event: { sessionId: string; sessionKey: string; messageCount: number };
context: SessionHookContext;
} {
return {
event: {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
messageCount: params.messageCount ?? 0,
},
context: buildSessionHookContext({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cfg: params.cfg,
}),
};
}
function resolveParentForkMaxTokens(cfg: OpenClawConfig): number {
const configured = cfg.session?.parentForkMaxTokens;
if (typeof configured === "number" && Number.isFinite(configured) && configured >= 0) {
return Math.floor(configured);
}
return DEFAULT_PARENT_FORK_MAX_TOKENS;
}
function maybeRetireLegacyMainDeliveryRoute(params: {
sessionCfg: OpenClawConfig["session"] | undefined;
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
agentId: string;
mainKey: string;
isGroup: boolean;
ctx: MsgContext;
}): LegacyMainDeliveryRetirement | undefined {
const dmScope = params.sessionCfg?.dmScope ?? "main";
if (dmScope === "main" || params.isGroup) {
return undefined;
}
const canonicalMainSessionKey = buildAgentMainSessionKey({
agentId: params.agentId,
mainKey: params.mainKey,
}).toLowerCase();
if (params.sessionKey === canonicalMainSessionKey) {
return undefined;
}
const legacyMain = params.sessionStore[canonicalMainSessionKey];
if (!legacyMain) {
return undefined;
}
const legacyRouteKey = deliveryContextKey(deliveryContextFromSession(legacyMain));
if (!legacyRouteKey) {
return undefined;
}
const activeDirectRouteKey = deliveryContextKey(
normalizeDeliveryContext({
channel: params.ctx.OriginatingChannel as string | undefined,
to: params.ctx.OriginatingTo || params.ctx.To,
accountId: params.ctx.AccountId,
threadId: params.ctx.MessageThreadId,
}),
);
if (!activeDirectRouteKey || activeDirectRouteKey !== legacyRouteKey) {
return undefined;
}
if (
legacyMain.deliveryContext === undefined &&
legacyMain.lastChannel === undefined &&
legacyMain.lastTo === undefined &&
legacyMain.lastAccountId === undefined &&
legacyMain.lastThreadId === undefined
) {
return undefined;
}
return {
key: canonicalMainSessionKey,
entry: {
...legacyMain,
deliveryContext: undefined,
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
},
};
}
function forkSessionFromParent(params: {
parentEntry: SessionEntry;
agentId: string;
sessionsDir: string;
}): { sessionId: string; sessionFile: string } | null {
const parentSessionFile = resolveSessionFilePath(
params.parentEntry.sessionId,
params.parentEntry,
{ agentId: params.agentId, sessionsDir: params.sessionsDir },
);
if (!parentSessionFile || !fs.existsSync(parentSessionFile)) {
return null;
}
try {
const manager = SessionManager.open(parentSessionFile);
const leafId = manager.getLeafId();
if (leafId) {
const sessionFile = manager.createBranchedSession(leafId) ?? manager.getSessionFile();
const sessionId = manager.getSessionId();
if (sessionFile && sessionId) {
return { sessionId, sessionFile };
}
}
const sessionId = crypto.randomUUID();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(manager.getSessionDir(), `${fileTimestamp}_${sessionId}.jsonl`);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: manager.getCwd(),
parentSession: parentSessionFile,
};
fs.writeFileSync(sessionFile, `${JSON.stringify(header)}\n`, "utf-8");
return { sessionId, sessionFile };
} catch {
return null;
}
}
export async function initSessionState(params: {
ctx: MsgContext;
cfg: OpenClawConfig;