fix(agents): split queued embedded compaction wrapper

This commit is contained in:
Vincent Koc
2026-04-11 20:23:46 +01:00
parent 9d717176d3
commit f1c4e2f11d
5 changed files with 268 additions and 242 deletions

View File

@@ -2,7 +2,7 @@ export type { MessagingToolSend } from "./pi-embedded-messaging.js";
export {
compactEmbeddedPiSession,
compactEmbeddedPiSession as compactEmbeddedAgentSession,
} from "./pi-embedded-runner/compact.js";
} from "./pi-embedded-runner/compact.queued.js";
export {
applyExtraParamsToAgent,
resolveAgentTransportOverride,

View File

@@ -191,7 +191,7 @@ export function resetCompactHooksHarnessMocks(): void {
export async function loadCompactHooksHarness(): Promise<{
compactEmbeddedPiSessionDirect: typeof import("./compact.js").compactEmbeddedPiSessionDirect;
compactEmbeddedPiSession: typeof import("./compact.js").compactEmbeddedPiSession;
compactEmbeddedPiSession: typeof import("./compact.queued.js").compactEmbeddedPiSession;
__testing: typeof import("./compact.js").__testing;
onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate;
}> {
@@ -570,13 +570,15 @@ export async function loadCompactHooksHarness(): Promise<{
resolveExecToolDefaults: vi.fn(() => undefined),
}));
const [compactModule, transcriptEvents] = await Promise.all([
const [compactModule, compactQueuedModule, transcriptEvents] = await Promise.all([
import("./compact.js"),
import("./compact.queued.js"),
import("../../sessions/transcript-events.js"),
]);
return {
...compactModule,
compactEmbeddedPiSession: compactQueuedModule.compactEmbeddedPiSession,
onSessionTranscriptUpdate: transcriptEvents.onSessionTranscriptUpdate,
};
}

View File

@@ -24,7 +24,7 @@ import {
} from "./compact.hooks.harness.js";
let compactEmbeddedPiSessionDirect: typeof import("./compact.js").compactEmbeddedPiSessionDirect;
let compactEmbeddedPiSession: typeof import("./compact.js").compactEmbeddedPiSession;
let compactEmbeddedPiSession: typeof import("./compact.queued.js").compactEmbeddedPiSession;
let compactTesting: typeof import("./compact.js").__testing;
let onSessionTranscriptUpdate: typeof import("../../sessions/transcript-events.js").onSessionTranscriptUpdate;

View File

@@ -0,0 +1,261 @@
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
import { resolveContextEngine } from "../../context-engine/registry.js";
import {
captureCompactionCheckpointSnapshot,
cleanupCompactionCheckpointSnapshot,
persistSessionCompactionCheckpoint,
resolveSessionCompactionCheckpointReason,
type CapturedCompactionCheckpointSnapshot,
} from "../../gateway/session-compaction-checkpoints.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
import type { ProviderRuntimeModel } from "../../plugins/types.js";
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { resolveUserPath } from "../../utils.js";
import { resolveOpenClawAgentDir } from "../agent-paths.js";
import { resolveSessionAgentIds } from "../agent-scope.js";
import { resolveContextWindowInfo } from "../context-window-guard.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
import { maybeCompactAgentHarnessSession } from "../harness/selection.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import type { CompactEmbeddedPiSessionParams } from "./compact.types.js";
import { asCompactionHookRunner, runPostCompactionSideEffects } from "./compaction-hooks.js";
import {
buildEmbeddedCompactionRuntimeContext,
resolveEmbeddedCompactionTarget,
} from "./compaction-runtime-context.js";
import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { readPiModelContextTokens } from "./model-context-tokens.js";
import { resolveModelAsync } from "./model.js";
import type { EmbeddedPiCompactResult } from "./types.js";
/**
* Compacts a session with lane queueing (session lane + global lane).
* Use this from outside a lane context. If already inside a lane, use
* `compactEmbeddedPiSessionDirect` to avoid deadlocks.
*/
export async function compactEmbeddedPiSession(
params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
const harnessResult = await maybeCompactAgentHarnessSession(params);
if (harnessResult) {
return harnessResult;
}
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts));
return enqueueCommandInLane(sessionLane, () =>
enqueueGlobal(async () => {
ensureRuntimePluginsLoaded({
config: params.config,
workspaceDir: params.workspaceDir,
allowGatewaySubagentBinding: params.allowGatewaySubagentBinding,
});
ensureContextEnginesInitialized();
const contextEngine = await resolveContextEngine(params.config);
let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null;
let checkpointSnapshotRetained = false;
try {
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
const resolvedCompactionTarget = resolveEmbeddedCompactionTarget({
config: params.config,
provider: params.provider,
modelId: params.model,
authProfileId: params.authProfileId,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
// Resolve token budget from the effective compaction model so engine-
// owned /compact implementations see the same target as the runtime.
const ceProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER;
const ceModelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL;
const { model: ceModel } = await resolveModelAsync(
ceProvider,
ceModelId,
agentDir,
params.config,
);
const ceRuntimeModel = ceModel as ProviderRuntimeModel | undefined;
const ceCtxInfo = resolveContextWindowInfo({
cfg: params.config,
provider: ceProvider,
modelId: ceModelId,
modelContextTokens: readPiModelContextTokens(ceModel),
modelContextWindow: ceRuntimeModel?.contextWindow,
defaultTokens: DEFAULT_CONTEXT_TOKENS,
});
// When the context engine owns compaction, its compact() implementation
// bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally).
// Fire before_compaction / after_compaction hooks here so plugin subscribers
// are notified regardless of which engine is active.
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
checkpointSnapshot = engineOwnsCompaction
? captureCompactionCheckpointSnapshot({
sessionManager: SessionManager.open(params.sessionFile),
sessionFile: params.sessionFile,
})
: null;
const hookRunner = engineOwnsCompaction
? asCompactionHookRunner(getGlobalHookRunner())
: null;
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
const { sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
});
const resolvedMessageProvider = params.messageChannel ?? params.messageProvider;
const hookCtx = {
sessionId: params.sessionId,
agentId: sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: resolveUserPath(params.workspaceDir),
messageProvider: resolvedMessageProvider,
};
const runtimeContext = {
...params,
...buildEmbeddedCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageProvider,
agentAccountId: params.agentAccountId,
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
currentMessageId: params.currentMessageId,
authProfileId: params.authProfileId,
workspaceDir: params.workspaceDir,
agentDir,
config: params.config,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
senderId: params.senderId,
provider: params.provider,
modelId: params.model,
thinkLevel: params.thinkLevel,
reasoningLevel: params.reasoningLevel,
bashElevated: params.bashElevated,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
}),
};
// Engine-owned compaction doesn't load the transcript at this level, so
// message counts are unavailable. We pass sessionFile so hook subscribers
// can read the transcript themselves if they need exact counts.
if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) {
try {
await hookRunner.runBeforeCompaction(
{
messageCount: -1,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: formatErrorMessage(err),
});
}
}
const result = await contextEngine.compact({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
tokenBudget: ceCtxInfo.tokens,
currentTokenCount: params.currentTokenCount,
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
customInstructions: params.customInstructions,
force: params.trigger === "manual",
runtimeContext,
});
if (result.ok && result.compacted) {
if (params.config && params.sessionKey && checkpointSnapshot) {
try {
const postCompactionSession = SessionManager.open(params.sessionFile);
const postLeafId = postCompactionSession.getLeafId() ?? undefined;
const storedCheckpoint = await persistSessionCompactionCheckpoint({
cfg: params.config,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
reason: resolveSessionCompactionCheckpointReason({
trigger: params.trigger,
}),
snapshot: checkpointSnapshot,
summary: result.result?.summary,
firstKeptEntryId: result.result?.firstKeptEntryId,
tokensBefore: result.result?.tokensBefore,
tokensAfter: result.result?.tokensAfter,
postSessionFile: params.sessionFile,
postLeafId,
postEntryId: postLeafId,
});
checkpointSnapshotRetained = storedCheckpoint !== null;
} catch (err) {
log.warn("failed to persist compaction checkpoint", {
errorMessage: formatErrorMessage(err),
});
}
}
await runContextEngineMaintenance({
contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "compaction",
runtimeContext,
});
}
if (engineOwnsCompaction && result.ok && result.compacted) {
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
}
if (
result.ok &&
result.compacted &&
hookRunner?.hasHooks?.("after_compaction") &&
hookRunner.runAfterCompaction
) {
try {
await hookRunner.runAfterCompaction(
{
messageCount: -1,
compactedCount: -1,
tokenCount: result.result?.tokensAfter,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: formatErrorMessage(err),
});
}
}
return {
ok: result.ok,
compacted: result.compacted,
reason: result.reason,
result: result.result
? {
summary: result.result.summary ?? "",
firstKeptEntryId: result.result.firstKeptEntryId ?? "",
tokensBefore: result.result.tokensBefore,
tokensAfter: result.result.tokensAfter,
details: result.result.details,
}
: undefined,
};
} finally {
if (!checkpointSnapshotRetained) {
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
}
await contextEngine.dispose?.();
}
}),
);
}

View File

@@ -10,8 +10,6 @@ import {
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import { resolveChannelCapabilities } from "../../config/channel-capabilities.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { ensureContextEnginesInitialized } from "../../context-engine/init.js";
import { resolveContextEngine } from "../../context-engine/registry.js";
import {
captureCompactionCheckpointSnapshot,
cleanupCompactionCheckpointSnapshot,
@@ -31,7 +29,6 @@ import {
transformProviderSystemPrompt,
} from "../../plugins/provider-runtime.js";
import type { ProviderRuntimeModel } from "../../plugins/types.js";
import { enqueueCommandInLane } from "../../process/command-queue.js";
import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
@@ -55,7 +52,6 @@ import { resolveContextWindowInfo } from "../context-window-guard.js";
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js";
import { resolveOpenClawDocsPath } from "../docs-path.js";
import { maybeCompactAgentHarnessSession } from "../harness/selection.js";
import { resolveHeartbeatPromptForSystemPrompt } from "../heartbeat-system-prompt.js";
import {
applyAuthHeaderOverride,
@@ -105,19 +101,14 @@ import {
runBeforeCompactionHooks,
runPostCompactionSideEffects,
} from "./compaction-hooks.js";
import {
buildEmbeddedCompactionRuntimeContext,
resolveEmbeddedCompactionTarget,
} from "./compaction-runtime-context.js";
import { resolveEmbeddedCompactionTarget } from "./compaction-runtime-context.js";
import {
compactWithSafetyTimeout,
resolveCompactionTimeoutMs,
} from "./compaction-safety-timeout.js";
import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
import { buildEmbeddedExtensionFactories } from "./extensions.js";
import { applyExtraParamsToAgent } from "./extra-params.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "./history.js";
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { hardenManualCompactionBoundary } from "./manual-compaction-boundary.js";
import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-discovery-input.js";
@@ -1200,234 +1191,6 @@ export async function compactEmbeddedPiSessionDirect(
}
}
/**
* Compacts a session with lane queueing (session lane + global lane).
* Use this from outside a lane context. If already inside a lane, use
* `compactEmbeddedPiSessionDirect` to avoid deadlocks.
*/
export async function compactEmbeddedPiSession(
params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> {
const harnessResult = await maybeCompactAgentHarnessSession(params);
if (harnessResult) {
return harnessResult;
}
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts));
return enqueueCommandInLane(sessionLane, () =>
enqueueGlobal(async () => {
ensureRuntimePluginsLoaded({
config: params.config,
workspaceDir: params.workspaceDir,
allowGatewaySubagentBinding: params.allowGatewaySubagentBinding,
});
ensureContextEnginesInitialized();
const contextEngine = await resolveContextEngine(params.config);
let checkpointSnapshot: CapturedCompactionCheckpointSnapshot | null = null;
let checkpointSnapshotRetained = false;
try {
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
const resolvedCompactionTarget = resolveEmbeddedCompactionTarget({
config: params.config,
provider: params.provider,
modelId: params.model,
authProfileId: params.authProfileId,
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
// Resolve token budget from the effective compaction model so engine-
// owned /compact implementations see the same target as the runtime.
const ceProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER;
const ceModelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL;
const { model: ceModel } = await resolveModelAsync(
ceProvider,
ceModelId,
agentDir,
params.config,
);
const ceRuntimeModel = ceModel as ProviderRuntimeModel | undefined;
const ceCtxInfo = resolveContextWindowInfo({
cfg: params.config,
provider: ceProvider,
modelId: ceModelId,
modelContextTokens: readPiModelContextTokens(ceModel),
modelContextWindow: ceRuntimeModel?.contextWindow,
defaultTokens: DEFAULT_CONTEXT_TOKENS,
});
// When the context engine owns compaction, its compact() implementation
// bypasses compactEmbeddedPiSessionDirect (which fires the hooks internally).
// Fire before_compaction / after_compaction hooks here so plugin subscribers
// are notified regardless of which engine is active.
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
checkpointSnapshot = engineOwnsCompaction
? captureCompactionCheckpointSnapshot({
sessionManager: SessionManager.open(params.sessionFile),
sessionFile: params.sessionFile,
})
: null;
const hookRunner = engineOwnsCompaction
? asCompactionHookRunner(getGlobalHookRunner())
: null;
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
const { sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
});
const resolvedMessageProvider = params.messageChannel ?? params.messageProvider;
const hookCtx = {
sessionId: params.sessionId,
agentId: sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: resolveUserPath(params.workspaceDir),
messageProvider: resolvedMessageProvider,
};
const runtimeContext = {
...params,
...buildEmbeddedCompactionRuntimeContext({
sessionKey: params.sessionKey,
messageChannel: params.messageChannel,
messageProvider: params.messageProvider,
agentAccountId: params.agentAccountId,
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
currentMessageId: params.currentMessageId,
authProfileId: params.authProfileId,
workspaceDir: params.workspaceDir,
agentDir,
config: params.config,
skillsSnapshot: params.skillsSnapshot,
senderIsOwner: params.senderIsOwner,
senderId: params.senderId,
provider: params.provider,
modelId: params.model,
thinkLevel: params.thinkLevel,
reasoningLevel: params.reasoningLevel,
bashElevated: params.bashElevated,
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
}),
};
// Engine-owned compaction doesn't load the transcript at this level, so
// message counts are unavailable. We pass sessionFile so hook subscribers
// can read the transcript themselves if they need exact counts.
if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) {
try {
await hookRunner.runBeforeCompaction(
{
messageCount: -1,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: formatErrorMessage(err),
});
}
}
const result = await contextEngine.compact({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
tokenBudget: ceCtxInfo.tokens,
currentTokenCount: params.currentTokenCount,
compactionTarget: params.trigger === "manual" ? "threshold" : "budget",
customInstructions: params.customInstructions,
force: params.trigger === "manual",
runtimeContext,
});
if (result.ok && result.compacted) {
if (params.config && params.sessionKey && checkpointSnapshot) {
try {
const postCompactionSession = SessionManager.open(params.sessionFile);
const postLeafId = postCompactionSession.getLeafId() ?? undefined;
const storedCheckpoint = await persistSessionCompactionCheckpoint({
cfg: params.config,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
reason: resolveSessionCompactionCheckpointReason({
trigger: params.trigger,
}),
snapshot: checkpointSnapshot,
summary: result.result?.summary,
firstKeptEntryId: result.result?.firstKeptEntryId,
tokensBefore: result.result?.tokensBefore,
tokensAfter: result.result?.tokensAfter,
postSessionFile: params.sessionFile,
postLeafId,
postEntryId: postLeafId,
});
checkpointSnapshotRetained = storedCheckpoint !== null;
} catch (err) {
log.warn("failed to persist compaction checkpoint", {
errorMessage: formatErrorMessage(err),
});
}
}
await runContextEngineMaintenance({
contextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
reason: "compaction",
runtimeContext,
});
}
if (engineOwnsCompaction && result.ok && result.compacted) {
await runPostCompactionSideEffects({
config: params.config,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
});
}
if (
result.ok &&
result.compacted &&
hookRunner?.hasHooks?.("after_compaction") &&
hookRunner.runAfterCompaction
) {
try {
await hookRunner.runAfterCompaction(
{
messageCount: -1,
compactedCount: -1,
tokenCount: result.result?.tokensAfter,
sessionFile: params.sessionFile,
},
hookCtx,
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: formatErrorMessage(err),
});
}
}
return {
ok: result.ok,
compacted: result.compacted,
reason: result.reason,
result: result.result
? {
summary: result.result.summary ?? "",
firstKeptEntryId: result.result.firstKeptEntryId ?? "",
tokensBefore: result.result.tokensBefore,
tokensAfter: result.result.tokensAfter,
details: result.result.details,
}
: undefined,
};
} finally {
if (!checkpointSnapshotRetained) {
await cleanupCompactionCheckpointSnapshot(checkpointSnapshot);
}
await contextEngine.dispose?.();
}
}),
);
}
export const __testing = {
hasRealConversationContent,
hasMeaningfulConversationContent,