diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index b087cf27dc4..fa52b17ee20 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -2,7 +2,7 @@ export type { MessagingToolSend } from "./pi-embedded-messaging.js"; export { compactEmbeddedPiSession, compactEmbeddedPiSession as compactEmbeddedAgentSession, -} from "./pi-embedded-runner/compact.js"; +} from "./pi-embedded-runner/compact.queued.js"; export { applyExtraParamsToAgent, resolveAgentTransportOverride, diff --git a/src/agents/pi-embedded-runner/compact.hooks.harness.ts b/src/agents/pi-embedded-runner/compact.hooks.harness.ts index 0a07f181e28..fa2b54d6cea 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.harness.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.harness.ts @@ -191,7 +191,7 @@ export function resetCompactHooksHarnessMocks(): void { export async function loadCompactHooksHarness(): Promise<{ compactEmbeddedPiSessionDirect: typeof import("./compact.js").compactEmbeddedPiSessionDirect; - compactEmbeddedPiSession: typeof import("./compact.js").compactEmbeddedPiSession; + compactEmbeddedPiSession: typeof import("./compact.queued.js").compactEmbeddedPiSession; __testing: typeof import("./compact.js").__testing; onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate; }> { @@ -570,13 +570,15 @@ export async function loadCompactHooksHarness(): Promise<{ resolveExecToolDefaults: vi.fn(() => undefined), })); - const [compactModule, transcriptEvents] = await Promise.all([ + const [compactModule, compactQueuedModule, transcriptEvents] = await Promise.all([ import("./compact.js"), + import("./compact.queued.js"), import("../../sessions/transcript-events.js"), ]); return { ...compactModule, + compactEmbeddedPiSession: compactQueuedModule.compactEmbeddedPiSession, onSessionTranscriptUpdate: transcriptEvents.onSessionTranscriptUpdate, }; } diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index e6091af8b9e..263c131f8d5 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -24,7 +24,7 @@ import { } from "./compact.hooks.harness.js"; let compactEmbeddedPiSessionDirect: typeof import("./compact.js").compactEmbeddedPiSessionDirect; -let compactEmbeddedPiSession: typeof import("./compact.js").compactEmbeddedPiSession; +let compactEmbeddedPiSession: typeof import("./compact.queued.js").compactEmbeddedPiSession; let compactTesting: typeof import("./compact.js").__testing; let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate; diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts new file mode 100644 index 00000000000..2a71a694264 --- /dev/null +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -0,0 +1,261 @@ +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; +import { resolveContextEngine } from "../../context-engine/registry.js"; +import { + captureCompactionCheckpointSnapshot, + cleanupCompactionCheckpointSnapshot, + persistSessionCompactionCheckpoint, + resolveSessionCompactionCheckpointReason, + type CapturedCompactionCheckpointSnapshot, +} from "../../gateway/session-compaction-checkpoints.js"; +import { formatErrorMessage } from "../../infra/errors.js"; +import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import type { ProviderRuntimeModel } from "../../plugins/types.js"; +import { enqueueCommandInLane } from "../../process/command-queue.js"; +import { resolveUserPath } from "../../utils.js"; +import { resolveOpenClawAgentDir } from "../agent-paths.js"; +import { resolveSessionAgentIds } from "../agent-scope.js"; +import { resolveContextWindowInfo } from "../context-window-guard.js"; +import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; +import { maybeCompactAgentHarnessSession } from "../harness/selection.js"; +import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; +import type { CompactEmbeddedPiSessionParams } from "./compact.types.js"; +import { asCompactionHookRunner, runPostCompactionSideEffects } from "./compaction-hooks.js"; +import { + buildEmbeddedCompactionRuntimeContext, + resolveEmbeddedCompactionTarget, +} from "./compaction-runtime-context.js"; +import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; +import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; +import { log } from "./logger.js"; +import { readPiModelContextTokens } from "./model-context-tokens.js"; +import { resolveModelAsync } from "./model.js"; +import type { EmbeddedPiCompactResult } from "./types.js"; + +/** + * Compacts a session with lane queueing (session lane + global lane). + * Use this from outside a lane context. If already inside a lane, use + * `compactEmbeddedPiSessionDirect` to avoid deadlocks. + */ +export async function compactEmbeddedPiSession( + params: CompactEmbeddedPiSessionParams, +): Promise { + const harnessResult = await maybeCompactAgentHarnessSession(params); + if (harnessResult) { + return harnessResult; + } + const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); + const globalLane = resolveGlobalLane(params.lane); + const enqueueGlobal = + params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); + return enqueueCommandInLane(sessionLane, () => + enqueueGlobal(async () => { + ensureRuntimePluginsLoaded({ + config: params.config, + workspaceDir: params.workspaceDir, + allowGatewaySubagentBinding: params.allowGatewaySubagentBinding, + }); + ensureContextEnginesInitialized(); + const contextEngine = await resolveContextEngine(params.config); + let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null; + let checkpointSnapshotRetained = false; + try { + const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); + const resolvedCompactionTarget = resolveEmbeddedCompactionTarget({ + config: params.config, + provider: params.provider, + modelId: params.model, + authProfileId: params.authProfileId, + defaultProvider: DEFAULT_PROVIDER, + defaultModel: DEFAULT_MODEL, + }); + // Resolve token budget from the effective compaction model so engine- + // owned /compact implementations see the same target as the runtime. + const ceProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER; + const ceModelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL; + const { model: ceModel } = await resolveModelAsync( + ceProvider, + ceModelId, + agentDir, + params.config, + ); + const ceRuntimeModel = ceModel as ProviderRuntimeModel | undefined; + const ceCtxInfo = resolveContextWindowInfo({ + cfg: params.config, + provider: ceProvider, + modelId: ceModelId, + modelContextTokens: readPiModelContextTokens(ceModel), + modelContextWindow: ceRuntimeModel?.contextWindow, + defaultTokens: DEFAULT_CONTEXT_TOKENS, + }); + // When the context engine owns compaction, its compact() implementation + // bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally). + // Fire before_compaction / after_compaction hooks here so plugin subscribers + // are notified regardless of which engine is active. + const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; + checkpointSnapshot = engineOwnsCompaction + ? captureCompactionCheckpointSnapshot({ + sessionManager: SessionManager.open(params.sessionFile), + sessionFile: params.sessionFile, + }) + : null; + const hookRunner = engineOwnsCompaction + ? asCompactionHookRunner(getGlobalHookRunner()) + : null; + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; + const { sessionAgentId } = resolveSessionAgentIds({ + sessionKey: params.sessionKey, + config: params.config, + }); + const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; + const hookCtx = { + sessionId: params.sessionId, + agentId: sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: resolveUserPath(params.workspaceDir), + messageProvider: resolvedMessageProvider, + }; + const runtimeContext = { + ...params, + ...buildEmbeddedCompactionRuntimeContext({ + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + currentChannelId: params.currentChannelId, + currentThreadTs: params.currentThreadTs, + currentMessageId: params.currentMessageId, + authProfileId: params.authProfileId, + workspaceDir: params.workspaceDir, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + senderId: params.senderId, + provider: params.provider, + modelId: params.model, + thinkLevel: params.thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + }), + }; + // Engine-owned compaction doesn't load the transcript at this level, so + // message counts are unavailable. We pass sessionFile so hook subscribers + // can read the transcript themselves if they need exact counts. + if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) { + try { + await hookRunner.runBeforeCompaction( + { + messageCount: -1, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: formatErrorMessage(err), + }); + } + } + const result = await contextEngine.compact({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + tokenBudget: ceCtxInfo.tokens, + currentTokenCount: params.currentTokenCount, + compactionTarget: params.trigger === "manual" ? "threshold" : "budget", + customInstructions: params.customInstructions, + force: params.trigger === "manual", + runtimeContext, + }); + if (result.ok && result.compacted) { + if (params.config && params.sessionKey && checkpointSnapshot) { + try { + const postCompactionSession = SessionManager.open(params.sessionFile); + const postLeafId = postCompactionSession.getLeafId() ?? undefined; + const storedCheckpoint = await persistSessionCompactionCheckpoint({ + cfg: params.config, + sessionKey: params.sessionKey, + sessionId: params.sessionId, + reason: resolveSessionCompactionCheckpointReason({ + trigger: params.trigger, + }), + snapshot: checkpointSnapshot, + summary: result.result?.summary, + firstKeptEntryId: result.result?.firstKeptEntryId, + tokensBefore: result.result?.tokensBefore, + tokensAfter: result.result?.tokensAfter, + postSessionFile: params.sessionFile, + postLeafId, + postEntryId: postLeafId, + }); + checkpointSnapshotRetained = storedCheckpoint !== null; + } catch (err) { + log.warn("failed to persist compaction checkpoint", { + errorMessage: formatErrorMessage(err), + }); + } + } + await runContextEngineMaintenance({ + contextEngine, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + reason: "compaction", + runtimeContext, + }); + } + if (engineOwnsCompaction && result.ok && result.compacted) { + await runPostCompactionSideEffects({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + } + if ( + result.ok && + result.compacted && + hookRunner?.hasHooks?.("after_compaction") && + hookRunner.runAfterCompaction + ) { + try { + await hookRunner.runAfterCompaction( + { + messageCount: -1, + compactedCount: -1, + tokenCount: result.result?.tokensAfter, + sessionFile: params.sessionFile, + }, + hookCtx, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: formatErrorMessage(err), + }); + } + } + return { + ok: result.ok, + compacted: result.compacted, + reason: result.reason, + result: result.result + ? { + summary: result.result.summary ?? "", + firstKeptEntryId: result.result.firstKeptEntryId ?? "", + tokensBefore: result.result.tokensBefore, + tokensAfter: result.result.tokensAfter, + details: result.result.details, + } + : undefined, + }; + } finally { + if (!checkpointSnapshotRetained) { + await cleanupCompactionCheckpointSnapshot(checkpointSnapshot); + } + await contextEngine.dispose?.(); + } + }), + ); +} diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 58fe882461e..4880fec42d1 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -10,8 +10,6 @@ import { import type { ThinkLevel } from "../../auto-reply/thinking.js"; import { resolveChannelCapabilities } from "../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; -import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; -import { resolveContextEngine } from "../../context-engine/registry.js"; import { captureCompactionCheckpointSnapshot, cleanupCompactionCheckpointSnapshot, @@ -31,7 +29,6 @@ import { transformProviderSystemPrompt, } from "../../plugins/provider-runtime.js"; import type { ProviderRuntimeModel } from "../../plugins/types.js"; -import { enqueueCommandInLane } from "../../process/command-queue.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js"; import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; @@ -55,7 +52,6 @@ import { resolveContextWindowInfo } from "../context-window-guard.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; -import { maybeCompactAgentHarnessSession } from "../harness/selection.js"; import { resolveHeartbeatPromptForSystemPrompt } from "../heartbeat-system-prompt.js"; import { applyAuthHeaderOverride, @@ -105,19 +101,14 @@ import { runBeforeCompactionHooks, runPostCompactionSideEffects, } from "./compaction-hooks.js"; -import { - buildEmbeddedCompactionRuntimeContext, - resolveEmbeddedCompactionTarget, -} from "./compaction-runtime-context.js"; +import { resolveEmbeddedCompactionTarget } from "./compaction-runtime-context.js"; import { compactWithSafetyTimeout, resolveCompactionTimeoutMs, } from "./compaction-safety-timeout.js"; -import { runContextEngineMaintenance } from "./context-engine-maintenance.js"; import { buildEmbeddedExtensionFactories } from "./extensions.js"; import { applyExtraParamsToAgent } from "./extra-params.js"; import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js"; -import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js"; import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-discovery-input.js"; @@ -1200,234 +1191,6 @@ export async function compactEmbeddedPiSessionDirect( } } -/** - * Compacts a session with lane queueing (session lane + global lane). - * Use this from outside a lane context. If already inside a lane, use - * `compactEmbeddedPiSessionDirect` to avoid deadlocks. - */ -export async function compactEmbeddedPiSession( - params: CompactEmbeddedPiSessionParams, -): Promise { - const harnessResult = await maybeCompactAgentHarnessSession(params); - if (harnessResult) { - return harnessResult; - } - const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); - const globalLane = resolveGlobalLane(params.lane); - const enqueueGlobal = - params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); - return enqueueCommandInLane(sessionLane, () => - enqueueGlobal(async () => { - ensureRuntimePluginsLoaded({ - config: params.config, - workspaceDir: params.workspaceDir, - allowGatewaySubagentBinding: params.allowGatewaySubagentBinding, - }); - ensureContextEnginesInitialized(); - const contextEngine = await resolveContextEngine(params.config); - let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null; - let checkpointSnapshotRetained = false; - try { - const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); - const resolvedCompactionTarget = resolveEmbeddedCompactionTarget({ - config: params.config, - provider: params.provider, - modelId: params.model, - authProfileId: params.authProfileId, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); - // Resolve token budget from the effective compaction model so engine- - // owned /compact implementations see the same target as the runtime. - const ceProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER; - const ceModelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL; - const { model: ceModel } = await resolveModelAsync( - ceProvider, - ceModelId, - agentDir, - params.config, - ); - const ceRuntimeModel = ceModel as ProviderRuntimeModel | undefined; - const ceCtxInfo = resolveContextWindowInfo({ - cfg: params.config, - provider: ceProvider, - modelId: ceModelId, - modelContextTokens: readPiModelContextTokens(ceModel), - modelContextWindow: ceRuntimeModel?.contextWindow, - defaultTokens: DEFAULT_CONTEXT_TOKENS, - }); - // When the context engine owns compaction, its compact() implementation - // bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally). - // Fire before_compaction / after_compaction hooks here so plugin subscribers - // are notified regardless of which engine is active. - const engineOwnsCompaction = contextEngine.info.ownsCompaction === true; - checkpointSnapshot = engineOwnsCompaction - ? captureCompactionCheckpointSnapshot({ - sessionManager: SessionManager.open(params.sessionFile), - sessionFile: params.sessionFile, - }) - : null; - const hookRunner = engineOwnsCompaction - ? asCompactionHookRunner(getGlobalHookRunner()) - : null; - const hookSessionKey = params.sessionKey?.trim() || params.sessionId; - const { sessionAgentId } = resolveSessionAgentIds({ - sessionKey: params.sessionKey, - config: params.config, - }); - const resolvedMessageProvider = params.messageChannel ?? params.messageProvider; - const hookCtx = { - sessionId: params.sessionId, - agentId: sessionAgentId, - sessionKey: hookSessionKey, - workspaceDir: resolveUserPath(params.workspaceDir), - messageProvider: resolvedMessageProvider, - }; - const runtimeContext = { - ...params, - ...buildEmbeddedCompactionRuntimeContext({ - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - currentChannelId: params.currentChannelId, - currentThreadTs: params.currentThreadTs, - currentMessageId: params.currentMessageId, - authProfileId: params.authProfileId, - workspaceDir: params.workspaceDir, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - senderId: params.senderId, - provider: params.provider, - modelId: params.model, - thinkLevel: params.thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - }), - }; - // Engine-owned compaction doesn't load the transcript at this level, so - // message counts are unavailable. We pass sessionFile so hook subscribers - // can read the transcript themselves if they need exact counts. - if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) { - try { - await hookRunner.runBeforeCompaction( - { - messageCount: -1, - sessionFile: params.sessionFile, - }, - hookCtx, - ); - } catch (err) { - log.warn("before_compaction hook failed", { - errorMessage: formatErrorMessage(err), - }); - } - } - const result = await contextEngine.compact({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - tokenBudget: ceCtxInfo.tokens, - currentTokenCount: params.currentTokenCount, - compactionTarget: params.trigger === "manual" ? "threshold" : "budget", - customInstructions: params.customInstructions, - force: params.trigger === "manual", - runtimeContext, - }); - if (result.ok && result.compacted) { - if (params.config && params.sessionKey && checkpointSnapshot) { - try { - const postCompactionSession = SessionManager.open(params.sessionFile); - const postLeafId = postCompactionSession.getLeafId() ?? undefined; - const storedCheckpoint = await persistSessionCompactionCheckpoint({ - cfg: params.config, - sessionKey: params.sessionKey, - sessionId: params.sessionId, - reason: resolveSessionCompactionCheckpointReason({ - trigger: params.trigger, - }), - snapshot: checkpointSnapshot, - summary: result.result?.summary, - firstKeptEntryId: result.result?.firstKeptEntryId, - tokensBefore: result.result?.tokensBefore, - tokensAfter: result.result?.tokensAfter, - postSessionFile: params.sessionFile, - postLeafId, - postEntryId: postLeafId, - }); - checkpointSnapshotRetained = storedCheckpoint !== null; - } catch (err) { - log.warn("failed to persist compaction checkpoint", { - errorMessage: formatErrorMessage(err), - }); - } - } - await runContextEngineMaintenance({ - contextEngine, - sessionId: params.sessionId, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - reason: "compaction", - runtimeContext, - }); - } - if (engineOwnsCompaction && result.ok && result.compacted) { - await runPostCompactionSideEffects({ - config: params.config, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - }); - } - if ( - result.ok && - result.compacted && - hookRunner?.hasHooks?.("after_compaction") && - hookRunner.runAfterCompaction - ) { - try { - await hookRunner.runAfterCompaction( - { - messageCount: -1, - compactedCount: -1, - tokenCount: result.result?.tokensAfter, - sessionFile: params.sessionFile, - }, - hookCtx, - ); - } catch (err) { - log.warn("after_compaction hook failed", { - errorMessage: formatErrorMessage(err), - }); - } - } - return { - ok: result.ok, - compacted: result.compacted, - reason: result.reason, - result: result.result - ? { - summary: result.result.summary ?? "", - firstKeptEntryId: result.result.firstKeptEntryId ?? "", - tokensBefore: result.result.tokensBefore, - tokensAfter: result.result.tokensAfter, - details: result.result.details, - } - : undefined, - }; - } finally { - if (!checkpointSnapshotRetained) { - await cleanupCompactionCheckpointSnapshot(checkpointSnapshot); - } - await contextEngine.dispose?.(); - } - }), - ); -} - export const __testing = { hasRealConversationContent, hasMeaningfulConversationContent,