refactor: split compaction hooks

This commit is contained in:
Peter Steinberger
2026-03-27 01:36:13 +00:00
parent f862685ed8
commit 3fdd7c9e00
2 changed files with 318 additions and 304 deletions

View File

@@ -15,7 +15,6 @@ import {
ensureContextEnginesInitialized,
resolveContextEngine,
} from "../../context-engine/index.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getMachineDisplayName } from "../../infra/machine-name.js";
import { generateSecureToken } from "../../infra/secure-random.js";
import { resolveSignalReactionLevel } from "../../plugin-sdk/signal.js";
@@ -24,17 +23,15 @@ import {
resolveTelegramReactionLevel,
} from "../../plugin-sdk/telegram.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { getActiveMemorySearchManager } from "../../plugins/memory-runtime.js";
import { prepareProviderRuntimeAuth } from "../../plugins/provider-runtime.js";
import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js";
import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
import { resolveUserPath } from "../../utils.js";
import { normalizeMessageChannel } from "../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../utils/provider-utils.js";
import { resolveOpenClawAgentDir } from "../agent-paths.js";
import { resolveSessionAgentId, resolveSessionAgentIds } from "../agent-scope.js";
import { resolveSessionAgentIds } from "../agent-scope.js";
import type { ExecElevatedDefaults } from "../bash-tools.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js";
import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js";
@@ -47,7 +44,6 @@ import { ensureCustomApiRegistered } from "../custom-api-registry.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
import { resolveOpenClawDocsPath } from "../docs-path.js";
import { resolveMemorySearchConfig } from "../memory-search.js";
import {
applyLocalNoAuthHeaderOverride,
getApiKeyForModel,
@@ -88,6 +84,14 @@ import {
} from "../skills.js";
import { resolveTranscriptPolicy } from "../transcript-policy.js";
import { classifyCompactionReason, resolveCompactionFailureReason } from "./compact-reasons.js";
import {
asCompactionHookRunner,
buildBeforeCompactionHookMetrics,
estimateTokensAfterCompaction,
runAfterCompactionHooks,
runBeforeCompactionHooks,
runPostCompactionSideEffects,
} from "./compaction-hooks.js";
import {
compactWithSafetyTimeout,
resolveCompactionTimeoutMs,
@@ -258,311 +262,12 @@ function summarizeCompactionMessages(messages: AgentMessage[]): CompactionMessag
};
}
function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" {
const mode = config?.agents?.defaults?.compaction?.postIndexSync;
if (mode === "off" || mode === "async" || mode === "await") {
return mode;
}
return "async";
}
async function runPostCompactionSessionMemorySync(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
if (!params.config) {
return;
}
try {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
const agentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.config,
});
const resolvedMemory = resolveMemorySearchConfig(params.config, agentId);
if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) {
return;
}
if (!resolvedMemory.sync.sessions.postCompactionForce) {
return;
}
const { manager } = await getActiveMemorySearchManager({
cfg: params.config,
agentId,
});
if (!manager?.sync) {
return;
}
const syncTask = manager.sync({
reason: "post-compaction",
sessionFiles: [sessionFile],
});
await syncTask;
} catch (err) {
log.warn(`memory sync skipped (post-compaction): ${String(err)}`);
}
}
function syncPostCompactionSessionMemory(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
mode: "off" | "async" | "await";
}): Promise<void> {
if (params.mode === "off" || !params.config) {
return Promise.resolve();
}
const syncTask = runPostCompactionSessionMemorySync({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
if (params.mode === "await") {
return syncTask;
}
void syncTask;
return Promise.resolve();
}
export async function runPostCompactionSideEffects(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
emitSessionTranscriptUpdate(sessionFile);
await syncPostCompactionSessionMemory({
config: params.config,
sessionKey: params.sessionKey,
sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
}
type CompactionHookRunner = {
hasHooks?: (hookName?: string) => boolean;
runBeforeCompaction?: (
metrics: { messageCount: number; tokenCount?: number; sessionFile?: string },
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
runAfterCompaction?: (
metrics: {
messageCount: number;
tokenCount?: number;
compactedCount: number;
sessionFile: string;
},
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
};
function asCompactionHookRunner(
hookRunner: ReturnType<typeof getGlobalHookRunner> | null | undefined,
): CompactionHookRunner | null {
if (!hookRunner) {
return null;
}
return {
hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false,
runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner),
runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner),
};
}
function estimateTokenCountSafe(
messages: AgentMessage[],
estimateTokensFn: (message: AgentMessage) => number,
): number | undefined {
try {
let total = 0;
for (const message of messages) {
total += estimateTokensFn(message);
}
return total;
} catch {
return undefined;
}
}
function buildBeforeCompactionHookMetrics(params: {
originalMessages: AgentMessage[];
currentMessages: AgentMessage[];
observedTokenCount?: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
return {
messageCountOriginal: params.originalMessages.length,
tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn),
messageCountBefore: params.currentMessages.length,
tokenCountBefore:
params.observedTokenCount ??
estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn),
};
}
async function runBeforeCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionKey?: string;
sessionAgentId: string;
workspaceDir: string;
messageProvider?: string;
metrics: ReturnType<typeof buildBeforeCompactionHookMetrics>;
}) {
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
try {
const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey,
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
messageCountOriginal: params.metrics.messageCountOriginal,
tokenCountOriginal: params.metrics.tokenCountOriginal,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:before hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("before_compaction")) {
try {
await params.hookRunner.runBeforeCompaction?.(
{
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
return {
hookSessionKey,
missingSessionKey,
};
}
function containsRealConversationMessages(messages: AgentMessage[]): boolean {
return messages.some((message, index, allMessages) =>
hasRealConversationContent(message, allMessages, index),
);
}
function estimateTokensAfterCompaction(params: {
messagesAfter: AgentMessage[];
observedTokenCount?: number;
fullSessionTokensBefore: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn);
if (tokensAfter === undefined) {
return undefined;
}
const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore;
if (
sanityCheckBaseline > 0 &&
tokensAfter >
(params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1)
) {
return undefined;
}
return tokensAfter;
}
async function runAfterCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionAgentId: string;
hookSessionKey: string;
missingSessionKey: boolean;
workspaceDir: string;
messageProvider?: string;
messageCountAfter: number;
tokensAfter?: number;
compactedCount: number;
sessionFile: string;
summaryLength?: number;
tokensBefore?: number;
firstKeptEntryId?: string;
}) {
try {
const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey: params.missingSessionKey,
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
summaryLength: params.summaryLength,
tokensBefore: params.tokensBefore,
tokensAfter: params.tokensAfter,
firstKeptEntryId: params.firstKeptEntryId,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:after hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("after_compaction")) {
try {
await params.hookRunner.runAfterCompaction?.(
{
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
sessionFile: params.sessionFile,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: params.hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
}
/**
* Core compaction logic without lane queueing.
* Use this when already inside a session/global lane to avoid deadlocks.
@@ -1399,3 +1104,5 @@ export const __testing = {
runAfterCompactionHooks,
runPostCompactionSideEffects,
} as const;
export { runPostCompactionSideEffects } from "./compaction-hooks.js";

View File

@@ -0,0 +1,307 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { OpenClawConfig } from "../../config/config.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import { getActiveMemorySearchManager } from "../../plugins/memory-runtime.js";
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
import { resolveSessionAgentId } from "../agent-scope.js";
import { resolveMemorySearchConfig } from "../memory-search.js";
import { log } from "./logger.js";
function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" {
const mode = config?.agents?.defaults?.compaction?.postIndexSync;
if (mode === "off" || mode === "async" || mode === "await") {
return mode;
}
return "async";
}
async function runPostCompactionSessionMemorySync(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
if (!params.config) {
return;
}
try {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
const agentId = resolveSessionAgentId({
sessionKey: params.sessionKey,
config: params.config,
});
const resolvedMemory = resolveMemorySearchConfig(params.config, agentId);
if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) {
return;
}
if (!resolvedMemory.sync.sessions.postCompactionForce) {
return;
}
const { manager } = await getActiveMemorySearchManager({
cfg: params.config,
agentId,
});
if (!manager?.sync) {
return;
}
await manager.sync({
reason: "post-compaction",
sessionFiles: [sessionFile],
});
} catch (err) {
log.warn(`memory sync skipped (post-compaction): ${String(err)}`);
}
}
function syncPostCompactionSessionMemory(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
mode: "off" | "async" | "await";
}): Promise<void> {
if (params.mode === "off" || !params.config) {
return Promise.resolve();
}
const syncTask = runPostCompactionSessionMemorySync({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
if (params.mode === "await") {
return syncTask;
}
void syncTask;
return Promise.resolve();
}
export async function runPostCompactionSideEffects(params: {
config?: OpenClawConfig;
sessionKey?: string;
sessionFile: string;
}): Promise<void> {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return;
}
emitSessionTranscriptUpdate(sessionFile);
await syncPostCompactionSessionMemory({
config: params.config,
sessionKey: params.sessionKey,
sessionFile,
mode: resolvePostCompactionIndexSyncMode(params.config),
});
}
export type CompactionHookRunner = {
hasHooks?: (hookName?: string) => boolean;
runBeforeCompaction?: (
metrics: { messageCount: number; tokenCount?: number; sessionFile?: string },
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
runAfterCompaction?: (
metrics: {
messageCount: number;
tokenCount?: number;
compactedCount: number;
sessionFile: string;
},
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
};
export function asCompactionHookRunner(
hookRunner: ReturnType<typeof getGlobalHookRunner> | null | undefined,
): CompactionHookRunner | null {
if (!hookRunner) {
return null;
}
return {
hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false,
runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner),
runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner),
};
}
function estimateTokenCountSafe(
messages: AgentMessage[],
estimateTokensFn: (message: AgentMessage) => number,
): number | undefined {
try {
let total = 0;
for (const message of messages) {
total += estimateTokensFn(message);
}
return total;
} catch {
return undefined;
}
}
export function buildBeforeCompactionHookMetrics(params: {
originalMessages: AgentMessage[];
currentMessages: AgentMessage[];
observedTokenCount?: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
return {
messageCountOriginal: params.originalMessages.length,
tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn),
messageCountBefore: params.currentMessages.length,
tokenCountBefore:
params.observedTokenCount ??
estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn),
};
}
export async function runBeforeCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionKey?: string;
sessionAgentId: string;
workspaceDir: string;
messageProvider?: string;
metrics: ReturnType<typeof buildBeforeCompactionHookMetrics>;
}) {
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
try {
const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey,
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
messageCountOriginal: params.metrics.messageCountOriginal,
tokenCountOriginal: params.metrics.tokenCountOriginal,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:before hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("before_compaction")) {
try {
await params.hookRunner.runBeforeCompaction?.(
{
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
return {
hookSessionKey,
missingSessionKey,
};
}
export function estimateTokensAfterCompaction(params: {
messagesAfter: AgentMessage[];
observedTokenCount?: number;
fullSessionTokensBefore: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn);
if (tokensAfter === undefined) {
return undefined;
}
const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore;
if (
sanityCheckBaseline > 0 &&
tokensAfter >
(params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1)
) {
return undefined;
}
return tokensAfter;
}
export async function runAfterCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionAgentId: string;
hookSessionKey: string;
missingSessionKey: boolean;
workspaceDir: string;
messageProvider?: string;
messageCountAfter: number;
tokensAfter?: number;
compactedCount: number;
sessionFile: string;
summaryLength?: number;
tokensBefore?: number;
firstKeptEntryId?: string;
}) {
try {
const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey: params.missingSessionKey,
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
summaryLength: params.summaryLength,
tokensBefore: params.tokensBefore,
tokensAfter: params.tokensAfter,
firstKeptEntryId: params.firstKeptEntryId,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:after hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("after_compaction")) {
try {
await params.hookRunner.runAfterCompaction?.(
{
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
sessionFile: params.sessionFile,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: params.hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
}