perf(inbound): trim reply-run startup imports (#52332)

* perf(inbound): trim reply-run startup imports

* style(reply): format body runtime import

* test(reply): restore runtime seam mocks
This commit is contained in:
Vincent Koc
2026-03-22 09:05:14 -07:00
committed by GitHub
parent 05279539a8
commit 6b7206ed35
9 changed files with 193 additions and 135 deletions

View File

@@ -0,0 +1,6 @@
export {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,
resolveEmbeddedSessionLane,
} from "./pi-embedded.js";

View File

@@ -0,0 +1 @@
export { runReplyAgent } from "./agent-runner.js";

View File

@@ -1,7 +1,15 @@
import type { SessionEntry } from "../../config/sessions.js";
import { updateSessionStore } from "../../config/sessions.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import { setAbortMemory } from "./abort-primitives.js";
let sessionStoreRuntimePromise: Promise<
typeof import("../../config/sessions/store.runtime.js")
> | null = null;
function loadSessionStoreRuntime() {
sessionStoreRuntimePromise ??= import("../../config/sessions/store.runtime.js");
return sessionStoreRuntimePromise;
}
export async function applySessionHints(params: {
baseBody: string;
abortedLastRun: boolean;
@@ -23,6 +31,7 @@ export async function applySessionHints(params: {
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
const sessionKey = params.sessionKey;
const { updateSessionStore } = await loadSessionStoreRuntime();
await updateSessionStore(params.storePath, (store) => {
const entry = store[sessionKey] ?? params.sessionEntry;
if (!entry) {

View File

@@ -5,17 +5,23 @@ vi.mock("../../agents/auth-profiles/session-override.js", () => ({
resolveSessionAuthProfileOverride: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../../agents/pi-embedded.js", () => ({
vi.mock("../../agents/pi-embedded.runtime.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: vi.fn().mockReturnValue("session:session-key"),
}));
vi.mock("../../config/sessions.js", () => ({
vi.mock("../../config/sessions/group.js", () => ({
resolveGroupSessionKey: vi.fn().mockReturnValue(undefined),
}));
vi.mock("../../config/sessions/paths.js", () => ({
resolveSessionFilePath: vi.fn().mockReturnValue("/tmp/session.jsonl"),
resolveSessionFilePathOptions: vi.fn().mockReturnValue({}),
}));
vi.mock("../../config/sessions/store.js", () => ({
updateSessionStore: vi.fn(),
}));
@@ -30,6 +36,7 @@ vi.mock("../../process/command-queue.js", () => ({
vi.mock("../../routing/session-key.js", () => ({
normalizeMainKey: vi.fn().mockReturnValue("main"),
normalizeAgentId: vi.fn((id?: string) => id ?? "default"),
}));
vi.mock("../../utils/provider-utils.js", () => ({
@@ -40,7 +47,7 @@ vi.mock("../command-detection.js", () => ({
hasControlCommand: vi.fn().mockReturnValue(false),
}));
vi.mock("./agent-runner.js", () => ({
vi.mock("./agent-runner.runtime.js", () => ({
runReplyAgent: vi.fn().mockResolvedValue({ text: "ok" }),
}));
@@ -58,20 +65,23 @@ vi.mock("./inbound-meta.js", () => ({
buildInboundUserContextPrefix: vi.fn().mockReturnValue(""),
}));
vi.mock("./queue.js", () => ({
vi.mock("./queue/settings.js", () => ({
resolveQueueSettings: vi.fn().mockReturnValue({ mode: "followup" }),
}));
vi.mock("./route-reply.js", () => ({
vi.mock("./route-reply.runtime.js", () => ({
routeReply: vi.fn(),
}));
vi.mock("./session-updates.js", () => ({
vi.mock("./session-updates.runtime.js", () => ({
ensureSkillSnapshot: vi.fn().mockImplementation(async ({ sessionEntry, systemSent }) => ({
sessionEntry,
systemSent,
skillsSnapshot: undefined,
})),
}));
vi.mock("./session-system-events.js", () => ({
drainFormattedSystemEvents: vi.fn().mockResolvedValue(undefined),
}));
@@ -79,9 +89,9 @@ vi.mock("./typing-mode.js", () => ({
resolveTypingMode: vi.fn().mockReturnValue("off"),
}));
import { runReplyAgent } from "./agent-runner.js";
import { routeReply } from "./route-reply.js";
import { drainFormattedSystemEvents } from "./session-updates.js";
import { runReplyAgent } from "./agent-runner.runtime.js";
import { routeReply } from "./route-reply.runtime.js";
import { drainFormattedSystemEvents } from "./session-system-events.js";
import { resolveTypingMode } from "./typing-mode.js";
function baseParams(

View File

@@ -2,12 +2,6 @@ import crypto from "node:crypto";
import { resolveSessionAuthProfileOverride } from "../../agents/auth-profiles/session-override.js";
import type { ExecToolDefaults } from "../../agents/bash-tools.js";
import { resolveFastModeState } from "../../agents/fast-mode.js";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,
resolveEmbeddedSessionLane,
} from "../../agents/pi-embedded.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveGroupSessionKey } from "../../config/sessions/group.js";
import {
@@ -35,7 +29,6 @@ import {
} from "../thinking.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { runReplyAgent } from "./agent-runner.js";
import { applySessionHints } from "./body.js";
import type { buildCommandContext } from "./commands.js";
import type { InlineDirectives } from "./directive-handling.js";
@@ -43,10 +36,10 @@ import { buildGroupChatContext, buildGroupIntro } from "./groups.js";
import { buildInboundMetaSystemPrompt, buildInboundUserContextPrefix } from "./inbound-meta.js";
import type { createModelSelectionState } from "./model-selection.js";
import { resolveOriginMessageProvider } from "./origin-routing.js";
import { resolveQueueSettings } from "./queue.js";
import { routeReply } from "./route-reply.js";
import { resolveQueueSettings } from "./queue/settings.js";
import type { RouteReplyParams } from "./route-reply.js";
import { buildBareSessionResetPrompt } from "./session-reset-prompt.js";
import { drainFormattedSystemEvents, ensureSkillSnapshot } from "./session-updates.js";
import { drainFormattedSystemEvents } from "./session-system-events.js";
import { resolveTypingMode } from "./typing-mode.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";
import type { TypingController } from "./typing.js";
@@ -55,6 +48,33 @@ import { appendUntrustedContext } from "./untrusted-context.js";
type AgentDefaults = NonNullable<OpenClawConfig["agents"]>["defaults"];
type ExecOverrides = Pick<ExecToolDefaults, "host" | "security" | "ask" | "node">;
let piEmbeddedRuntimePromise: Promise<typeof import("../../agents/pi-embedded.runtime.js")> | null =
null;
let agentRunnerRuntimePromise: Promise<typeof import("./agent-runner.runtime.js")> | null = null;
let routeReplyRuntimePromise: Promise<typeof import("./route-reply.runtime.js")> | null = null;
let sessionUpdatesRuntimePromise: Promise<typeof import("./session-updates.runtime.js")> | null =
null;
function loadPiEmbeddedRuntime() {
piEmbeddedRuntimePromise ??= import("../../agents/pi-embedded.runtime.js");
return piEmbeddedRuntimePromise;
}
function loadAgentRunnerRuntime() {
agentRunnerRuntimePromise ??= import("./agent-runner.runtime.js");
return agentRunnerRuntimePromise;
}
function loadRouteReplyRuntime() {
routeReplyRuntimePromise ??= import("./route-reply.runtime.js");
return routeReplyRuntimePromise;
}
function loadSessionUpdatesRuntime() {
sessionUpdatesRuntimePromise ??= import("./session-updates.runtime.js");
return sessionUpdatesRuntimePromise;
}
function buildResetSessionNoticeText(params: {
provider: string;
model: string;
@@ -72,13 +92,13 @@ function resolveResetSessionNoticeRoute(params: {
ctx: MsgContext;
command: ReturnType<typeof buildCommandContext>;
}): {
channel: Parameters<typeof routeReply>[0]["channel"];
channel: RouteReplyParams["channel"];
to: string;
} | null {
const commandChannel = params.command.channel?.trim().toLowerCase();
const fallbackChannel =
commandChannel && commandChannel !== "webchat"
? (commandChannel as Parameters<typeof routeReply>[0]["channel"])
? (commandChannel as RouteReplyParams["channel"])
: undefined;
const channel = params.ctx.OriginatingChannel ?? fallbackChannel;
const to = params.ctx.OriginatingTo ?? params.command.from ?? params.command.to;
@@ -107,6 +127,7 @@ async function sendResetSessionNotice(params: {
if (!route) {
return;
}
const { routeReply } = await loadRouteReplyRuntime();
await routeReply({
payload: {
text: buildResetSessionNoticeText({
@@ -368,6 +389,7 @@ export async function runPreparedReply(
: threadStarterBody
? `[Thread starter - for context]\n${threadStarterBody}`
: undefined;
const { ensureSkillSnapshot } = await loadSessionUpdatesRuntime();
const skillResult = await ensureSkillSnapshot({
sessionEntry,
sessionStore,
@@ -446,6 +468,12 @@ export async function runPreparedReply(
inlineMode: perMessageQueueMode,
inlineOptions: perMessageQueueOptions,
});
const {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,
resolveEmbeddedSessionLane,
} = await loadPiEmbeddedRuntime();
const sessionLaneKey = resolveEmbeddedSessionLane(sessionKey ?? sessionIdFinal);
const laneSize = getQueueSize(sessionLaneKey);
if (resolvedQueue.mode === "interrupt" && laneSize > 0) {
@@ -538,6 +566,7 @@ export async function runPreparedReply(
},
};
const { runReplyAgent } = await loadAgentRunnerRuntime();
return runReplyAgent({
commandBody: prefixedCommandBody,
followupRun,

View File

@@ -0,0 +1 @@
export { routeReply } from "./route-reply.js";

View File

@@ -0,0 +1,112 @@
import { resolveUserTimezone } from "../../agents/date-time.js";
import type { OpenClawConfig } from "../../config/config.js";
import { buildChannelSummary } from "../../infra/channel-summary.js";
import {
formatUtcTimestamp,
formatZonedTimestamp,
resolveTimezone,
} from "../../infra/format-time/format-datetime.ts";
import { drainSystemEventEntries } from "../../infra/system-events.js";
/** Drain queued system events, format as `System:` lines, return the block (or undefined). */
export async function drainFormattedSystemEvents(params: {
cfg: OpenClawConfig;
sessionKey: string;
isMainSession: boolean;
isNewSession: boolean;
}): Promise<string | undefined> {
const compactSystemEvent = (line: string): string | null => {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
const lower = trimmed.toLowerCase();
if (lower.includes("reason periodic")) {
return null;
}
// Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat".
// The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this.
if (lower.startsWith("read heartbeat.md")) {
return null;
}
if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) {
return null;
}
if (trimmed.startsWith("Node:")) {
return trimmed.replace(/ · last input [^·]+/i, "").trim();
}
return trimmed;
};
const resolveSystemEventTimezone = (cfg: OpenClawConfig) => {
const raw = cfg.agents?.defaults?.envelopeTimezone?.trim();
if (!raw) {
return { mode: "local" as const };
}
const lowered = raw.toLowerCase();
if (lowered === "utc" || lowered === "gmt") {
return { mode: "utc" as const };
}
if (lowered === "local" || lowered === "host") {
return { mode: "local" as const };
}
if (lowered === "user") {
return {
mode: "iana" as const,
timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone),
};
}
const explicit = resolveTimezone(raw);
return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const };
};
const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => {
const date = new Date(ts);
if (Number.isNaN(date.getTime())) {
return "unknown-time";
}
const zone = resolveSystemEventTimezone(cfg);
if (zone.mode === "utc") {
return formatUtcTimestamp(date, { displaySeconds: true });
}
if (zone.mode === "local") {
return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time";
}
return (
formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ??
"unknown-time"
);
};
const systemLines: string[] = [];
const queued = drainSystemEventEntries(params.sessionKey);
systemLines.push(
...queued
.map((event) => {
const compacted = compactSystemEvent(event.text);
if (!compacted) {
return null;
}
return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`;
})
.filter((v): v is string => Boolean(v)),
);
if (params.isMainSession && params.isNewSession) {
const summary = await buildChannelSummary(params.cfg);
if (summary.length > 0) {
systemLines.unshift(...summary);
}
}
if (systemLines.length === 0) {
return undefined;
}
// Format events as trusted System: lines for the message timeline.
// Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):",
// so these gateway-originated lines are distinguishable by the model.
// Each sub-line of a multi-line event gets its own System: prefix so continuation
// lines can't be mistaken for user content.
return systemLines
.flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`))
.join("\n");
}

View File

@@ -0,0 +1 @@
export { ensureSkillSnapshot } from "./session-updates.js";

View File

@@ -1,121 +1,10 @@
import crypto from "node:crypto";
import { resolveUserTimezone } from "../../agents/date-time.js";
import { buildWorkspaceSkillSnapshot } from "../../agents/skills.js";
import { ensureSkillsWatcher, getSkillsSnapshotVersion } from "../../agents/skills/refresh.js";
import type { OpenClawConfig } from "../../config/config.js";
import { type SessionEntry, updateSessionStore } from "../../config/sessions.js";
import { buildChannelSummary } from "../../infra/channel-summary.js";
import {
resolveTimezone,
formatUtcTimestamp,
formatZonedTimestamp,
} from "../../infra/format-time/format-datetime.ts";
import { getRemoteSkillEligibility } from "../../infra/skills-remote.js";
import { drainSystemEventEntries } from "../../infra/system-events.js";
/** Drain queued system events, format as `System:` lines, return the block (or undefined). */
export async function drainFormattedSystemEvents(params: {
cfg: OpenClawConfig;
sessionKey: string;
isMainSession: boolean;
isNewSession: boolean;
}): Promise<string | undefined> {
const compactSystemEvent = (line: string): string | null => {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
const lower = trimmed.toLowerCase();
if (lower.includes("reason periodic")) {
return null;
}
// Filter out the actual heartbeat prompt, but not cron jobs that mention "heartbeat"
// The heartbeat prompt starts with "Read HEARTBEAT.md" - cron payloads won't match this
if (lower.startsWith("read heartbeat.md")) {
return null;
}
// Also filter heartbeat poll/wake noise
if (lower.includes("heartbeat poll") || lower.includes("heartbeat wake")) {
return null;
}
if (trimmed.startsWith("Node:")) {
return trimmed.replace(/ · last input [^·]+/i, "").trim();
}
return trimmed;
};
const resolveSystemEventTimezone = (cfg: OpenClawConfig) => {
const raw = cfg.agents?.defaults?.envelopeTimezone?.trim();
if (!raw) {
return { mode: "local" as const };
}
const lowered = raw.toLowerCase();
if (lowered === "utc" || lowered === "gmt") {
return { mode: "utc" as const };
}
if (lowered === "local" || lowered === "host") {
return { mode: "local" as const };
}
if (lowered === "user") {
return {
mode: "iana" as const,
timeZone: resolveUserTimezone(cfg.agents?.defaults?.userTimezone),
};
}
const explicit = resolveTimezone(raw);
return explicit ? { mode: "iana" as const, timeZone: explicit } : { mode: "local" as const };
};
const formatSystemEventTimestamp = (ts: number, cfg: OpenClawConfig) => {
const date = new Date(ts);
if (Number.isNaN(date.getTime())) {
return "unknown-time";
}
const zone = resolveSystemEventTimezone(cfg);
if (zone.mode === "utc") {
return formatUtcTimestamp(date, { displaySeconds: true });
}
if (zone.mode === "local") {
return formatZonedTimestamp(date, { displaySeconds: true }) ?? "unknown-time";
}
return (
formatZonedTimestamp(date, { timeZone: zone.timeZone, displaySeconds: true }) ??
"unknown-time"
);
};
const systemLines: string[] = [];
const queued = drainSystemEventEntries(params.sessionKey);
systemLines.push(
...queued
.map((event) => {
const compacted = compactSystemEvent(event.text);
if (!compacted) {
return null;
}
return `[${formatSystemEventTimestamp(event.ts, params.cfg)}] ${compacted}`;
})
.filter((v): v is string => Boolean(v)),
);
if (params.isMainSession && params.isNewSession) {
const summary = await buildChannelSummary(params.cfg);
if (summary.length > 0) {
systemLines.unshift(...summary);
}
}
if (systemLines.length === 0) {
return undefined;
}
// Format events as trusted System: lines for the message timeline.
// Inbound sanitization rewrites any user-supplied "System:" to "System (untrusted):",
// so these gateway-originated lines are distinguishable by the model.
// Each sub-line of a multi-line event gets its own System: prefix so continuation
// lines can't be mistaken for user content.
return systemLines
.flatMap((line) => line.split("\n").map((subline) => `System: ${subline}`))
.join("\n");
}
export { drainFormattedSystemEvents } from "./session-system-events.js";
async function persistSessionEntryUpdate(params: {
sessionStore?: Record<string, SessionEntry>;