diff --git a/src/agents/pi-embedded-runner/compact-reasons.test.ts b/src/agents/pi-embedded-runner/compact-reasons.test.ts index 6018b371a0d..333203bef9f 100644 --- a/src/agents/pi-embedded-runner/compact-reasons.test.ts +++ b/src/agents/pi-embedded-runner/compact-reasons.test.ts @@ -34,6 +34,16 @@ describe("classifyCompactionReason", () => { ); }); + it('classifies "already under target" as below threshold', () => { + expect(classifyCompactionReason("already under target")).toBe("below_threshold"); + }); + + it("classifies deferred background maintenance as a skip-like reason", () => { + expect(classifyCompactionReason("deferred to background context-engine maintenance")).toBe( + "deferred_background", + ); + }); + it("classifies safeguard messages as guard-blocked", () => { expect( classifyCompactionReason( diff --git a/src/agents/pi-embedded-runner/compact-reasons.ts b/src/agents/pi-embedded-runner/compact-reasons.ts index 5e98181435e..2ff9ecc0dff 100644 --- a/src/agents/pi-embedded-runner/compact-reasons.ts +++ b/src/agents/pi-embedded-runner/compact-reasons.ts @@ -3,6 +3,9 @@ import { sanitizeForLog } from "../../terminal/ansi.js"; const MAX_COMPACTION_REASON_DETAIL_CHARS = 100; +export const DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON = + "deferred to background context-engine maintenance"; + function isGenericCompactionCancelledReason(reason: string): boolean { const normalized = normalizeLowercaseStringOrEmpty(reason); return normalized === "compaction cancelled" || normalized === "error: compaction cancelled"; @@ -26,12 +29,17 @@ export function classifyCompactionReason(reason?: string): string { if (text.includes("nothing to compact")) { return "no_compactable_entries"; } - if (text.includes("below threshold")) { + // Backends use both phrases for the same harmless state: the transcript is + // already small enough, so preflight compaction should skip instead of fail. + if (text.includes("below threshold") || text.includes("already under target")) { return "below_threshold"; } if (text.includes("already compacted")) { return "already_compacted_recently"; } + if (text.includes("deferred to background")) { + return "deferred_background"; + } if (text.includes("still exceeds target")) { return "live_context_still_exceeds_target"; } diff --git a/src/agents/pi-embedded-runner/compact.hooks.harness.ts b/src/agents/pi-embedded-runner/compact.hooks.harness.ts index 249a3606820..b10c1dab859 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.harness.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.harness.ts @@ -97,6 +97,13 @@ function createDefaultSessionMessages(): unknown[] { export const sessionMessages: unknown[] = createDefaultSessionMessages(); export const sessionAbortCompactionMock: Mock<(reason?: unknown) => void> = vi.fn(); export const createOpenClawCodingToolsMock = vi.fn(() => []); +export const guardSessionManagerMock = vi.fn(() => ({ + flushPendingToolResults: vi.fn(), +})); +export const applyPiCompactionSettingsFromConfigMock = vi.fn(); +export const createPreparedEmbeddedPiSettingsManagerMock = vi.fn(() => ({ + getGlobalSettings: vi.fn(() => ({})), +})); export const listRegisteredPluginAgentPromptGuidanceMock = vi.fn((params?: { surface?: string }) => params?.surface === "subagent" ? ["Subagent compact command guidance."] @@ -120,6 +127,7 @@ export const rotateTranscriptAfterCompactionMock: Mock< > = vi.fn(async () => ({ rotated: false, })); +export const enqueueCommandInLaneMock = vi.fn((_lane: unknown, task: () => unknown) => task()); function createCompactHooksRuntimePlan(params: BuildAgentRuntimePlanParams): AgentRuntimePlan { const modelApi = params.modelApi ?? params.model?.api ?? undefined; @@ -272,6 +280,8 @@ export function resetCompactSessionStateMocks(): void { maybeCompactAgentHarnessSessionMock.mockResolvedValue(undefined); rotateTranscriptAfterCompactionMock.mockReset(); rotateTranscriptAfterCompactionMock.mockResolvedValue({ rotated: false }); + enqueueCommandInLaneMock.mockReset(); + enqueueCommandInLaneMock.mockImplementation((_lane: unknown, task: () => unknown) => task()); listRegisteredPluginAgentPromptGuidanceMock.mockReset(); listRegisteredPluginAgentPromptGuidanceMock.mockImplementation((params?: { surface?: string }) => params?.surface === "subagent" @@ -332,6 +342,15 @@ export function resetCompactHooksHarnessMocks(): void { resetCompactSessionStateMocks(); createOpenClawCodingToolsMock.mockReset(); createOpenClawCodingToolsMock.mockReturnValue([]); + guardSessionManagerMock.mockReset(); + guardSessionManagerMock.mockReturnValue({ + flushPendingToolResults: vi.fn(), + }); + applyPiCompactionSettingsFromConfigMock.mockReset(); + createPreparedEmbeddedPiSettingsManagerMock.mockReset(); + createPreparedEmbeddedPiSettingsManagerMock.mockReturnValue({ + getGlobalSettings: vi.fn(() => ({})), + }); } export async function loadCompactHooksHarness(): Promise<{ @@ -470,14 +489,12 @@ export async function loadCompactHooksHarness(): Promise<{ })); vi.doMock("../session-tool-result-guard-wrapper.js", () => ({ - guardSessionManager: vi.fn(() => ({ - flushPendingToolResults: vi.fn(), - })), + guardSessionManager: guardSessionManagerMock, })); vi.doMock("../pi-settings.js", () => ({ applyPiAutoCompactionGuard: vi.fn(() => ({ supported: true, disabled: false })), - applyPiCompactionSettingsFromConfig: vi.fn(), + applyPiCompactionSettingsFromConfig: applyPiCompactionSettingsFromConfigMock, ensurePiCompactionReserveTokens: vi.fn(), isSilentOverflowProneModel: vi.fn(() => false), resolveCompactionReserveTokensFloor: vi.fn(() => 0), @@ -532,7 +549,7 @@ export async function loadCompactHooksHarness(): Promise<{ })); vi.doMock("../../process/command-queue.js", () => ({ - enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()), + enqueueCommandInLane: enqueueCommandInLaneMock, clearCommandLane: vi.fn(() => 0), })); @@ -793,9 +810,7 @@ export async function loadCompactHooksHarness(): Promise<{ })); vi.doMock("../pi-project-settings.js", () => ({ - createPreparedEmbeddedPiSettingsManager: vi.fn(() => ({ - getGlobalSettings: vi.fn(() => ({})), - })), + createPreparedEmbeddedPiSettingsManager: createPreparedEmbeddedPiSettingsManagerMock, })); vi.doMock("./sandbox-info.js", () => ({ diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 9f7e3eba4db..704d6a3441e 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -2,12 +2,16 @@ import type { AgentMessage } from "@earendil-works/pi-agent-core"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { applyExtraParamsToAgentMock, + applyPiCompactionSettingsFromConfigMock, buildEmbeddedSystemPromptMock, contextEngineCompactMock, + createPreparedEmbeddedPiSettingsManagerMock, createOpenClawCodingToolsMock, + enqueueCommandInLaneMock, ensureRuntimePluginsLoaded, estimateTokensMock, getMemorySearchManagerMock, + guardSessionManagerMock, hookRunner, listRegisteredPluginAgentPromptGuidanceMock, loadCompactHooksHarness, @@ -392,6 +396,15 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { expectRecordFields(mockCallArg(createOpenClawCodingToolsMock), { modelContextWindowTokens: 64_000, }); + expectRecordFields(mockCallArg(guardSessionManagerMock, 0, 1), { + contextWindowTokens: 64_000, + }); + expectRecordFields(mockCallArg(createPreparedEmbeddedPiSettingsManagerMock), { + contextTokenBudget: 64_000, + }); + expectRecordFields(mockCallArg(applyPiCompactionSettingsFromConfigMock), { + contextTokenBudget: 64_000, + }); }); it("clamps the caller context token budget to the compaction model", async () => { @@ -1542,6 +1555,39 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { }); }); + it("fails deferred budget compaction when background maintenance is not scheduled", async () => { + const dispose = vi.fn(async () => {}); + const maintain = vi.fn(async () => ({ + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + })); + resolveContextEngineMock.mockResolvedValue({ + info: { ownsCompaction: true, turnMaintenanceMode: "background" }, + compact: contextEngineCompactMock, + dispose, + maintain, + } as never); + enqueueCommandInLaneMock.mockImplementationOnce(() => { + throw new Error("scheduler offline"); + }); + + const result = await compactEmbeddedPiSession( + wrappedCompactionArgs({ + trigger: "budget", + deferOwningContextEngineCompaction: true, + }), + ); + + expect(result.ok).toBe(false); + expect(result.compacted).toBe(false); + expect(result.reason).toBe("failed to schedule background context-engine maintenance"); + expect(result.failure?.reason).toBe("deferred_compaction_not_scheduled"); + expect(dispose).toHaveBeenCalledTimes(1); + expect(maintain).not.toHaveBeenCalled(); + expect(contextEngineCompactMock).not.toHaveBeenCalled(); + }); + it("does not fall back to context-engine compaction for Codex native binding failures", async () => { maybeCompactAgentHarnessSessionMock.mockResolvedValueOnce({ ok: false, diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index 77ff6895eb6..365605fd632 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -3,7 +3,7 @@ import { resolveContextEngine, resolveContextEngineOwnerPluginId, } from "../../context-engine/registry.js"; -import type { ContextEngineRuntimeContext } from "../../context-engine/types.js"; +import type { ContextEngine, ContextEngineRuntimeContext } from "../../context-engine/types.js"; import { captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, @@ -26,6 +26,7 @@ import { } from "../harness/selection.js"; import { resolveContextConfigProviderForRuntime } from "../openai-codex-routing.js"; import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; +import { DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON } from "./compact-reasons.js"; import type { CompactEmbeddedPiSessionParams } from "./compact.types.js"; import { asCompactionHookRunner, runPostCompactionSideEffects } from "./compaction-hooks.js"; import { @@ -64,6 +65,88 @@ function shouldFallbackAfterHarnessCompaction( ); } +const DEFERRED_CONTEXT_ENGINE_COMPACTION_SCHEDULE_FAILURE_REASON = + "failed to schedule background context-engine maintenance"; + +function shouldDeferOwningContextEngineBudgetCompaction(params: { + compactParams: CompactEmbeddedPiSessionParams; + contextEngine: ContextEngine; +}): boolean { + // Request-time budget compaction for context-engine-owned transcripts can + // spend the whole reply preflight budget. Only defer engines that explicitly + // advertise background turn maintenance, leaving native/current-session + // harness compaction synchronous. + return ( + params.compactParams.deferOwningContextEngineCompaction === true && + params.compactParams.trigger === "budget" && + params.contextEngine.info.ownsCompaction === true && + params.contextEngine.info.turnMaintenanceMode === "background" && + typeof params.contextEngine.maintain === "function" + ); +} + +async function disposeContextEngine(contextEngine: ContextEngine): Promise { + try { + await contextEngine.dispose?.(); + } catch (err) { + log.warn("context engine dispose failed after deferred maintenance", { + errorMessage: formatErrorMessage(err), + }); + } +} + +async function deferOwningContextEngineBudgetCompaction(params: { + compactParams: CompactEmbeddedPiSessionParams; + contextEngine: ContextEngine; + contextEngineRuntimeContext: ContextEngineRuntimeContext; +}): Promise { + let deferredScheduled = false; + try { + await runContextEngineMaintenance({ + contextEngine: params.contextEngine, + sessionId: params.compactParams.sessionId, + sessionKey: params.compactParams.sessionKey, + sessionFile: params.compactParams.sessionFile, + reason: "turn", + runtimeContext: params.contextEngineRuntimeContext, + config: params.compactParams.config, + disposeDeferredContextEngineAfterMaintenance: true, + onDeferredMaintenance: () => { + deferredScheduled = true; + }, + }); + } catch (err) { + log.warn("failed to defer context-engine budget compaction", { + errorMessage: formatErrorMessage(err), + }); + } + + if (!deferredScheduled) { + await disposeContextEngine(params.contextEngine); + log.warn( + `[compaction] failed to schedule context-engine-owned budget compaction background maintenance ` + + `(sessionKey=${params.compactParams.sessionKey ?? params.compactParams.sessionId})`, + ); + return { + ok: false, + compacted: false, + reason: DEFERRED_CONTEXT_ENGINE_COMPACTION_SCHEDULE_FAILURE_REASON, + failure: { reason: "deferred_compaction_not_scheduled" }, + }; + } + + log.info( + `[compaction] deferred context-engine-owned budget compaction to background maintenance ` + + `(sessionKey=${params.compactParams.sessionKey ?? params.compactParams.sessionId} ` + + `scheduled=${String(deferredScheduled)})`, + ); + return { + ok: true, + compacted: false, + reason: DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON, + }; +} + /** * Compacts a session with lane queueing (session lane + global lane). * Use this from outside a lane context. If already inside a lane, use @@ -165,6 +248,18 @@ export async function compactEmbeddedPiSession( `native harness compaction could not use its session binding; falling back to context engine: ${harnessResult.reason ?? "unknown"}`, ); } + if ( + shouldDeferOwningContextEngineBudgetCompaction({ + compactParams: params, + contextEngine, + }) + ) { + return await deferOwningContextEngineBudgetCompaction({ + compactParams: params, + contextEngine, + contextEngineRuntimeContext, + }); + } const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); const globalLane = resolveGlobalLane(params.lane); const enqueueGlobal = diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 8e41a7d480a..2e630d01656 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -499,6 +499,8 @@ async function compactEmbeddedPiSessionDirectOnce( defaultProvider: DEFAULT_PROVIDER, defaultModel: DEFAULT_MODEL, }); + // Keep the configured provider for context-window policy, while auth/model loading below can + // route OpenAI compaction through Codex OAuth when that runtime owns the session credentials. const modelConfigProvider = resolvedCompactionTarget.provider ?? DEFAULT_PROVIDER; const modelId = resolvedCompactionTarget.model ?? DEFAULT_MODEL; const authProfileId = resolvedCompactionTarget.authProfileId; @@ -727,7 +729,7 @@ async function compactEmbeddedPiSessionDirectOnce( model: effectiveModel, modelApi: effectiveModel.api, harnessId: params.agentHarnessId, - harnessRuntime: params.agentHarnessId, + harnessRuntime: selectedHarnessRuntime, authProfileProvider: authProfileId?.split(":", 1)[0], sessionAuthProfileId: authProfileId, config: params.config, diff --git a/src/agents/pi-embedded-runner/compact.types.ts b/src/agents/pi-embedded-runner/compact.types.ts index b81091b3983..add0e8fa7f7 100644 --- a/src/agents/pi-embedded-runner/compact.types.ts +++ b/src/agents/pi-embedded-runner/compact.types.ts @@ -62,6 +62,11 @@ export type CompactEmbeddedPiSessionParams = { tokenBudget?: number; force?: boolean; trigger?: "budget" | "overflow" | "manual"; + /** + * Preflight callers can allow native/current-session harness compaction but + * move plugin-owned budget compaction onto background turn maintenance. + */ + deferOwningContextEngineCompaction?: boolean; diagId?: string; attempt?: number; maxAttempts?: number; diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index 4057e44ed13..1a1476e73ad 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -340,6 +340,7 @@ describe("createDeferredTurnMaintenanceAbortSignal", () => { describe("runContextEngineMaintenance", () => { beforeEach(async () => { + vi.useRealTimers(); rewriteTranscriptEntriesInSessionManagerMock.mockClear(); rewriteTranscriptEntriesInSessionFileMock.mockClear(); await loadFreshContextEngineMaintenanceModuleForTest(); @@ -844,6 +845,118 @@ describe("runContextEngineMaintenance", () => { }); }); + it("disposes owned deferred engines only after their maintenance run finishes", async () => { + await withStateDirEnv("openclaw-turn-maintenance-dispose-", async () => { + resetCommandQueueStateForTest(); + resetTaskRegistryForTests({ persist: false }); + resetTaskFlowRegistryForTests({ persist: false }); + const waitForRealAssertion = async (assertion: () => void): Promise => { + const startedAt = Date.now(); + for (;;) { + try { + assertion(); + return; + } catch (error) { + if (Date.now() - startedAt >= 2_000) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, 5)); + } + } + }; + + const sessionKey = "agent:main:session-owned-dispose"; + const events: string[] = []; + let releaseFirstMaintenance: (() => void) | undefined; + let releaseSecondMaintenance: (() => void) | undefined; + + const createBackgroundEngine = (id: "first" | "second") => + ({ + info: { + id, + name: "Test Engine", + turnMaintenanceMode: "background" as const, + }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }: { messages: unknown[] }) => ({ + messages, + estimatedTokens: 0, + }), + compact: async () => ({ ok: true, compacted: false }), + maintain: vi.fn(async () => { + events.push(`maintain:${id}`); + await new Promise((resolve) => { + if (id === "first") { + releaseFirstMaintenance = resolve; + } else { + releaseSecondMaintenance = resolve; + } + }); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }), + dispose: vi.fn(async () => { + events.push(`dispose:${id}`); + }), + }) as NonNullable[0]["contextEngine"]>; + + const firstEngine = createBackgroundEngine("first"); + const secondEngine = createBackgroundEngine("second"); + const deferredPromises: Promise[] = []; + + await runContextEngineMaintenance({ + contextEngine: firstEngine, + sessionId: "session-owned-dispose", + sessionKey, + sessionFile: "/tmp/session-owned-dispose.jsonl", + reason: "turn", + disposeDeferredContextEngineAfterMaintenance: true, + onDeferredMaintenance: (promise) => { + deferredPromises.push(promise); + }, + }); + + await waitForRealAssertion(() => expect(events).toContain("maintain:first")); + + await runContextEngineMaintenance({ + contextEngine: secondEngine, + sessionId: "session-owned-dispose", + sessionKey, + sessionFile: "/tmp/session-owned-dispose.jsonl", + reason: "turn", + disposeDeferredContextEngineAfterMaintenance: true, + onDeferredMaintenance: (promise) => { + deferredPromises.push(promise); + }, + }); + + if (!releaseFirstMaintenance) { + throw new Error("Expected first maintenance release callback to be initialized"); + } + releaseFirstMaintenance(); + await waitForRealAssertion(() => expect(events).toContain("maintain:second")); + expect(secondEngine.dispose).not.toHaveBeenCalled(); + + if (!releaseSecondMaintenance) { + throw new Error("Expected second maintenance release callback to be initialized"); + } + releaseSecondMaintenance(); + await deferredPromises[1]; + + expect(firstEngine.dispose).toHaveBeenCalledTimes(1); + expect(secondEngine.dispose).toHaveBeenCalledTimes(1); + expect(events).toEqual([ + "maintain:first", + "dispose:first", + "maintain:second", + "dispose:second", + ]); + }); + }); + it("replaces legacy active maintenance tasks that are missing a runId", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index a078f8d5aff..0427e9e90aa 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -50,6 +50,7 @@ type DeferredTurnMaintenanceScheduleParams = { runtimeContext?: ContextEngineRuntimeContext; agentId?: string; config?: OpenClawConfig; + disposeContextEngineAfterMaintenance?: boolean; }; type DeferredTurnMaintenanceRunState = { @@ -111,6 +112,18 @@ function resolveDeferredTurnMaintenanceLane(sessionKey: string): string { return `${TURN_MAINTENANCE_LANE_PREFIX}${sessionKey}`; } +async function disposeDeferredMaintenanceContextEngine( + contextEngine: ContextEngine, +): Promise { + try { + await contextEngine.dispose?.(); + } catch (err) { + log.warn("context engine dispose failed after deferred maintenance", { + errorMessage: formatErrorMessage(err), + }); + } +} + export function createDeferredTurnMaintenanceAbortSignal(params?: { processLike?: DeferredTurnMaintenanceProcessLike; }): { @@ -386,6 +399,7 @@ async function runDeferredTurnMaintenanceWorker(params: { agentId?: string; runId: string; config?: OpenClawConfig; + disposeContextEngineAfterMaintenance?: boolean; }): Promise { let surfacedUserNotice = false; let longRunningTimer: ReturnType | null = null; @@ -533,6 +547,9 @@ async function runDeferredTurnMaintenanceWorker(params: { log.warn(`deferred context engine maintenance failed: ${reason}`); } finally { shutdownAbort.dispose(); + if (params.disposeContextEngineAfterMaintenance) { + await disposeDeferredMaintenanceContextEngine(params.contextEngine); + } } } @@ -545,8 +562,15 @@ function scheduleDeferredTurnMaintenance( } const activeRun = activeDeferredTurnMaintenanceRuns.get(sessionKey); if (activeRun) { + const supersededParams = activeRun.rerunRequested ? activeRun.latestParams : undefined; activeRun.rerunRequested = true; activeRun.latestParams = { ...params, sessionKey }; + if ( + supersededParams?.disposeContextEngineAfterMaintenance && + supersededParams.contextEngine !== params.contextEngine + ) { + void disposeDeferredMaintenanceContextEngine(supersededParams.contextEngine); + } return activeRun.promise; } @@ -593,6 +617,7 @@ function scheduleDeferredTurnMaintenance( agentId: params.agentId, config: params.config, runId: task.runId!, + disposeContextEngineAfterMaintenance: params.disposeContextEngineAfterMaintenance, }), ); } catch (err) { @@ -622,9 +647,13 @@ function scheduleDeferredTurnMaintenance( const shutdownTriggered = schedulerAbort.abortSignal?.aborted === true; const rerunParams = current.rerunRequested && !shutdownTriggered ? current.latestParams : undefined; + const discardedRerunParams = + current.rerunRequested && shutdownTriggered ? current.latestParams : undefined; activeDeferredTurnMaintenanceRuns.delete(sessionKey); if (rerunParams) { await scheduleDeferredTurnMaintenance(rerunParams); + } else if (discardedRerunParams?.disposeContextEngineAfterMaintenance) { + await disposeDeferredMaintenanceContextEngine(discardedRerunParams.contextEngine); } }); state = { @@ -653,6 +682,7 @@ export async function runContextEngineMaintenance(params: { executionMode?: "foreground" | "background"; onDeferredMaintenance?: (promise: Promise) => void; config?: OpenClawConfig; + disposeDeferredContextEngineAfterMaintenance?: boolean; }): Promise { if (typeof params.contextEngine?.maintain !== "function") { return undefined; @@ -675,6 +705,7 @@ export async function runContextEngineMaintenance(params: { runtimeContext: params.runtimeContext, agentId: params.agentId, config: params.config, + disposeContextEngineAfterMaintenance: params.disposeDeferredContextEngineAfterMaintenance, }); if (deferred) { params.onDeferredMaintenance?.(deferred); diff --git a/src/auto-reply/reply/agent-runner-memory.test.ts b/src/auto-reply/reply/agent-runner-memory.test.ts index 83fea7f82e3..3e43d673971 100644 --- a/src/auto-reply/reply/agent-runner-memory.test.ts +++ b/src/auto-reply/reply/agent-runner-memory.test.ts @@ -673,6 +673,114 @@ describe("runMemoryFlushIfNeeded", () => { expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled(); }); + it("continues when preflight compaction reports the session is already under target", async () => { + const sessionFile = path.join(rootDir, "session.jsonl"); + await fs.writeFile( + sessionFile, + `${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`, + "utf8", + ); + registerMemoryFlushPlanResolverForTest(() => ({ + softThresholdTokens: 1, + forceFlushTranscriptBytes: 1_000_000_000, + reserveTokensFloor: 0, + prompt: "Pre-compaction memory flush.\nNO_REPLY", + systemPrompt: "Write memory to memory/YYYY-MM-DD.md.", + relativePath: "memory/2023-11-14.md", + })); + compactEmbeddedPiSessionMock.mockResolvedValueOnce({ + ok: true, + compacted: false, + reason: "already under target", + }); + const sessionEntry: SessionEntry = { + sessionId: "session", + sessionFile, + updatedAt: Date.now(), + totalTokens: 120, + totalTokensFresh: true, + }; + + const entry = await runPreflightCompactionIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun({ + sessionId: "session", + sessionFile, + sessionKey: "agent:main:main", + }), + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100, + sessionEntry, + sessionStore: { "agent:main:main": sessionEntry }, + sessionKey: "agent:main:main", + storePath: path.join(rootDir, "sessions.json"), + isHeartbeat: false, + replyOperation: createReplyOperation(), + }); + + expect(entry).toBe(sessionEntry); + expect(compactEmbeddedPiSessionMock).toHaveBeenCalledTimes(1); + expect(requireCompactEmbeddedPiSessionCall()).toMatchObject({ + trigger: "budget", + deferOwningContextEngineCompaction: false, + contextTokenBudget: 100, + }); + expect(incrementCompactionCountMock).not.toHaveBeenCalled(); + }); + + it("fails when required preflight context-engine compaction is deferred to background maintenance", async () => { + const sessionFile = path.join(rootDir, "session.jsonl"); + await fs.writeFile( + sessionFile, + `${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`, + "utf8", + ); + registerMemoryFlushPlanResolverForTest(() => ({ + softThresholdTokens: 1, + forceFlushTranscriptBytes: 1_000_000_000, + reserveTokensFloor: 0, + prompt: "Pre-compaction memory flush.\nNO_REPLY", + systemPrompt: "Write memory to memory/YYYY-MM-DD.md.", + relativePath: "memory/2023-11-14.md", + })); + compactEmbeddedPiSessionMock.mockResolvedValueOnce({ + ok: true, + compacted: false, + reason: "deferred to background context-engine maintenance", + }); + const sessionEntry: SessionEntry = { + sessionId: "session", + sessionFile, + updatedAt: Date.now(), + totalTokens: 120, + totalTokensFresh: true, + }; + + await expect( + runPreflightCompactionIfNeeded({ + cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, + followupRun: createTestFollowupRun({ + sessionId: "session", + sessionFile, + sessionKey: "agent:main:main", + }), + defaultModel: "anthropic/claude-opus-4-6", + agentCfgContextTokens: 100, + sessionEntry, + sessionStore: { "agent:main:main": sessionEntry }, + sessionKey: "agent:main:main", + storePath: path.join(rootDir, "sessions.json"), + isHeartbeat: false, + replyOperation: createReplyOperation(), + }), + ).rejects.toThrow( + "Preflight compaction required but failed: deferred to background context-engine maintenance", + ); + + expect(compactEmbeddedPiSessionMock).toHaveBeenCalledTimes(1); + expect(incrementCompactionCountMock).not.toHaveBeenCalled(); + }); + it("passes runtime policy session key to preflight compaction sandbox resolution", async () => { const sessionFile = path.join(rootDir, "session.jsonl"); await fs.writeFile( @@ -692,7 +800,8 @@ describe("runMemoryFlushIfNeeded", () => { sessionId: "session", sessionFile, updatedAt: Date.now(), - totalTokensFresh: false, + totalTokens: 120, + totalTokensFresh: true, }; await runPreflightCompactionIfNeeded({ @@ -750,7 +859,8 @@ describe("runMemoryFlushIfNeeded", () => { sessionId: "session", sessionFile, updatedAt: Date.now(), - totalTokensFresh: false, + totalTokens: 120, + totalTokensFresh: true, }; const sessionStore = { "agent:main:telegram:group:redacted": sessionEntry }; @@ -802,7 +912,8 @@ describe("runMemoryFlushIfNeeded", () => { sessionId: "session", sessionFile, updatedAt: Date.now(), - totalTokensFresh: false, + totalTokens: 120, + totalTokensFresh: true, }; const sessionStore = { "agent:main:telegram:group:redacted": sessionEntry }; @@ -947,7 +1058,7 @@ describe("runMemoryFlushIfNeeded", () => { compactEmbeddedPiSessionMock.mockResolvedValueOnce({ ok: true, compacted: false, - reason: "already under target", + reason: "plugin already stored this turn", }); const sessionEntry: SessionEntry = { sessionId: "session", diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index 162800e3701..32c6a4d91f6 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -10,6 +10,10 @@ import { runWithModelFallback } from "../../agents/model-fallback.js"; import { listLegacyRuntimeModelProviderAliases } from "../../agents/model-runtime-aliases.js"; import { isCliProvider } from "../../agents/model-selection.js"; import { resolveContextConfigProviderForRuntime } from "../../agents/openai-codex-routing.js"; +import { + classifyCompactionReason, + DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON, +} from "../../agents/pi-embedded-runner/compact-reasons.js"; import { resolveSandboxConfigForAgent, resolveSandboxRuntimeStatus } from "../../agents/sandbox.js"; import { derivePromptTokens, @@ -176,6 +180,22 @@ function resolveEffectivePromptTokens( return base + output + estimate; } +function isPreflightCompactionSkipReason(reason?: string): boolean { + const classification = classifyCompactionReason(reason); + // Preflight compaction is a guardrail, not a hard dependency. These classes + // mean the context engine found nothing useful to compact, so the reply should + // continue instead of surfacing a generic user-facing failure. + return ( + classification === "below_threshold" || + classification === "no_compactable_entries" || + classification === "already_compacted_recently" + ); +} + +function isDeferredPreflightCompactionReason(reason?: string): boolean { + return normalizeOptionalString(reason) === DEFERRED_CONTEXT_ENGINE_COMPACTION_REASON; +} + function resolveMemoryFlushModelFallbackOptions( run: FollowupRun["run"], model?: string, @@ -624,6 +644,11 @@ export async function runPreflightCompactionIfNeeded(params: { isHeartbeat: boolean; replyOperation: ReplyOperation; }): Promise { + const deps = { + compactEmbeddedPiSession: memoryDeps.compactEmbeddedPiSession, + incrementCompactionCount: memoryDeps.incrementCompactionCount, + refreshQueuedFollowupSession: memoryDeps.refreshQueuedFollowupSession, + }; if (!params.sessionKey) { return params.sessionEntry; } @@ -788,7 +813,7 @@ export async function runPreflightCompactionIfNeeded(params: { params.sessionKey ?? params.followupRun.run.sessionKey, { storePath: params.storePath }, ); - const result = await memoryDeps.compactEmbeddedPiSession({ + const result = await deps.compactEmbeddedPiSession({ sessionId: entry.sessionId, sessionKey: params.sessionKey, sandboxSessionKey: params.runtimePolicySessionKey, @@ -813,14 +838,19 @@ export async function runPreflightCompactionIfNeeded(params: { thinkLevel: params.followupRun.run.thinkLevel, bashElevated: params.followupRun.run.bashElevated, trigger: "budget", - currentTokenCount: tokenCountForCompaction ?? freshPersistedTokens, + deferOwningContextEngineCompaction: false, contextTokenBudget: contextWindowTokens, + currentTokenCount: tokenCountForCompaction ?? freshPersistedTokens, ownerNumbers: params.followupRun.run.ownerNumbers, abortSignal: params.replyOperation.abortSignal, }); if (!result?.ok) { const reason = result?.reason ?? "not_compacted"; + if (isPreflightCompactionSkipReason(reason)) { + logVerbose(`preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${reason}`); + return entry ?? params.sessionEntry; + } logVerbose(`preflightCompaction failed: sessionKey=${params.sessionKey} reason=${reason}`); if (isRecoverableNativeHarnessBindingFailure(result)) { logVerbose( @@ -832,12 +862,18 @@ export async function runPreflightCompactionIfNeeded(params: { } if (!result.compacted) { - const reason = result.reason ?? "not_compacted"; - logVerbose(`preflightCompaction no-op: sessionKey=${params.sessionKey} reason=${reason}`); + const reason = normalizeOptionalString(result.reason); + if (isDeferredPreflightCompactionReason(reason)) { + logVerbose(`preflightCompaction failed: sessionKey=${params.sessionKey} reason=${reason}`); + throw new Error(`Preflight compaction required but failed: ${reason}`); + } + logVerbose( + `preflightCompaction skipped: sessionKey=${params.sessionKey} reason=${reason ?? "not_compacted"}`, + ); return entry ?? params.sessionEntry; } - await incrementCompactionCount({ + await deps.incrementCompactionCount({ cfg: params.cfg, sessionEntry: entry, sessionStore: params.sessionStore, @@ -861,7 +897,7 @@ export async function runPreflightCompactionIfNeeded(params: { } const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey; if (queueKey) { - memoryDeps.refreshQueuedFollowupSession({ + deps.refreshQueuedFollowupSession({ key: queueKey, previousSessionId, nextSessionId: entry.sessionId, diff --git a/src/auto-reply/reply/commands-compact.test.ts b/src/auto-reply/reply/commands-compact.test.ts index ce6d5eefcf3..faaa00ff1be 100644 --- a/src/auto-reply/reply/commands-compact.test.ts +++ b/src/auto-reply/reply/commands-compact.test.ts @@ -190,6 +190,33 @@ describe("handleCompactCommand", () => { expect(call.agentDir).toBe("/tmp/openclaw-agent-compact"); }); + it("treats already-under-target manual compaction as skipped", async () => { + vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({ + ok: false, + compacted: false, + reason: "already under target", + }); + + const result = await handleCompactCommand( + { + ...buildCompactParams("/compact", { + commands: { text: true }, + channels: { whatsapp: { allowFrom: ["*"] } }, + } as OpenClawConfig), + sessionEntry: { + sessionId: "session-1", + updatedAt: Date.now(), + }, + } as HandleCommandsParams, + true, + ); + + expect(result?.reply?.text).toBe( + "⚙️ Compaction skipped: context is already under the compaction target • Context 12.1k", + ); + expect(vi.mocked(incrementCompactionCount)).not.toHaveBeenCalled(); + }); + it("uses the canonical session agent when resolving the compaction session file", async () => { vi.mocked(compactEmbeddedPiSession).mockResolvedValueOnce({ ok: true, diff --git a/src/auto-reply/reply/commands-compact.ts b/src/auto-reply/reply/commands-compact.ts index 56ae15e95d9..b26a913ea01 100644 --- a/src/auto-reply/reply/commands-compact.ts +++ b/src/auto-reply/reply/commands-compact.ts @@ -53,9 +53,12 @@ function extractCompactInstructions(params: { function isCompactionSkipReason(reason?: string): boolean { const text = normalizeOptionalLowercaseString(reason) ?? ""; + // Manual /compact mirrors preflight semantics: already-small sessions are a + // successful no-op, not a failed compaction. return ( text.includes("nothing to compact") || text.includes("below threshold") || + text.includes("already under target") || text.includes("already compacted") || text.includes("no real conversation messages") ); @@ -74,6 +77,9 @@ function formatCompactionReason(reason?: string): string | undefined { if (lower.includes("below threshold")) { return "context is below the compaction threshold"; } + if (lower.includes("already under target")) { + return "context is already under the compaction target"; + } if (lower.includes("already compacted")) { return "session was already compacted recently"; } diff --git a/src/tasks/task-flow-registry.store.sqlite.ts b/src/tasks/task-flow-registry.store.sqlite.ts index bb13022393b..1e31bbc281c 100644 --- a/src/tasks/task-flow-registry.store.sqlite.ts +++ b/src/tasks/task-flow-registry.store.sqlite.ts @@ -52,6 +52,27 @@ let cachedDatabase: FlowRegistryDatabase | null = null; const FLOW_REGISTRY_DIR_MODE = 0o700; const FLOW_REGISTRY_FILE_MODE = 0o600; const FLOW_REGISTRY_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; +const FLOW_RUNS_COLUMNS = ` + flow_id TEXT PRIMARY KEY, + shape TEXT, + sync_mode TEXT NOT NULL DEFAULT 'managed', + owner_key TEXT NOT NULL, + requester_origin_json TEXT, + controller_id TEXT, + revision INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + goal TEXT NOT NULL, + current_step TEXT, + blocked_task_id TEXT, + blocked_summary TEXT, + state_json TEXT, + wait_json TEXT, + cancel_requested_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + ended_at INTEGER +`; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { @@ -245,28 +266,70 @@ function hasFlowRunsColumn(db: DatabaseSync, columnName: string): boolean { return rows.some((row) => row.name === columnName); } +function rebuildLegacyFlowRunsTable(db: DatabaseSync) { + // Older live registries can retain owner_session_key TEXT NOT NULL even after owner_key is + // added. Current inserts do not write owner_session_key, so SQLite rejects mirrored flow rows + // until the table is rebuilt into the canonical schema. + db.exec(` + DROP TABLE IF EXISTS flow_runs_canonical_migration; + CREATE TABLE flow_runs_canonical_migration ( + ${FLOW_RUNS_COLUMNS} + ); + INSERT INTO flow_runs_canonical_migration ( + flow_id, + sync_mode, + owner_key, + requester_origin_json, + controller_id, + revision, + status, + notify_policy, + goal, + current_step, + blocked_task_id, + blocked_summary, + state_json, + wait_json, + cancel_requested_at, + created_at, + updated_at, + ended_at + ) + SELECT + flow_id, + CASE + WHEN sync_mode = 'task_mirrored' THEN 'task_mirrored' + ELSE 'managed' + END, + COALESCE(NULLIF(trim(owner_key), ''), owner_session_key), + requester_origin_json, + CASE + WHEN sync_mode = 'task_mirrored' THEN NULL + ELSE COALESCE(NULLIF(trim(controller_id), ''), 'core/legacy-restored') + END, + COALESCE(revision, 0), + status, + notify_policy, + goal, + current_step, + blocked_task_id, + blocked_summary, + state_json, + wait_json, + cancel_requested_at, + created_at, + updated_at, + ended_at + FROM flow_runs; + DROP TABLE flow_runs; + ALTER TABLE flow_runs_canonical_migration RENAME TO flow_runs; + `); +} + function ensureSchema(db: DatabaseSync) { db.exec(` CREATE TABLE IF NOT EXISTS flow_runs ( - flow_id TEXT PRIMARY KEY, - shape TEXT, - sync_mode TEXT NOT NULL DEFAULT 'managed', - owner_key TEXT NOT NULL, - requester_origin_json TEXT, - controller_id TEXT, - revision INTEGER NOT NULL DEFAULT 0, - status TEXT NOT NULL, - notify_policy TEXT NOT NULL, - goal TEXT NOT NULL, - current_step TEXT, - blocked_task_id TEXT, - blocked_summary TEXT, - state_json TEXT, - wait_json TEXT, - cancel_requested_at INTEGER, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - ended_at INTEGER + ${FLOW_RUNS_COLUMNS} ); `); if (!hasFlowRunsColumn(db, "owner_key") && hasFlowRunsColumn(db, "owner_session_key")) { @@ -331,6 +394,35 @@ function ensureSchema(db: DatabaseSync) { if (!hasFlowRunsColumn(db, "cancel_requested_at")) { db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`); } + if (hasFlowRunsColumn(db, "owner_session_key")) { + // Populate the canonical fields before rebuilding so existing rows survive the legacy-column + // drop, including pre-sync-mode single-task flows and older managed flows with no controller. + db.exec(` + UPDATE flow_runs + SET owner_key = owner_session_key + WHERE (owner_key IS NULL OR trim(owner_key) = '') + `); + db.exec(` + UPDATE flow_runs + SET sync_mode = CASE + WHEN shape = 'single_task' THEN 'task_mirrored' + ELSE 'managed' + END + WHERE sync_mode IS NULL OR trim(sync_mode) = '' + `); + db.exec(` + UPDATE flow_runs + SET revision = 0 + WHERE revision IS NULL + `); + db.exec(` + UPDATE flow_runs + SET controller_id = 'core/legacy-restored' + WHERE sync_mode = 'managed' + AND (controller_id IS NULL OR trim(controller_id) = '') + `); + rebuildLegacyFlowRunsTable(db); + } db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`); db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`); diff --git a/src/tasks/task-flow-registry.store.test.ts b/src/tasks/task-flow-registry.store.test.ts index dbe0d1a9d4b..f0acbe9aa0a 100644 --- a/src/tasks/task-flow-registry.store.test.ts +++ b/src/tasks/task-flow-registry.store.test.ts @@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { + createTaskFlowForTask, createManagedTaskFlow, getTaskFlowById, requestFlowCancel, @@ -193,6 +194,82 @@ describe("task-flow-registry store runtime", () => { }); }); + it("migrates legacy owner_session_key schema before mirrored flow inserts", async () => { + await withFlowRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskFlowRegistryForTests(); + + const sqlitePath = resolveTaskFlowRegistrySqlitePath(process.env); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(sqlitePath); + // This mirrors the live pre-migration table shape that kept owner_session_key as NOT NULL, + // which made current owner_key-only mirrored inserts fail with SQLITE_CONSTRAINT_NOTNULL. + db.exec(` + DROP TABLE IF EXISTS flow_runs; + CREATE TABLE flow_runs ( + flow_id TEXT PRIMARY KEY, + owner_session_key TEXT NOT NULL, + requester_origin_json TEXT, + status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + goal TEXT NOT NULL, + current_step TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + ended_at INTEGER + ); + INSERT INTO flow_runs ( + flow_id, + owner_session_key, + status, + notify_policy, + goal, + current_step, + created_at, + updated_at + ) VALUES ( + 'legacy-flow', + 'agent:main:legacy', + 'running', + 'done_only', + 'Legacy flow', + 'legacy_step', + 10, + 15 + ); + `); + db.close(); + resetTaskFlowRegistryForTests({ persist: false }); + + const mirrored = createTaskFlowForTask({ + task: { + ownerKey: "agent:main:main", + taskId: "task-mirrored", + notifyPolicy: "silent", + status: "queued", + label: "Context engine turn maintenance", + task: "Deferred context-engine maintenance after turn.", + createdAt: 20, + }, + }); + + expect(mirrored.syncMode).toBe("task_mirrored"); + expect(mirrored.ownerKey).toBe("agent:main:main"); + expect(mirrored.controllerId).toBeUndefined(); + + const legacy = getTaskFlowById("legacy-flow"); + expect(legacy?.ownerKey).toBe("agent:main:legacy"); + expect(legacy?.controllerId).toBe("core/legacy-restored"); + + const migratedDb = new DatabaseSync(sqlitePath); + const columns = migratedDb.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{ + name?: string; + }>; + migratedDb.close(); + expect(columns.map((column) => column.name)).not.toContain("owner_session_key"); + }); + }); + it("drops malformed requester origin json from sqlite flow state", async () => { await withFlowRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root;