From 71ec42127d8f77c641bcf9cdf296352f4ce63709 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Thu, 5 Mar 2026 22:08:26 -0500 Subject: [PATCH] feat(hooks): emit compaction lifecycle hooks (#16788) --- CHANGELOG.md | 1 + docs/automation/hooks.md | 15 + .../pi-embedded-runner/compact.hooks.test.ts | 357 ++++++++++++++++++ src/agents/pi-embedded-runner/compact.ts | 171 ++++++--- 4 files changed, 494 insertions(+), 50 deletions(-) create mode 100644 src/agents/pi-embedded-runner/compact.hooks.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index f53c2aa5a08..f12365ebaef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai - Plugins/hook policy: add `plugins.entries..hooks.allowPromptInjection`, validate unknown typed hook names at runtime, and preserve legacy `before_agent_start` model/provider overrides while stripping prompt-mutating fields when prompt injection is disabled. (#36567) thanks @gumadeiras. - Tools/Diffs guidance: restore a short system-prompt hint for enabled diffs while keeping the detailed instructions in the companion skill, so diffs usage guidance stays out of user-prompt space. (#36904) thanks @gumadeiras. - Telegram/ACP topic bindings: accept Telegram Mac Unicode dash option prefixes in `/acp spawn`, support Telegram topic thread binding (`--thread here|auto`), route bound-topic follow-ups to ACP sessions, add actionable Telegram approval buttons with prefixed approval-id resolution, and pin successful bind confirmations in-topic. (#36683) Thanks @huntharo. +- Hooks/Compaction lifecycle: emit `session:compact:before` and `session:compact:after` internal events plus plugin compaction callbacks with session/count metadata, so automations can react to compaction runs consistently. (#16788) thanks @vincentkoc. ### Breaking diff --git a/docs/automation/hooks.md b/docs/automation/hooks.md index d34480f1ed3..d89838f6105 100644 --- a/docs/automation/hooks.md +++ b/docs/automation/hooks.md @@ -243,6 +243,14 @@ Triggered when agent commands are issued: - **`command:reset`**: When `/reset` command is issued - **`command:stop`**: When `/stop` command is issued +### Session Events + +- **`session:compact:before`**: Right before compaction summarizes history +- **`session:compact:after`**: After compaction completes with summary metadata + +Internal hook payloads emit these as `type: "session"` with `action: "compact:before"` / `action: "compact:after"`; listeners subscribe with the combined keys above. +Specific handler registration uses the literal key format `${type}:${action}`. For these events, register `session:compact:before` and `session:compact:after`. + ### Agent Events - **`agent:bootstrap`**: Before workspace bootstrap files are injected (hooks may mutate `context.bootstrapFiles`) @@ -351,6 +359,13 @@ These hooks are not event-stream listeners; they let plugins synchronously adjus - **`tool_result_persist`**: transform tool results before they are written to the session transcript. Must be synchronous; return the updated tool result payload or `undefined` to keep it as-is. See [Agent Loop](/concepts/agent-loop). +### Plugin Hook Events + +Compaction lifecycle hooks exposed through the plugin hook runner: + +- **`before_compaction`**: Runs before compaction with count/token metadata +- **`after_compaction`**: Runs after compaction with compaction summary metadata + ### Future Events Planned event types: diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts new file mode 100644 index 00000000000..ce8b9e0f696 --- /dev/null +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -0,0 +1,357 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { hookRunner, triggerInternalHook, sanitizeSessionHistoryMock } = vi.hoisted(() => ({ + hookRunner: { + hasHooks: vi.fn(), + runBeforeCompaction: vi.fn(), + runAfterCompaction: vi.fn(), + }, + triggerInternalHook: vi.fn(), + sanitizeSessionHistoryMock: vi.fn(async (params: { messages: unknown[] }) => params.messages), +})); + +vi.mock("../../plugins/hook-runner-global.js", () => ({ + getGlobalHookRunner: () => hookRunner, +})); + +vi.mock("../../hooks/internal-hooks.js", async () => { + const actual = await vi.importActual( + "../../hooks/internal-hooks.js", + ); + return { + ...actual, + triggerInternalHook, + }; +}); + +vi.mock("@mariozechner/pi-coding-agent", () => { + return { + createAgentSession: vi.fn(async () => { + const session = { + sessionId: "session-1", + messages: [ + { role: "user", content: "hello", timestamp: 1 }, + { role: "assistant", content: [{ type: "text", text: "hi" }], timestamp: 2 }, + { + role: "toolResult", + toolCallId: "t1", + toolName: "exec", + content: [{ type: "text", text: "output" }], + isError: false, + timestamp: 3, + }, + ], + agent: { + replaceMessages: vi.fn((messages: unknown[]) => { + session.messages = [...(messages as typeof session.messages)]; + }), + streamFn: vi.fn(), + }, + compact: vi.fn(async () => { + // simulate compaction trimming to a single message + session.messages.splice(1); + return { + summary: "summary", + firstKeptEntryId: "entry-1", + tokensBefore: 120, + details: { ok: true }, + }; + }), + dispose: vi.fn(), + }; + return { session }; + }), + SessionManager: { + open: vi.fn(() => ({})), + }, + SettingsManager: { + create: vi.fn(() => ({})), + }, + estimateTokens: vi.fn(() => 10), + }; +}); + +vi.mock("../session-tool-result-guard-wrapper.js", () => ({ + guardSessionManager: vi.fn(() => ({ + flushPendingToolResults: vi.fn(), + })), +})); + +vi.mock("../pi-settings.js", () => ({ + ensurePiCompactionReserveTokens: vi.fn(), + resolveCompactionReserveTokensFloor: vi.fn(() => 0), +})); + +vi.mock("../models-config.js", () => ({ + ensureOpenClawModelsJson: vi.fn(async () => {}), +})); + +vi.mock("../model-auth.js", () => ({ + getApiKeyForModel: vi.fn(async () => ({ apiKey: "test", mode: "env" })), + resolveModelAuthMode: vi.fn(() => "env"), +})); + +vi.mock("../sandbox.js", () => ({ + resolveSandboxContext: vi.fn(async () => null), +})); + +vi.mock("../session-file-repair.js", () => ({ + repairSessionFileIfNeeded: vi.fn(async () => {}), +})); + +vi.mock("../session-write-lock.js", () => ({ + acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })), + resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 0), +})); + +vi.mock("../bootstrap-files.js", () => ({ + makeBootstrapWarn: vi.fn(() => () => {}), + resolveBootstrapContextForRun: vi.fn(async () => ({ contextFiles: [] })), +})); + +vi.mock("../docs-path.js", () => ({ + resolveOpenClawDocsPath: vi.fn(async () => undefined), +})); + +vi.mock("../channel-tools.js", () => ({ + listChannelSupportedActions: vi.fn(() => undefined), + resolveChannelMessageToolHints: vi.fn(() => undefined), +})); + +vi.mock("../pi-tools.js", () => ({ + createOpenClawCodingTools: vi.fn(() => []), +})); + +vi.mock("./google.js", () => ({ + logToolSchemasForGoogle: vi.fn(), + sanitizeSessionHistory: sanitizeSessionHistoryMock, + sanitizeToolsForGoogle: vi.fn(({ tools }: { tools: unknown[] }) => tools), +})); + +vi.mock("./tool-split.js", () => ({ + splitSdkTools: vi.fn(() => ({ builtInTools: [], customTools: [] })), +})); + +vi.mock("../transcript-policy.js", () => ({ + resolveTranscriptPolicy: vi.fn(() => ({ + allowSyntheticToolResults: false, + validateGeminiTurns: false, + validateAnthropicTurns: false, + })), +})); + +vi.mock("./extensions.js", () => ({ + buildEmbeddedExtensionFactories: vi.fn(() => []), +})); + +vi.mock("./history.js", () => ({ + getDmHistoryLimitFromSessionKey: vi.fn(() => undefined), + limitHistoryTurns: vi.fn((msgs: unknown[]) => msgs.slice(0, 2)), +})); + +vi.mock("../skills.js", () => ({ + applySkillEnvOverrides: vi.fn(() => () => {}), + applySkillEnvOverridesFromSnapshot: vi.fn(() => () => {}), + loadWorkspaceSkillEntries: vi.fn(() => []), + resolveSkillsPromptForRun: vi.fn(() => undefined), +})); + +vi.mock("../agent-paths.js", () => ({ + resolveOpenClawAgentDir: vi.fn(() => "/tmp"), +})); + +vi.mock("../agent-scope.js", () => ({ + resolveSessionAgentIds: vi.fn(() => ({ defaultAgentId: "main", sessionAgentId: "main" })), +})); + +vi.mock("../date-time.js", () => ({ + formatUserTime: vi.fn(() => ""), + resolveUserTimeFormat: vi.fn(() => ""), + resolveUserTimezone: vi.fn(() => ""), +})); + +vi.mock("../defaults.js", () => ({ + DEFAULT_MODEL: "fake-model", + DEFAULT_PROVIDER: "openai", +})); + +vi.mock("../utils.js", () => ({ + resolveUserPath: vi.fn((p: string) => p), +})); + +vi.mock("../../infra/machine-name.js", () => ({ + getMachineDisplayName: vi.fn(async () => "machine"), +})); + +vi.mock("../../config/channel-capabilities.js", () => ({ + resolveChannelCapabilities: vi.fn(() => undefined), +})); + +vi.mock("../../utils/message-channel.js", () => ({ + normalizeMessageChannel: vi.fn(() => undefined), +})); + +vi.mock("../pi-embedded-helpers.js", () => ({ + ensureSessionHeader: vi.fn(async () => {}), + validateAnthropicTurns: vi.fn((m: unknown[]) => m), + validateGeminiTurns: vi.fn((m: unknown[]) => m), +})); + +vi.mock("../pi-project-settings.js", () => ({ + createPreparedEmbeddedPiSettingsManager: vi.fn(() => ({ + getGlobalSettings: vi.fn(() => ({})), + })), +})); + +vi.mock("./sandbox-info.js", () => ({ + buildEmbeddedSandboxInfo: vi.fn(() => undefined), +})); + +vi.mock("./model.js", () => ({ + buildModelAliasLines: vi.fn(() => []), + resolveModel: vi.fn(() => ({ + model: { provider: "openai", api: "responses", id: "fake", input: [] }, + error: null, + authStorage: { setRuntimeApiKey: vi.fn() }, + modelRegistry: {}, + })), +})); + +vi.mock("./session-manager-cache.js", () => ({ + prewarmSessionFile: vi.fn(async () => {}), + trackSessionManagerAccess: vi.fn(), +})); + +vi.mock("./system-prompt.js", () => ({ + applySystemPromptOverrideToSession: vi.fn(), + buildEmbeddedSystemPrompt: vi.fn(() => ""), + createSystemPromptOverride: vi.fn(() => () => ""), +})); + +vi.mock("./utils.js", () => ({ + describeUnknownError: vi.fn((err: unknown) => String(err)), + mapThinkingLevel: vi.fn(() => "off"), + resolveExecToolDefaults: vi.fn(() => undefined), +})); + +import { compactEmbeddedPiSessionDirect } from "./compact.js"; + +const sessionHook = (action: string) => + triggerInternalHook.mock.calls.find( + (call) => call[0]?.type === "session" && call[0]?.action === action, + )?.[0]; + +describe("compactEmbeddedPiSessionDirect hooks", () => { + beforeEach(() => { + triggerInternalHook.mockClear(); + hookRunner.hasHooks.mockReset(); + hookRunner.runBeforeCompaction.mockReset(); + hookRunner.runAfterCompaction.mockReset(); + sanitizeSessionHistoryMock.mockReset(); + sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => { + return params.messages; + }); + }); + + it("emits internal + plugin compaction hooks with counts", async () => { + hookRunner.hasHooks.mockReturnValue(true); + let sanitizedCount = 0; + sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => { + const sanitized = params.messages.slice(1); + sanitizedCount = sanitized.length; + return sanitized; + }); + + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + messageChannel: "telegram", + customInstructions: "focus on decisions", + }); + + expect(result.ok).toBe(true); + expect(sessionHook("compact:before")).toMatchObject({ + type: "session", + action: "compact:before", + }); + const beforeContext = sessionHook("compact:before")?.context; + const afterContext = sessionHook("compact:after")?.context; + + expect(beforeContext).toMatchObject({ + messageCount: 2, + tokenCount: 20, + messageCountOriginal: sanitizedCount, + tokenCountOriginal: sanitizedCount * 10, + }); + expect(afterContext).toMatchObject({ + messageCount: 1, + compactedCount: 1, + }); + expect(afterContext?.compactedCount).toBe( + (beforeContext?.messageCountOriginal as number) - (afterContext?.messageCount as number), + ); + + expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith( + expect.objectContaining({ + messageCount: 2, + tokenCount: 20, + }), + expect.objectContaining({ sessionKey: "agent:main:session-1", messageProvider: "telegram" }), + ); + expect(hookRunner.runAfterCompaction).toHaveBeenCalledWith( + { + messageCount: 1, + tokenCount: 10, + compactedCount: 1, + }, + expect.objectContaining({ sessionKey: "agent:main:session-1", messageProvider: "telegram" }), + ); + }); + + it("uses sessionId as hook session key fallback when sessionKey is missing", async () => { + hookRunner.hasHooks.mockReturnValue(true); + + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + }); + + expect(result.ok).toBe(true); + expect(sessionHook("compact:before")?.sessionKey).toBe("session-1"); + expect(sessionHook("compact:after")?.sessionKey).toBe("session-1"); + expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith( + expect.any(Object), + expect.objectContaining({ sessionKey: "session-1" }), + ); + expect(hookRunner.runAfterCompaction).toHaveBeenCalledWith( + expect.any(Object), + expect.objectContaining({ sessionKey: "session-1" }), + ); + }); + + it("applies validated transcript before hooks even when it becomes empty", async () => { + hookRunner.hasHooks.mockReturnValue(true); + sanitizeSessionHistoryMock.mockResolvedValue([]); + + const result = await compactEmbeddedPiSessionDirect({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + customInstructions: "focus on decisions", + }); + + expect(result.ok).toBe(true); + const beforeContext = sessionHook("compact:before")?.context; + expect(beforeContext).toMatchObject({ + messageCountOriginal: 0, + tokenCountOriginal: 0, + messageCount: 0, + tokenCount: 0, + }); + }); +}); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 83b98f532d4..1742c554033 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -11,6 +11,7 @@ import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js"; import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js"; import { resolveChannelCapabilities } from "../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; @@ -359,6 +360,7 @@ export async function compactEmbeddedPiSessionDirect( }); const sessionLabel = params.sessionKey ?? params.sessionId; + const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; const { contextFiles } = await resolveBootstrapContextForRun({ workspaceDir: effectiveWorkspace, config: params.config, @@ -372,7 +374,7 @@ export async function compactEmbeddedPiSessionDirect( elevated: params.bashElevated, }, sandbox, - messageProvider: params.messageChannel ?? params.messageProvider, + messageProvider: resolvedMessageProvider, agentAccountId: params.agentAccountId, sessionKey: sandboxSessionKey, sessionId: params.sessionId, @@ -577,7 +579,7 @@ export async function compactEmbeddedPiSessionDirect( }); const { session } = await createAgentSession({ - cwd: resolvedWorkspace, + cwd: effectiveWorkspace, agentDir, authStorage, modelRegistry, @@ -609,10 +611,14 @@ export async function compactEmbeddedPiSessionDirect( const validated = transcriptPolicy.validateAnthropicTurns ? validateAnthropicTurns(validatedGemini) : validatedGemini; - // Capture full message history BEFORE limiting — plugins need the complete conversation - const preCompactionMessages = [...session.messages]; + // Apply validated transcript to the live session even when no history limit is configured, + // so compaction and hook metrics are based on the same message set. + session.agent.replaceMessages(validated); + // "Original" compaction metrics should describe the validated transcript that enters + // limiting/compaction, not the raw on-disk session snapshot. + const originalMessages = session.messages.slice(); const truncated = limitHistoryTurns( - validated, + session.messages, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), ); // Re-run tool_use/tool_result pairing repair after truncation, since @@ -624,34 +630,69 @@ export async function compactEmbeddedPiSessionDirect( if (limited.length > 0) { session.agent.replaceMessages(limited); } - // Run before_compaction hooks (fire-and-forget). - // The session JSONL already contains all messages on disk, so plugins - // can read sessionFile asynchronously and process in parallel with - // the compaction LLM call — no need to block or wait for after_compaction. + const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; const hookRunner = getGlobalHookRunner(); - const hookCtx = { - agentId: params.sessionKey?.split(":")[0] ?? "main", - sessionKey: params.sessionKey, - sessionId: params.sessionId, - workspaceDir: params.workspaceDir, - messageProvider: params.messageChannel ?? params.messageProvider, - }; - if (hookRunner?.hasHooks("before_compaction")) { - hookRunner - .runBeforeCompaction( - { - messageCount: preCompactionMessages.length, - compactingCount: limited.length, - messages: preCompactionMessages, - sessionFile: params.sessionFile, - }, - hookCtx, - ) - .catch((hookErr: unknown) => { - log.warn(`before_compaction hook failed: ${String(hookErr)}`); - }); + const messageCountOriginal = originalMessages.length; + let tokenCountOriginal: number | undefined; + try { + tokenCountOriginal = 0; + for (const message of originalMessages) { + tokenCountOriginal += estimateTokens(message); + } + } catch { + tokenCountOriginal = undefined; + } + const messageCountBefore = session.messages.length; + let tokenCountBefore: number | undefined; + try { + tokenCountBefore = 0; + for (const message of session.messages) { + tokenCountBefore += estimateTokens(message); + } + } catch { + tokenCountBefore = undefined; + } + // TODO(#7175): Consider exposing full message snapshots or pre-compaction injection + // hooks; current events only report counts/metadata. + try { + const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey, + messageCount: messageCountBefore, + tokenCount: tokenCountBefore, + messageCountOriginal, + tokenCountOriginal, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:before hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (hookRunner?.hasHooks("before_compaction")) { + try { + await hookRunner.runBeforeCompaction( + { + messageCount: messageCountBefore, + tokenCount: tokenCountBefore, + }, + { + sessionId: params.sessionId, + agentId: sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: effectiveWorkspace, + messageProvider: resolvedMessageProvider, + }, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } } - const diagEnabled = log.isEnabled("debug"); const preMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined; if (diagEnabled && preMetrics) { @@ -679,6 +720,9 @@ export async function compactEmbeddedPiSessionDirect( } const compactStartedAt = Date.now(); + // Measure compactedCount from the original pre-limiting transcript so compaction + // lifecycle metrics represent total reduction through the compaction pipeline. + const messageCountCompactionInput = messageCountOriginal; const result = await compactWithSafetyTimeout(() => session.compact(params.customInstructions), ); @@ -697,25 +741,8 @@ export async function compactEmbeddedPiSessionDirect( // If estimation fails, leave tokensAfter undefined tokensAfter = undefined; } - // Run after_compaction hooks (fire-and-forget). - // Also includes sessionFile for plugins that only need to act after - // compaction completes (e.g. analytics, cleanup). - if (hookRunner?.hasHooks("after_compaction")) { - hookRunner - .runAfterCompaction( - { - messageCount: session.messages.length, - tokenCount: tokensAfter, - compactedCount: limited.length - session.messages.length, - sessionFile: params.sessionFile, - }, - hookCtx, - ) - .catch((hookErr) => { - log.warn(`after_compaction hook failed: ${hookErr}`); - }); - } - + const messageCountAfter = session.messages.length; + const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter); const postMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined; if (diagEnabled && preMetrics && postMetrics) { log.debug( @@ -731,6 +758,50 @@ export async function compactEmbeddedPiSessionDirect( `delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`, ); } + // TODO(#9611): Consider exposing compaction summaries or post-compaction injection; + // current events only report summary metadata. + try { + const hookEvent = createInternalHookEvent("session", "compact:after", hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey, + messageCount: messageCountAfter, + tokenCount: tokensAfter, + compactedCount, + summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, + tokensBefore: result.tokensBefore, + tokensAfter, + firstKeptEntryId: result.firstKeptEntryId, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:after hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (hookRunner?.hasHooks("after_compaction")) { + try { + await hookRunner.runAfterCompaction( + { + messageCount: messageCountAfter, + tokenCount: tokensAfter, + compactedCount, + }, + { + sessionId: params.sessionId, + agentId: sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: effectiveWorkspace, + messageProvider: resolvedMessageProvider, + }, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + } return { ok: true, compacted: true,