fix(reply): defer context compaction safely

This commit is contained in:
Keshav's Bot
2026-05-26 17:06:07 +01:00
committed by Peter Steinberger
parent 21c25bbb9d
commit ed3ae0da43
15 changed files with 714 additions and 40 deletions

View File

@@ -34,6 +34,16 @@ describe("classifyCompactionReason", () => {
);
});
it('classifies "already under target" as below threshold', () => {
expect(classifyCompactionReason("already under target")).toBe("below_threshold");
});
it("classifies deferred background maintenance as a skip-like reason", () => {
expect(classifyCompactionReason("deferred to background context-engine maintenance")).toBe(
"deferred_background",
);
});
it("classifies safeguard messages as guard-blocked", () => {
expect(
classifyCompactionReason(

View File

@@ -3,6 +3,9 @@ import { sanitizeForLog } from "../../terminal/ansi.js";
const MAX_COMPACTION_REASON_DETAIL_CHARS = 100;
export const DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON =
"deferred to background context-engine maintenance";
function isGenericCompactionCancelledReason(reason: string): boolean {
const normalized = normalizeLowercaseStringOrEmpty(reason);
return normalized === "compaction cancelled" || normalized === "error: compaction cancelled";
@@ -26,12 +29,17 @@ export function classifyCompactionReason(reason?: string): string {
if (text.includes("nothing to compact")) {
return "no_compactable_entries";
}
if (text.includes("below threshold")) {
// Backends use both phrases for the same harmless state: the transcript is
// already small enough, so preflight compaction should skip instead of fail.
if (text.includes("below threshold") || text.includes("already under target")) {
return "below_threshold";
}
if (text.includes("already compacted")) {
return "already_compacted_recently";
}
if (text.includes("deferred to background")) {
return "deferred_background";
}
if (text.includes("still exceeds target")) {
return "live_context_still_exceeds_target";
}

View File

@@ -97,6 +97,13 @@ function createDefaultSessionMessages(): unknown[] {
export const sessionMessages: unknown[] = createDefaultSessionMessages();
export const sessionAbortCompactionMock: Mock<(reason?: unknown) => void> = vi.fn();
export const createOpenClawCodingToolsMock = vi.fn(() => []);
export const guardSessionManagerMock = vi.fn(() => ({
flushPendingToolResults: vi.fn(),
}));
export const applyPiCompactionSettingsFromConfigMock = vi.fn();
export const createPreparedEmbeddedPiSettingsManagerMock = vi.fn(() => ({
getGlobalSettings: vi.fn(() => ({})),
}));
export const listRegisteredPluginAgentPromptGuidanceMock = vi.fn((params?: { surface?: string }) =>
params?.surface === "subagent"
? ["Subagent compact command guidance."]
@@ -120,6 +127,7 @@ export const rotateTranscriptAfterCompactionMock: Mock<
> = vi.fn(async () => ({
rotated: false,
}));
export const enqueueCommandInLaneMock = vi.fn((_lane: unknown, task: () => unknown) => task());
function createCompactHooksRuntimePlan(params: BuildAgentRuntimePlanParams): AgentRuntimePlan {
const modelApi = params.modelApi ?? params.model?.api ?? undefined;
@@ -272,6 +280,8 @@ export function resetCompactSessionStateMocks(): void {
maybeCompactAgentHarnessSessionMock.mockResolvedValue(undefined);
rotateTranscriptAfterCompactionMock.mockReset();
rotateTranscriptAfterCompactionMock.mockResolvedValue({ rotated: false });
enqueueCommandInLaneMock.mockReset();
enqueueCommandInLaneMock.mockImplementation((_lane: unknown, task: () => unknown) => task());
listRegisteredPluginAgentPromptGuidanceMock.mockReset();
listRegisteredPluginAgentPromptGuidanceMock.mockImplementation((params?: { surface?: string }) =>
params?.surface === "subagent"
@@ -332,6 +342,15 @@ export function resetCompactHooksHarnessMocks(): void {
resetCompactSessionStateMocks();
createOpenClawCodingToolsMock.mockReset();
createOpenClawCodingToolsMock.mockReturnValue([]);
guardSessionManagerMock.mockReset();
guardSessionManagerMock.mockReturnValue({
flushPendingToolResults: vi.fn(),
});
applyPiCompactionSettingsFromConfigMock.mockReset();
createPreparedEmbeddedPiSettingsManagerMock.mockReset();
createPreparedEmbeddedPiSettingsManagerMock.mockReturnValue({
getGlobalSettings: vi.fn(() => ({})),
});
}
export async function loadCompactHooksHarness(): Promise<{
@@ -470,14 +489,12 @@ export async function loadCompactHooksHarness(): Promise<{
}));
vi.doMock("../session-tool-result-guard-wrapper.js", () => ({
guardSessionManager: vi.fn(() => ({
flushPendingToolResults: vi.fn(),
})),
guardSessionManager: guardSessionManagerMock,
}));
vi.doMock("../pi-settings.js", () => ({
applyPiAutoCompactionGuard: vi.fn(() => ({ supported: true, disabled: false })),
applyPiCompactionSettingsFromConfig: vi.fn(),
applyPiCompactionSettingsFromConfig: applyPiCompactionSettingsFromConfigMock,
ensurePiCompactionReserveTokens: vi.fn(),
isSilentOverflowProneModel: vi.fn(() => false),
resolveCompactionReserveTokensFloor: vi.fn(() => 0),
@@ -532,7 +549,7 @@ export async function loadCompactHooksHarness(): Promise<{
}));
vi.doMock("../../process/command-queue.js", () => ({
enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()),
enqueueCommandInLane: enqueueCommandInLaneMock,
clearCommandLane: vi.fn(() => 0),
}));
@@ -793,9 +810,7 @@ export async function loadCompactHooksHarness(): Promise<{
}));
vi.doMock("../pi-project-settings.js", () => ({
createPreparedEmbeddedPiSettingsManager: vi.fn(() => ({
getGlobalSettings: vi.fn(() => ({})),
})),
createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerMock,
}));
vi.doMock("./sandbox-info.js", () => ({

View File

@@ -2,12 +2,16 @@ import type { AgentMessage } from "@earendil-works/pi-agent-core";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
applyExtraParamsToAgentMock,
applyPiCompactionSettingsFromConfigMock,
buildEmbeddedSystemPromptMock,
contextEngineCompactMock,
createPreparedEmbeddedPiSettingsManagerMock,
createOpenClawCodingToolsMock,
enqueueCommandInLaneMock,
ensureRuntimePluginsLoaded,
estimateTokensMock,
getMemorySearchManagerMock,
guardSessionManagerMock,
hookRunner,
listRegisteredPluginAgentPromptGuidanceMock,
loadCompactHooksHarness,
@@ -392,6 +396,15 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
expectRecordFields(mockCallArg(createOpenClawCodingToolsMock), {
modelContextWindowTokens: 64_000,
});
expectRecordFields(mockCallArg(guardSessionManagerMock, 0, 1), {
contextWindowTokens: 64_000,
});
expectRecordFields(mockCallArg(createPreparedEmbeddedPiSettingsManagerMock), {
contextTokenBudget: 64_000,
});
expectRecordFields(mockCallArg(applyPiCompactionSettingsFromConfigMock), {
contextTokenBudget: 64_000,
});
});
it("clamps the caller context token budget to the compaction model", async () => {
@@ -1542,6 +1555,39 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
});
});
it("fails deferred budget compaction when background maintenance is not scheduled", async () => {
const dispose = vi.fn(async () => {});
const maintain = vi.fn(async () => ({
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
}));
resolveContextEngineMock.mockResolvedValue({
info: { ownsCompaction: true, turnMaintenanceMode: "background" },
compact: contextEngineCompactMock,
dispose,
maintain,
} as never);
enqueueCommandInLaneMock.mockImplementationOnce(() => {
throw new Error("scheduler offline");
});
const result = await compactEmbeddedPiSession(
wrappedCompactionArgs({
trigger: "budget",
deferOwningContextEngineCompaction: true,
}),
);
expect(result.ok).toBe(false);
expect(result.compacted).toBe(false);
expect(result.reason).toBe("failed to schedule background context-engine maintenance");
expect(result.failure?.reason).toBe("deferred_compaction_not_scheduled");
expect(dispose).toHaveBeenCalledTimes(1);
expect(maintain).not.toHaveBeenCalled();
expect(contextEngineCompactMock).not.toHaveBeenCalled();
});
it("does not fall back to context-engine compaction for Codex native binding failures", async () => {
maybeCompactAgentHarnessSessionMock.mockResolvedValueOnce({
ok: false,

View File

@@ -3,7 +3,7 @@ import {
resolveContextEngine,
resolveContextEngineOwnerPluginId,
} from "../../context-engine/registry.js";
import type { ContextEngineRuntimeContext } from "../../context-engine/types.js";
import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-engine/types.js";
import {
captureCompactionCheckpointSnapshotAsync,
cleanupCompactionCheckpointSnapshot,
@@ -26,6 +26,7 @@ import {
} from "../harness/selection.js";
import { resolveContextConfigProviderForRuntime } from "../openai-codex-routing.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import { DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON } from "./compact-reasons.js";
import type { CompactEmbeddedPiSessionParams } from "./compact.types.js";
import { asCompactionHookRunner, runPostCompactionSideEffects } from "./compaction-hooks.js";
import {
@@ -64,6 +65,88 @@ function shouldFallbackAfterHarnessCompaction(
);
}
const DEFERRED_CONTEXT_ENGINE_COMPACTION_SCHEDULE_FAILURE_REASON =
"failed to schedule background context-engine maintenance";
function shouldDeferOwningContextEngineBudgetCompaction(params: {
compactParams: CompactEmbeddedPiSessionParams;
contextEngine: ContextEngine;
}): boolean {
// Request-time budget compaction for context-engine-owned transcripts can
// spend the whole reply preflight budget. Only defer engines that explicitly
// advertise background turn maintenance, leaving native/current-session
// harness compaction synchronous.
return (
params.compactParams.deferOwningContextEngineCompaction === true &&
params.compactParams.trigger === "budget" &&
params.contextEngine.info.ownsCompaction === true &&
params.contextEngine.info.turnMaintenanceMode === "background" &&
typeof params.contextEngine.maintain === "function"
);
}
async function disposeContextEngine(contextEngine: ContextEngine): Promise<void> {
try {
await contextEngine.dispose?.();
} catch (err) {
log.warn("context engine dispose failed after deferred maintenance", {
errorMessage: formatErrorMessage(err),
});
}
}
async function deferOwningContextEngineBudgetCompaction(params: {
compactParams: CompactEmbeddedPiSessionParams;
contextEngine: ContextEngine;
contextEngineRuntimeContext: ContextEngineRuntimeContext;
}): Promise<EmbeddedPiCompactResult> {
let deferredScheduled = false;
try {
await runContextEngineMaintenance({
contextEngine: params.contextEngine,
sessionId: params.compactParams.sessionId,
sessionKey: params.compactParams.sessionKey,
sessionFile: params.compactParams.sessionFile,
reason: "turn",
runtimeContext: params.contextEngineRuntimeContext,
config: params.compactParams.config,
disposeDeferredContextEngineAfterMaintenance: true,
onDeferredMaintenance: () => {
deferredScheduled = true;
},
});
} catch (err) {
log.warn("failed to defer context-engine budget compaction", {
errorMessage: formatErrorMessage(err),
});
}
if (!deferredScheduled) {
await disposeContextEngine(params.contextEngine);
log.warn(
`[compaction] failed to schedule context-engine-owned budget compaction background maintenance ` +
`(sessionKey=${params.compactParams.sessionKey ?? params.compactParams.sessionId})`,
);
return {
ok: false,
compacted: false,
reason: DEFERRED_CONTEXT_ENGINE_COMPACTION_SCHEDULE_FAILURE_REASON,
failure: { reason: "deferred_compaction_not_scheduled" },
};
}
log.info(
`[compaction] deferred context-engine-owned budget compaction to background maintenance ` +
`(sessionKey=${params.compactParams.sessionKey ?? params.compactParams.sessionId} ` +
`scheduled=${String(deferredScheduled)})`,
);
return {
ok: true,
compacted: false,
reason: DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON,
};
}
/**
* Compacts a session with lane queueing (session lane + global lane).
* Use this from outside a lane context. If already inside a lane, use
@@ -165,6 +248,18 @@ export async function compactEmbeddedPiSession(
`native harness compaction could not use its session binding; falling back to context engine: ${harnessResult.reason ?? "unknown"}`,
);
}
if (
shouldDeferOwningContextEngineBudgetCompaction({
compactParams: params,
contextEngine,
})
) {
return await deferOwningContextEngineBudgetCompaction({
compactParams: params,
contextEngine,
contextEngineRuntimeContext,
});
}
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =

View File

@@ -499,6 +499,8 @@ async function compactEmbeddedPiSessionDirectOnce(
defaultProvider: DEFAULT_PROVIDER,
defaultModel: DEFAULT_MODEL,
});
// Keep the configured provider for context-window policy, while auth/model loading below can
// route OpenAI compaction through Codex OAuth when that runtime owns the session credentials.
const modelConfigProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER;
const modelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL;
const authProfileId = resolvedCompactionTarget.authProfileId;
@@ -727,7 +729,7 @@ async function compactEmbeddedPiSessionDirectOnce(
model: effectiveModel,
modelApi: effectiveModel.api,
harnessId: params.agentHarnessId,
harnessRuntime: params.agentHarnessId,
harnessRuntime: selectedHarnessRuntime,
authProfileProvider: authProfileId?.split(":", 1)[0],
sessionAuthProfileId: authProfileId,
config: params.config,

View File

@@ -62,6 +62,11 @@ export type CompactEmbeddedPiSessionParams = {
tokenBudget?: number;
force?: boolean;
trigger?: "budget" | "overflow" | "manual";
/**
* Preflight callers can allow native/current-session harness compaction but
* move plugin-owned budget compaction onto background turn maintenance.
*/
deferOwningContextEngineCompaction?: boolean;
diagId?: string;
attempt?: number;
maxAttempts?: number;

View File

@@ -340,6 +340,7 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => {
describe("runContextEngineMaintenance", () => {
beforeEach(async () => {
vi.useRealTimers();
rewriteTranscriptEntriesInSessionManagerMock.mockClear();
rewriteTranscriptEntriesInSessionFileMock.mockClear();
await loadFreshContextEngineMaintenanceModuleForTest();
@@ -844,6 +845,118 @@ describe("runContextEngineMaintenance", () => {
});
});
it("disposes owned deferred engines only after their maintenance run finishes", async () => {
await withStateDirEnv("openclaw-turn-maintenance-dispose-", async () => {
resetCommandQueueStateForTest();
resetTaskRegistryForTests({ persist: false });
resetTaskFlowRegistryForTests({ persist: false });
const waitForRealAssertion = async (assertion: () => void): Promise<void> => {
const startedAt = Date.now();
for (;;) {
try {
assertion();
return;
} catch (error) {
if (Date.now() - startedAt >= 2_000) {
throw error;
}
await new Promise<void>((resolve) => setTimeout(resolve, 5));
}
}
};
const sessionKey = "agent:main:session-owned-dispose";
const events: string[] = [];
let releaseFirstMaintenance: (() => void) | undefined;
let releaseSecondMaintenance: (() => void) | undefined;
const createBackgroundEngine = (id: "first" | "second") =>
({
info: {
id,
name: "Test Engine",
turnMaintenanceMode: "background" as const,
},
ingest: async () => ({ ingested: true }),
assemble: async ({ messages }: { messages: unknown[] }) => ({
messages,
estimatedTokens: 0,
}),
compact: async () => ({ ok: true, compacted: false }),
maintain: vi.fn(async () => {
events.push(`maintain:${id}`);
await new Promise<void>((resolve) => {
if (id === "first") {
releaseFirstMaintenance = resolve;
} else {
releaseSecondMaintenance = resolve;
}
});
return {
changed: false,
bytesFreed: 0,
rewrittenEntries: 0,
};
}),
dispose: vi.fn(async () => {
events.push(`dispose:${id}`);
}),
}) as NonNullable<Parameters<typeof runContextEngineMaintenance>[0]["contextEngine"]>;
const firstEngine = createBackgroundEngine("first");
const secondEngine = createBackgroundEngine("second");
const deferredPromises: Promise<void>[] = [];
await runContextEngineMaintenance({
contextEngine: firstEngine,
sessionId: "session-owned-dispose",
sessionKey,
sessionFile: "/tmp/session-owned-dispose.jsonl",
reason: "turn",
disposeDeferredContextEngineAfterMaintenance: true,
onDeferredMaintenance: (promise) => {
deferredPromises.push(promise);
},
});
await waitForRealAssertion(() => expect(events).toContain("maintain:first"));
await runContextEngineMaintenance({
contextEngine: secondEngine,
sessionId: "session-owned-dispose",
sessionKey,
sessionFile: "/tmp/session-owned-dispose.jsonl",
reason: "turn",
disposeDeferredContextEngineAfterMaintenance: true,
onDeferredMaintenance: (promise) => {
deferredPromises.push(promise);
},
});
if (!releaseFirstMaintenance) {
throw new Error("Expected first maintenance release callback to be initialized");
}
releaseFirstMaintenance();
await waitForRealAssertion(() => expect(events).toContain("maintain:second"));
expect(secondEngine.dispose).not.toHaveBeenCalled();
if (!releaseSecondMaintenance) {
throw new Error("Expected second maintenance release callback to be initialized");
}
releaseSecondMaintenance();
await deferredPromises[1];
expect(firstEngine.dispose).toHaveBeenCalledTimes(1);
expect(secondEngine.dispose).toHaveBeenCalledTimes(1);
expect(events).toEqual([
"maintain:first",
"dispose:first",
"maintain:second",
"dispose:second",
]);
});
});
it("replaces legacy active maintenance tasks that are missing a runId", async () => {
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
vi.useFakeTimers();

View File

@@ -50,6 +50,7 @@ type DeferredTurnMaintenanceScheduleParams = {
runtimeContext?: ContextEngineRuntimeContext;
agentId?: string;
config?: OpenClawConfig;
disposeContextEngineAfterMaintenance?: boolean;
};
type DeferredTurnMaintenanceRunState = {
@@ -111,6 +112,18 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string {
return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`;
}
async function disposeDeferredMaintenanceContextEngine(
contextEngine: ContextEngine,
): Promise<void> {
try {
await contextEngine.dispose?.();
} catch (err) {
log.warn("context engine dispose failed after deferred maintenance", {
errorMessage: formatErrorMessage(err),
});
}
}
export function createDeferredTurnMaintenanceAbortSignal(params?: {
processLike?: DeferredTurnMaintenanceProcessLike;
}): {
@@ -386,6 +399,7 @@ async function runDeferredTurnMaintenanceWorker(params: {
agentId?: string;
runId: string;
config?: OpenClawConfig;
disposeContextEngineAfterMaintenance?: boolean;
}): Promise<void> {
let surfacedUserNotice = false;
let longRunningTimer: ReturnType<typeof setTimeout> | null = null;
@@ -533,6 +547,9 @@ async function runDeferredTurnMaintenanceWorker(params: {
log.warn(`deferred context engine maintenance failed: ${reason}`);
} finally {
shutdownAbort.dispose();
if (params.disposeContextEngineAfterMaintenance) {
await disposeDeferredMaintenanceContextEngine(params.contextEngine);
}
}
}
@@ -545,8 +562,15 @@ function scheduleDeferredTurnMaintenance(
}
const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey);
if (activeRun) {
const supersededParams = activeRun.rerunRequested ? activeRun.latestParams : undefined;
activeRun.rerunRequested = true;
activeRun.latestParams = { ...params, sessionKey };
if (
supersededParams?.disposeContextEngineAfterMaintenance &&
supersededParams.contextEngine !== params.contextEngine
) {
void disposeDeferredMaintenanceContextEngine(supersededParams.contextEngine);
}
return activeRun.promise;
}
@@ -593,6 +617,7 @@ function scheduleDeferredTurnMaintenance(
agentId: params.agentId,
config: params.config,
runId: task.runId!,
disposeContextEngineAfterMaintenance: params.disposeContextEngineAfterMaintenance,
}),
);
} catch (err) {
@@ -622,9 +647,13 @@ function scheduleDeferredTurnMaintenance(
const shutdownTriggered = schedulerAbort.abortSignal?.aborted === true;
const rerunParams =
current.rerunRequested && !shutdownTriggered ? current.latestParams : undefined;
const discardedRerunParams =
current.rerunRequested && shutdownTriggered ? current.latestParams : undefined;
activeDeferredTurnMaintenanceRuns.delete(sessionKey);
if (rerunParams) {
await scheduleDeferredTurnMaintenance(rerunParams);
} else if (discardedRerunParams?.disposeContextEngineAfterMaintenance) {
await disposeDeferredMaintenanceContextEngine(discardedRerunParams.contextEngine);
}
});
state = {
@@ -653,6 +682,7 @@ export async function runContextEngineMaintenance(params: {
executionMode?: "foreground" | "background";
onDeferredMaintenance?: (promise: Promise<void>) => void;
config?: OpenClawConfig;
disposeDeferredContextEngineAfterMaintenance?: boolean;
}): Promise<ContextEngineMaintenanceResult | undefined> {
if (typeof params.contextEngine?.maintain !== "function") {
return undefined;
@@ -675,6 +705,7 @@ export async function runContextEngineMaintenance(params: {
runtimeContext: params.runtimeContext,
agentId: params.agentId,
config: params.config,
disposeContextEngineAfterMaintenance: params.disposeDeferredContextEngineAfterMaintenance,
});
if (deferred) {
params.onDeferredMaintenance?.(deferred);

View File

@@ -673,6 +673,114 @@ describe("runMemoryFlushIfNeeded", () => {
expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled();
});
it("continues when preflight compaction reports the session is already under target", async () => {
const sessionFile = path.join(rootDir, "session.jsonl");
await fs.writeFile(
sessionFile,
`${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`,
"utf8",
);
registerMemoryFlushPlanResolverForTest(() => ({
softThresholdTokens: 1,
forceFlushTranscriptBytes: 1_000_000_000,
reserveTokensFloor: 0,
prompt: "Pre-compaction memory flush.\nNO_REPLY",
systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
relativePath: "memory/2023-11-14.md",
}));
compactEmbeddedPiSessionMock.mockResolvedValueOnce({
ok: true,
compacted: false,
reason: "already under target",
});
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
updatedAt: Date.now(),
totalTokens: 120,
totalTokensFresh: true,
};
const entry = await runPreflightCompactionIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
sessionKey: "agent:main:main",
}),
defaultModel: "anthropic/claude-opus-4-6",
agentCfgContextTokens: 100,
sessionEntry,
sessionStore: { "agent:main:main": sessionEntry },
sessionKey: "agent:main:main",
storePath: path.join(rootDir, "sessions.json"),
isHeartbeat: false,
replyOperation: createReplyOperation(),
});
expect(entry).toBe(sessionEntry);
expect(compactEmbeddedPiSessionMock).toHaveBeenCalledTimes(1);
expect(requireCompactEmbeddedPiSessionCall()).toMatchObject({
trigger: "budget",
deferOwningContextEngineCompaction: false,
contextTokenBudget: 100,
});
expect(incrementCompactionCountMock).not.toHaveBeenCalled();
});
it("fails when required preflight context-engine compaction is deferred to background maintenance", async () => {
const sessionFile = path.join(rootDir, "session.jsonl");
await fs.writeFile(
sessionFile,
`${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`,
"utf8",
);
registerMemoryFlushPlanResolverForTest(() => ({
softThresholdTokens: 1,
forceFlushTranscriptBytes: 1_000_000_000,
reserveTokensFloor: 0,
prompt: "Pre-compaction memory flush.\nNO_REPLY",
systemPrompt: "Write memory to memory/YYYY-MM-DD.md.",
relativePath: "memory/2023-11-14.md",
}));
compactEmbeddedPiSessionMock.mockResolvedValueOnce({
ok: true,
compacted: false,
reason: "deferred to background context-engine maintenance",
});
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
updatedAt: Date.now(),
totalTokens: 120,
totalTokensFresh: true,
};
await expect(
runPreflightCompactionIfNeeded({
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
sessionKey: "agent:main:main",
}),
defaultModel: "anthropic/claude-opus-4-6",
agentCfgContextTokens: 100,
sessionEntry,
sessionStore: { "agent:main:main": sessionEntry },
sessionKey: "agent:main:main",
storePath: path.join(rootDir, "sessions.json"),
isHeartbeat: false,
replyOperation: createReplyOperation(),
}),
).rejects.toThrow(
"Preflight compaction required but failed: deferred to background context-engine maintenance",
);
expect(compactEmbeddedPiSessionMock).toHaveBeenCalledTimes(1);
expect(incrementCompactionCountMock).not.toHaveBeenCalled();
});
it("passes runtime policy session key to preflight compaction sandbox resolution", async () => {
const sessionFile = path.join(rootDir, "session.jsonl");
await fs.writeFile(
@@ -692,7 +800,8 @@ describe("runMemoryFlushIfNeeded", () => {
sessionId: "session",
sessionFile,
updatedAt: Date.now(),
totalTokensFresh: false,
totalTokens: 120,
totalTokensFresh: true,
};
await runPreflightCompactionIfNeeded({
@@ -750,7 +859,8 @@ describe("runMemoryFlushIfNeeded", () => {
sessionId: "session",
sessionFile,
updatedAt: Date.now(),
totalTokensFresh: false,
totalTokens: 120,
totalTokensFresh: true,
};
const sessionStore = { "agent:main:telegram:group:redacted": sessionEntry };
@@ -802,7 +912,8 @@ describe("runMemoryFlushIfNeeded", () => {
sessionId: "session",
sessionFile,
updatedAt: Date.now(),
totalTokensFresh: false,
totalTokens: 120,
totalTokensFresh: true,
};
const sessionStore = { "agent:main:telegram:group:redacted": sessionEntry };
@@ -947,7 +1058,7 @@ describe("runMemoryFlushIfNeeded", () => {
compactEmbeddedPiSessionMock.mockResolvedValueOnce({
ok: true,
compacted: false,
reason: "already under target",
reason: "plugin already stored this turn",
});
const sessionEntry: SessionEntry = {
sessionId: "session",

View File

@@ -10,6 +10,10 @@ import { runWithModelFallback } from "../../agents/model-fallback.js";
import { listLegacyRuntimeModelProviderAliases } from "../../agents/model-runtime-aliases.js";
import { isCliProvider } from "../../agents/model-selection.js";
import { resolveContextConfigProviderForRuntime } from "../../agents/openai-codex-routing.js";
import {
classifyCompactionReason,
DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON,
} from "../../agents/pi-embedded-runner/compact-reasons.js";
import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js";
import {
derivePromptTokens,
@@ -176,6 +180,22 @@ function resolveEffectivePromptTokens(
return base + output + estimate;
}
function isPreflightCompactionSkipReason(reason?: string): boolean {
const classification = classifyCompactionReason(reason);
// Preflight compaction is a guardrail, not a hard dependency. These classes
// mean the context engine found nothing useful to compact, so the reply should
// continue instead of surfacing a generic user-facing failure.
return (
classification === "below_threshold" ||
classification === "no_compactable_entries" ||
classification === "already_compacted_recently"
);
}
function isDeferredPreflightCompactionReason(reason?: string): boolean {
return normalizeOptionalString(reason) === DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON;
}
function resolveMemoryFlushModelFallbackOptions(
run: FollowupRun["run"],
model?: string,
@@ -624,6 +644,11 @@ export async function runPreflightCompactionIfNeeded(params: {
isHeartbeat: boolean;
replyOperation: ReplyOperation;
}): Promise<SessionEntry | undefined> {
const deps = {
compactEmbeddedPiSession: memoryDeps.compactEmbeddedPiSession,
incrementCompactionCount: memoryDeps.incrementCompactionCount,
refreshQueuedFollowupSession: memoryDeps.refreshQueuedFollowupSession,
};
if (!params.sessionKey) {
return params.sessionEntry;
}
@@ -788,7 +813,7 @@ export async function runPreflightCompactionIfNeeded(params: {
params.sessionKey ?? params.followupRun.run.sessionKey,
{ storePath: params.storePath },
);
const result = await memoryDeps.compactEmbeddedPiSession({
const result = await deps.compactEmbeddedPiSession({
sessionId: entry.sessionId,
sessionKey: params.sessionKey,
sandboxSessionKey: params.runtimePolicySessionKey,
@@ -813,14 +838,19 @@ export async function runPreflightCompactionIfNeeded(params: {
thinkLevel: params.followupRun.run.thinkLevel,
bashElevated: params.followupRun.run.bashElevated,
trigger: "budget",
currentTokenCount: tokenCountForCompaction ?? freshPersistedTokens,
deferOwningContextEngineCompaction: false,
contextTokenBudget: contextWindowTokens,
currentTokenCount: tokenCountForCompaction ?? freshPersistedTokens,
ownerNumbers: params.followupRun.run.ownerNumbers,
abortSignal: params.replyOperation.abortSignal,
});
if (!result?.ok) {
const reason = result?.reason ?? "not_compacted";
if (isPreflightCompactionSkipReason(reason)) {
logVerbose(`preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${reason}`);
return entry ?? params.sessionEntry;
}
logVerbose(`preflightCompaction failed: sessionKey=${params.sessionKey} reason=${reason}`);
if (isRecoverableNativeHarnessBindingFailure(result)) {
logVerbose(
@@ -832,12 +862,18 @@ export async function runPreflightCompactionIfNeeded(params: {
}
if (!result.compacted) {
const reason = result.reason ?? "not_compacted";
logVerbose(`preflightCompaction no-op: sessionKey=${params.sessionKey} reason=${reason}`);
const reason = normalizeOptionalString(result.reason);
if (isDeferredPreflightCompactionReason(reason)) {
logVerbose(`preflightCompaction failed: sessionKey=${params.sessionKey} reason=${reason}`);
throw new Error(`Preflight compaction required but failed: ${reason}`);
}
logVerbose(
`preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${reason ?? "not_compacted"}`,
);
return entry ?? params.sessionEntry;
}
await incrementCompactionCount({
await deps.incrementCompactionCount({
cfg: params.cfg,
sessionEntry: entry,
sessionStore: params.sessionStore,
@@ -861,7 +897,7 @@ export async function runPreflightCompactionIfNeeded(params: {
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
memoryDeps.refreshQueuedFollowupSession({
deps.refreshQueuedFollowupSession({
key: queueKey,
previousSessionId,
nextSessionId: entry.sessionId,

View File

@@ -190,6 +190,33 @@ describe("handleCompactCommand", () => {
expect(call.agentDir).toBe("/tmp/openclaw-agent-compact");
});
it("treats already-under-target manual compaction as skipped", async () => {
vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({
ok: false,
compacted: false,
reason: "already under target",
});
const result = await handleCompactCommand(
{
...buildCompactParams("/compact", {
commands: { text: true },
channels: { whatsapp: { allowFrom: ["*"] } },
} as OpenClawConfig),
sessionEntry: {
sessionId: "session-1",
updatedAt: Date.now(),
},
} as HandleCommandsParams,
true,
);
expect(result?.reply?.text).toBe(
"⚙️ Compaction skipped: context is already under the compaction target • Context 12.1k",
);
expect(vi.mocked(incrementCompactionCount)).not.toHaveBeenCalled();
});
it("uses the canonical session agent when resolving the compaction session file", async () => {
vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({
ok: true,

View File

@@ -53,9 +53,12 @@ function extractCompactInstructions(params: {
function isCompactionSkipReason(reason?: string): boolean {
const text = normalizeOptionalLowercaseString(reason) ?? "";
// Manual /compact mirrors preflight semantics: already-small sessions are a
// successful no-op, not a failed compaction.
return (
text.includes("nothing to compact") ||
text.includes("below threshold") ||
text.includes("already under target") ||
text.includes("already compacted") ||
text.includes("no real conversation messages")
);
@@ -74,6 +77,9 @@ function formatCompactionReason(reason?: string): string | undefined {
if (lower.includes("below threshold")) {
return "context is below the compaction threshold";
}
if (lower.includes("already under target")) {
return "context is already under the compaction target";
}
if (lower.includes("already compacted")) {
return "session was already compacted recently";
}

View File

@@ -52,6 +52,27 @@ let cachedDatabase: FlowRegistryDatabase | null = null;
const FLOW_REGISTRY_DIR_MODE = 0o700;
const FLOW_REGISTRY_FILE_MODE = 0o600;
const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
const FLOW_RUNS_COLUMNS = `
flow_id TEXT PRIMARY KEY,
shape TEXT,
sync_mode TEXT NOT NULL DEFAULT 'managed',
owner_key TEXT NOT NULL,
requester_origin_json TEXT,
controller_id TEXT,
revision INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
state_json TEXT,
wait_json TEXT,
cancel_requested_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
`;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
@@ -245,28 +266,70 @@ function hasFlowRunsColumn(db: DatabaseSync, columnName: string): boolean {
return rows.some((row) => row.name === columnName);
}
function rebuildLegacyFlowRunsTable(db: DatabaseSync) {
// Older live registries can retain owner_session_key TEXT NOT NULL even after owner_key is
// added. Current inserts do not write owner_session_key, so SQLite rejects mirrored flow rows
// until the table is rebuilt into the canonical schema.
db.exec(`
DROP TABLE IF EXISTS flow_runs_canonical_migration;
CREATE TABLE flow_runs_canonical_migration (
${FLOW_RUNS_COLUMNS}
);
INSERT INTO flow_runs_canonical_migration (
flow_id,
sync_mode,
owner_key,
requester_origin_json,
controller_id,
revision,
status,
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
state_json,
wait_json,
cancel_requested_at,
created_at,
updated_at,
ended_at
)
SELECT
flow_id,
CASE
WHEN sync_mode = 'task_mirrored' THEN 'task_mirrored'
ELSE 'managed'
END,
COALESCE(NULLIF(trim(owner_key), ''), owner_session_key),
requester_origin_json,
CASE
WHEN sync_mode = 'task_mirrored' THEN NULL
ELSE COALESCE(NULLIF(trim(controller_id), ''), 'core/legacy-restored')
END,
COALESCE(revision, 0),
status,
notify_policy,
goal,
current_step,
blocked_task_id,
blocked_summary,
state_json,
wait_json,
cancel_requested_at,
created_at,
updated_at,
ended_at
FROM flow_runs;
DROP TABLE flow_runs;
ALTER TABLE flow_runs_canonical_migration RENAME TO flow_runs;
`);
}
function ensureSchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS flow_runs (
flow_id TEXT PRIMARY KEY,
shape TEXT,
sync_mode TEXT NOT NULL DEFAULT 'managed',
owner_key TEXT NOT NULL,
requester_origin_json TEXT,
controller_id TEXT,
revision INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
state_json TEXT,
wait_json TEXT,
cancel_requested_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
${FLOW_RUNS_COLUMNS}
);
`);
if (!hasFlowRunsColumn(db, "owner_key") && hasFlowRunsColumn(db, "owner_session_key")) {
@@ -331,6 +394,35 @@ function ensureSchema(db: DatabaseSync) {
if (!hasFlowRunsColumn(db, "cancel_requested_at")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`);
}
if (hasFlowRunsColumn(db, "owner_session_key")) {
// Populate the canonical fields before rebuilding so existing rows survive the legacy-column
// drop, including pre-sync-mode single-task flows and older managed flows with no controller.
db.exec(`
UPDATE flow_runs
SET owner_key = owner_session_key
WHERE (owner_key IS NULL OR trim(owner_key) = '')
`);
db.exec(`
UPDATE flow_runs
SET sync_mode = CASE
WHEN shape = 'single_task' THEN 'task_mirrored'
ELSE 'managed'
END
WHERE sync_mode IS NULL OR trim(sync_mode) = ''
`);
db.exec(`
UPDATE flow_runs
SET revision = 0
WHERE revision IS NULL
`);
db.exec(`
UPDATE flow_runs
SET controller_id = 'core/legacy-restored'
WHERE sync_mode = 'managed'
AND (controller_id IS NULL OR trim(controller_id) = '')
`);
rebuildLegacyFlowRunsTable(db);
}
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);

View File

@@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import {
createTaskFlowForTask,
createManagedTaskFlow,
getTaskFlowById,
requestFlowCancel,
@@ -193,6 +194,82 @@ describe("task-flow-registry store runtime", () => {
});
});
it("migrates legacy owner_session_key schema before mirrored flow inserts", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;
resetTaskFlowRegistryForTests();
const sqlitePath = resolveTaskFlowRegistrySqlitePath(process.env);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(sqlitePath);
// This mirrors the live pre-migration table shape that kept owner_session_key as NOT NULL,
// which made current owner_key-only mirrored inserts fail with SQLITE_CONSTRAINT_NOTNULL.
db.exec(`
DROP TABLE IF EXISTS flow_runs;
CREATE TABLE flow_runs (
flow_id TEXT PRIMARY KEY,
owner_session_key TEXT NOT NULL,
requester_origin_json TEXT,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
INSERT INTO flow_runs (
flow_id,
owner_session_key,
status,
notify_policy,
goal,
current_step,
created_at,
updated_at
) VALUES (
'legacy-flow',
'agent:main:legacy',
'running',
'done_only',
'Legacy flow',
'legacy_step',
10,
15
);
`);
db.close();
resetTaskFlowRegistryForTests({ persist: false });
const mirrored = createTaskFlowForTask({
task: {
ownerKey: "agent:main:main",
taskId: "task-mirrored",
notifyPolicy: "silent",
status: "queued",
label: "Context engine turn maintenance",
task: "Deferred context-engine maintenance after turn.",
createdAt: 20,
},
});
expect(mirrored.syncMode).toBe("task_mirrored");
expect(mirrored.ownerKey).toBe("agent:main:main");
expect(mirrored.controllerId).toBeUndefined();
const legacy = getTaskFlowById("legacy-flow");
expect(legacy?.ownerKey).toBe("agent:main:legacy");
expect(legacy?.controllerId).toBe("core/legacy-restored");
const migratedDb = new DatabaseSync(sqlitePath);
const columns = migratedDb.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{
name?: string;
}>;
migratedDb.close();
expect(columns.map((column) => column.name)).not.toContain("owner_session_key");
});
});
it("drops malformed requester origin json from sqlite flow state", async () => {
await withFlowRegistryTempDir(async (root) => {
process.env.OPENCLAW_STATE_DIR = root;