diff --git a/CHANGELOG.md b/CHANGELOG.md index 078e6715015..e926bf36e60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -227,6 +227,7 @@ Docs: https://docs.openclaw.ai - Update/installers: override npm `min-release-age` quarantine for OpenClaw-managed package installs, so `openclaw update`, plugin updates, and hosted installer scripts can install the requested latest release immediately. - Agents/sessions: preserve fresh post-compaction token snapshots across stale usage updates, preventing repeated auto-compaction after every message. Fixes #82576. (#82578) Thanks @njuboy11. - Agents/replies: preserve active inbound reply context at the LLM boundary so Discord referenced-message turns do not answer from stale session history. Fixes #82608. (#82801) Thanks @joshavant. +- Agents/sessions: expose session transcript lock stale and max-hold tuning, and release the embedded run's coarse transcript lock before model I/O while locking persistence and cleanup separately. Fixes #13744. Thanks @amknight. - Agents/OpenAI Responses: log redacted diagnostics for detail-less `response.failed` events while preserving failed response ids, so operators can correlate provider-side failures. Fixes #82558. - Agents/OpenRouter: strip non-replayable Anthropic/xAI reasoning provenance tags from follow-up requests, preventing poisoned thinking signatures from breaking second turns. Fixes #82335. (#82380) Thanks @hclsys. - Providers/xAI: send configurable reasoning effort only for Grok 4.3, preserving xAI's default low reasoning while omitting unsupported controls for Grok 4.20 reasoning models. (#81227) Thanks @jason-allen-oneal. diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 6a9a98c442f..df27a9d72cc 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -1a4ff6c148f4c28eb2c07c77025c6ba13ed9f56d23bbb221fc6dd83781fda671 config-baseline.json -a2663c4aed132ae968e8e6ef84566d22063143f8b093e839e1063393135842f5 config-baseline.core.json +4b52f0bff12148f4695150a45c91d4b9bda2d1bfbc1162a79a2bb2cf62c3c1eb config-baseline.json +73e11d9d5c5b27d8d075202f59b9f19537ded361ea761ed0aef78dc9446bc82f config-baseline.core.json fe4f1cb00d7d1dee9746779ec3cf14236e5f672c91502268a12ad6e467a2c4ad config-baseline.channel.json e9049ce0154f484f44bb0ac174a44198269256044da5ba62a6e107e78bfd7a70 config-baseline.plugin.json diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 0725f7f59d6..a51d0e8167b 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -97,7 +97,11 @@ OpenClaw no longer creates automatic `sessions.json.bak.*` rotation backups duri Transcript mutations use a session write lock on the transcript file. Lock acquisition waits up to `session.writeLock.acquireTimeoutMs` before surfacing a busy-session error; the default is `60000` ms. Raise this only when legitimate prep, cleanup, compaction, or transcript mirror work contends -longer on slow machines. Stale-lock detection and maximum hold warnings remain separate policies. +longer on slow machines. `session.writeLock.staleMs` controls when an existing lock can be +reclaimed as stale; the default is `1800000` ms. `session.writeLock.maxHoldMs` controls the +in-process watchdog release threshold; the default is `300000` ms. Emergency env overrides are +`OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS`, `OPENCLAW_SESSION_WRITE_LOCK_STALE_MS`, and +`OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS`. Enforcement order for disk budget cleanup (`mode: "enforce"`): diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index 92d00957544..8debeed262f 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -4,7 +4,7 @@ import { acquireSessionWriteLock, appendSessionTranscriptMessage, emitSessionTranscriptUpdate, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, runAgentHarnessBeforeMessageWriteHook, type AgentMessage, type EmbeddedRunAttemptParams, @@ -128,7 +128,7 @@ export async function mirrorCodexAppServerTranscript(params: { const lock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + ...resolveSessionWriteLockOptions(params.config), }); try { const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile); diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 8adc28a15f5..542f2a052b5 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -30,10 +30,7 @@ import { isCliProvider } from "../model-selection.js"; import { resolveOpenAIRuntimeProviderForPi } from "../openai-codex-routing.js"; import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; -import { - acquireSessionWriteLock, - resolveSessionWriteLockAcquireTimeoutMs, -} from "../session-write-lock.js"; +import { acquireSessionWriteLock, resolveSessionWriteLockOptions } from "../session-write-lock.js"; import { buildWorkspaceSkillSnapshot } from "../skills.js"; import { buildUsageWithNoCost } from "../stream-message-shared.js"; import { @@ -228,7 +225,7 @@ async function persistTextTurnTranscript( }); const lock = await acquireSessionWriteLock({ sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + ...resolveSessionWriteLockOptions(params.config), allowReentrant: true, }); try { diff --git a/src/agents/pi-embedded-runner/compact.hooks.harness.ts b/src/agents/pi-embedded-runner/compact.hooks.harness.ts index 765f44f0e75..6c7631928d0 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.harness.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.harness.ts @@ -461,6 +461,11 @@ export async function loadCompactHooksHarness(): Promise<{ acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })), resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 0), resolveSessionWriteLockAcquireTimeoutMs: vi.fn(() => 60_000), + resolveSessionWriteLockOptions: vi.fn(() => ({ + timeoutMs: 60_000, + staleMs: 1_800_000, + maxHoldMs: 300_000, + })), })); vi.doMock("../../context-engine/init.js", () => ({ diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 66eaf6d3ecb..83eeeee8da9 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -99,7 +99,7 @@ import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js"; import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } from "../session-write-lock.js"; import { detectRuntimeShell } from "../shell-utils.js"; import { @@ -956,9 +956,10 @@ async function compactEmbeddedPiSessionDirectOnce( const compactionTimeoutMs = resolveCompactionTimeoutMs(params.config); const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), - maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ - timeoutMs: compactionTimeoutMs, + ...resolveSessionWriteLockOptions(params.config, { + maxHoldMsFallback: resolveSessionLockMaxHoldFromTimeout({ + timeoutMs: compactionTimeoutMs, + }), }), }); try { diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index 62d848ed0bb..4057e44ed13 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -182,6 +182,50 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); }); + it("wraps active session manager rewrites in the supplied lock", async () => { + const events: string[] = []; + const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters< + typeof buildContextEngineMaintenanceRuntimeContext + >[0]["sessionManager"]; + rewriteTranscriptEntriesInSessionManagerMock.mockImplementationOnce((_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 77, + rewrittenEntries: 1, + }; + }); + const runtimeContext = buildContextEngineMaintenanceRuntimeContext({ + sessionId: "session-1", + sessionKey: "agent:main:session-1", + sessionFile: "/tmp/session.jsonl", + sessionManager, + withSessionManagerRewriteLock: async (operation) => { + events.push("lock-start"); + try { + return await operation(); + } finally { + events.push("lock-end"); + } + }, + }); + + await runtimeContext.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + + expect(events).toEqual(["lock-start", "rewrite", "lock-end"]); + expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({ + sessionManager, + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); + }); + it("defers file rewrites onto the session lane when requested", async () => { vi.useFakeTimers(); try { @@ -419,6 +463,69 @@ describe("runContextEngineMaintenance", () => { }); }); + it("locks foreground maintenance rewrites that use the active session manager", async () => { + const events: string[] = []; + const maintain = vi.fn(async (params?: unknown) => { + events.push("maintain-start"); + await ( + params as { runtimeContext?: ContextEngineRuntimeContext } | undefined + )?.runtimeContext?.rewriteTranscriptEntries?.({ + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + events.push("maintain-end"); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); + const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters< + typeof buildContextEngineMaintenanceRuntimeContext + >[0]["sessionManager"]; + rewriteTranscriptEntriesInSessionManagerMock.mockImplementationOnce((_params?: unknown) => { + events.push("rewrite"); + return { + changed: true, + bytesFreed: 77, + rewrittenEntries: 1, + }; + }); + + await runContextEngineMaintenance({ + contextEngine: { + info: { id: "test", name: "Test Engine" }, + ingest: async () => ({ ingested: true }), + assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }), + compact: async () => ({ ok: true, compacted: false }), + maintain, + }, + sessionId: "session-foreground-manager-rewrite", + sessionKey: "agent:main:session-foreground-manager-rewrite", + sessionFile: "/tmp/session-foreground-manager-rewrite.jsonl", + reason: "turn", + sessionManager, + withSessionManagerRewriteLock: async (operation) => { + events.push("lock-start"); + try { + return await operation(); + } finally { + events.push("lock-end"); + } + }, + }); + + expect(events).toEqual(["maintain-start", "lock-start", "rewrite", "lock-end", "maintain-end"]); + expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({ + sessionManager, + replacements: [ + { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, + ], + }); + expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled(); + }); + it("defers turn maintenance to a hidden background task when enabled", async () => { await withStateDirEnv("openclaw-turn-maintenance-", async () => { vi.useFakeTimers(); diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 72170f345db..a078f8d5aff 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -60,6 +60,8 @@ type DeferredTurnMaintenanceRunState = { const activeDeferredTurnMaintenanceRuns = new Map(); +type SessionManagerRewriteLock = (operation: () => Promise | T) => Promise; + type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM"; type DeferredTurnMaintenanceProcessLike = Pick & Partial> & { @@ -277,6 +279,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { sessionKey?: string; sessionFile: string; sessionManager?: Parameters[0]["sessionManager"]; + withSessionManagerRewriteLock?: SessionManagerRewriteLock; runtimeContext?: ContextEngineRuntimeContext; agentId?: string; allowDeferredCompactionExecution?: boolean; @@ -297,10 +300,15 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { ...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}), rewriteTranscriptEntries: async (request) => { if (params.sessionManager) { - return rewriteTranscriptEntriesInSessionManager({ - sessionManager: params.sessionManager, - replacements: request.replacements, - }); + const sessionManager = params.sessionManager; + const rewriteSessionManagerEntries = () => + rewriteTranscriptEntriesInSessionManager({ + sessionManager, + replacements: request.replacements, + }); + return params.withSessionManagerRewriteLock + ? await params.withSessionManagerRewriteLock(rewriteSessionManagerEntries) + : rewriteSessionManagerEntries(); } const rewriteTranscriptEntriesInFile = async () => await rewriteTranscriptEntriesInSessionFile({ @@ -329,6 +337,7 @@ async function executeContextEngineMaintenance(params: { sessionFile: string; reason: "bootstrap" | "compaction" | "turn"; sessionManager?: Parameters[0]["sessionManager"]; + withSessionManagerRewriteLock?: SessionManagerRewriteLock; runtimeContext?: ContextEngineRuntimeContext; agentId?: string; executionMode: "foreground" | "background"; @@ -346,6 +355,8 @@ async function executeContextEngineMaintenance(params: { sessionKey: params.sessionKey, sessionFile: params.sessionFile, sessionManager: params.executionMode === "background" ? undefined : params.sessionManager, + withSessionManagerRewriteLock: + params.executionMode === "background" ? undefined : params.withSessionManagerRewriteLock, runtimeContext: params.runtimeContext, agentId: params.agentId, allowDeferredCompactionExecution: params.executionMode === "background", @@ -636,6 +647,7 @@ export async function runContextEngineMaintenance(params: { sessionFile: string; reason: "bootstrap" | "compaction" | "turn"; sessionManager?: Parameters[0]["sessionManager"]; + withSessionManagerRewriteLock?: SessionManagerRewriteLock; runtimeContext?: ContextEngineRuntimeContext; agentId?: string; executionMode?: "foreground" | "background"; @@ -681,6 +693,7 @@ export async function runContextEngineMaintenance(params: { sessionFile: params.sessionFile, reason: params.reason, sessionManager: params.sessionManager, + withSessionManagerRewriteLock: params.withSessionManagerRewriteLock, runtimeContext: params.runtimeContext, agentId: params.agentId, executionMode, diff --git a/src/agents/pi-embedded-runner/google-prompt-cache.test.ts b/src/agents/pi-embedded-runner/google-prompt-cache.test.ts index 38d323aa6ae..4fd63f38061 100644 --- a/src/agents/pi-embedded-runner/google-prompt-cache.test.ts +++ b/src/agents/pi-embedded-runner/google-prompt-cache.test.ts @@ -3,6 +3,7 @@ import type { StreamFn } from "@earendil-works/pi-agent-core"; import type { Model } from "@earendil-works/pi-ai"; import { describe, expect, it, vi } from "vitest"; import { prepareGooglePromptCacheStreamFn } from "./google-prompt-cache.js"; +import { EmbeddedAttemptSessionTakeoverError } from "./run/attempt.session-lock.js"; type SessionCustomEntry = { type: "custom"; @@ -13,6 +14,11 @@ type SessionCustomEntry = { data: unknown; }; +type TestGooglePromptCacheSessionManager = { + appendCustomEntry(customType: string, data: unknown): void | Promise; + getEntries(): SessionCustomEntry[]; +}; + function makeSessionManager(entries: SessionCustomEntry[] = []) { let counter = 0; return { @@ -27,7 +33,6 @@ function makeSessionManager(entries: SessionCustomEntry[] = []) { customType, data, }); - return id; }, getEntries() { return entries; @@ -117,7 +122,7 @@ function streamOptions(streamFn: { mock: { calls: unknown[][] } }, callIndex = 0 function preparePromptCacheStream(params: { fetchMock: ReturnType; now: number; - sessionManager: ReturnType; + sessionManager: TestGooglePromptCacheSessionManager; streamFn: StreamFn; }) { return prepareGooglePromptCacheStreamFn( @@ -159,20 +164,23 @@ describe("google prompt cache", () => { }); expect(wrapped).toBeTypeOf("function"); - void wrapped?.( - makeGoogleModel(), - { - systemPrompt: "Follow policy.", - messages: [], - tools: [ - { - name: "lookup", - description: "Look up a value", - parameters: { type: "object" }, - }, - ], - } as never, - { temperature: 0.2 } as never, + expect(fetchMock).not.toHaveBeenCalled(); + await Promise.resolve( + wrapped?.( + makeGoogleModel(), + { + systemPrompt: "Follow policy.", + messages: [], + tools: [ + { + name: "lookup", + description: "Look up a value", + parameters: { type: "object" }, + }, + ], + } as never, + { temperature: 0.2 } as never, + ), ); expect(fetchMock).toHaveBeenCalledTimes(1); @@ -230,12 +238,19 @@ describe("google prompt cache", () => { expireTime: new Date(now + 3_600_000).toISOString(), }); - await preparePromptCacheStream({ + const firstWrapped = await preparePromptCacheStream({ fetchMock, now, sessionManager, streamFn: vi.fn(() => "first" as never), }); + await Promise.resolve( + firstWrapped?.( + makeGoogleModel(), + { systemPrompt: "Follow policy.", messages: [] } as never, + {} as never, + ), + ); fetchMock.mockClear(); const { streamFn: innerStreamFn, getCapturedPayload } = createCapturingStreamFn("second"); @@ -246,10 +261,12 @@ describe("google prompt cache", () => { streamFn: innerStreamFn, }); - void wrapped?.( - makeGoogleModel(), - { systemPrompt: "Follow policy.", messages: [] } as never, - {} as never, + await Promise.resolve( + wrapped?.( + makeGoogleModel(), + { systemPrompt: "Follow policy.", messages: [] } as never, + {} as never, + ), ); expect(fetchMock).not.toHaveBeenCalled(); @@ -259,6 +276,40 @@ describe("google prompt cache", () => { expect(getCapturedPayload()?.cachedContent).toBe("cachedContents/system-cache-2"); }); + it("propagates session takeover errors from cache entry persistence", async () => { + const now = 2_500_000; + const takeoverError = new EmbeddedAttemptSessionTakeoverError("/tmp/session.jsonl"); + const sessionManager = { + appendCustomEntry: vi.fn(async () => { + throw takeoverError; + }), + getEntries: vi.fn(() => []), + }; + const fetchMock = createCacheFetchMock({ + name: "cachedContents/system-cache-takeover", + expireTime: new Date(now + 3_600_000).toISOString(), + }); + const innerStreamFn = vi.fn(() => "stream" as never); + + const wrapped = await preparePromptCacheStream({ + fetchMock, + now, + sessionManager, + streamFn: innerStreamFn, + }); + + await expect( + Promise.resolve( + wrapped?.( + makeGoogleModel(), + { systemPrompt: "Follow policy.", messages: [] } as never, + {} as never, + ), + ), + ).rejects.toBe(takeoverError); + expect(innerStreamFn).not.toHaveBeenCalled(); + }); + it("refreshes an about-to-expire cache entry instead of creating a new one", async () => { const now = 3_000_000; const expireSoon = new Date(now + 60_000).toISOString(); @@ -297,10 +348,12 @@ describe("google prompt cache", () => { streamFn: innerStreamFn, }); - void wrapped?.( - makeGoogleModel(), - { systemPrompt: "Follow policy.", messages: [] } as never, - {} as never, + await Promise.resolve( + wrapped?.( + makeGoogleModel(), + { systemPrompt: "Follow policy.", messages: [] } as never, + {} as never, + ), ); expect(fetchMock).toHaveBeenCalledTimes(1); diff --git a/src/agents/pi-embedded-runner/google-prompt-cache.ts b/src/agents/pi-embedded-runner/google-prompt-cache.ts index 2d0e412c731..18fe5972c6c 100644 --- a/src/agents/pi-embedded-runner/google-prompt-cache.ts +++ b/src/agents/pi-embedded-runner/google-prompt-cache.ts @@ -5,11 +5,13 @@ import { parseGeminiAuth } from "../../infra/gemini-auth.js"; import { normalizeGoogleApiBaseUrl } from "../../infra/google-api-base-url.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { buildGuardedModelFetch } from "../provider-transport-fetch.js"; +import { isSessionWriteLockTimeoutError } from "../session-write-lock-error.js"; import { stableStringify } from "../stable-stringify.js"; import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js"; import { mergeTransportHeaders, sanitizeTransportPayloadText } from "../transport-stream-shared.js"; import { log } from "./logger.js"; import { isGooglePromptCacheEligible, resolveCacheRetention } from "./prompt-cache-retention.js"; +import { EmbeddedAttemptSessionTakeoverError } from "./run/attempt.session-lock.js"; import { streamWithPayloadPatch } from "./stream-payload-utils.js"; const GOOGLE_PROMPT_CACHE_CUSTOM_TYPE = "openclaw.google-prompt-cache"; @@ -21,7 +23,7 @@ type CacheRetention = "short" | "long"; type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown }; type GooglePromptCacheSessionManager = { - appendCustomEntry(customType: string, data?: unknown): unknown; + appendCustomEntry(customType: string, data?: unknown): void | Promise; getEntries(): CustomEntryLike[]; }; type GooglePromptCacheModel = Model & { @@ -159,13 +161,16 @@ function readLatestGooglePromptCacheEntry( return null; } -function appendGooglePromptCacheEntry( +async function appendGooglePromptCacheEntry( sessionManager: GooglePromptCacheSessionManager, entry: GooglePromptCacheEntry, -): void { +): Promise { try { - sessionManager.appendCustomEntry(GOOGLE_PROMPT_CACHE_CUSTOM_TYPE, entry); - } catch { + await sessionManager.appendCustomEntry(GOOGLE_PROMPT_CACHE_CUSTOM_TYPE, entry); + } catch (err) { + if (err instanceof EmbeddedAttemptSessionTakeoverError || isSessionWriteLockTimeoutError(err)) { + throw err; + } // ignore persistence failures } } @@ -293,7 +298,7 @@ async function ensureGooglePromptCache( signal: params.signal, }).catch(() => null); if (refreshed) { - appendGooglePromptCacheEntry(params.sessionManager, { + await appendGooglePromptCacheEntry(params.sessionManager, { status: "ready", timestamp: now, provider: params.provider, @@ -322,7 +327,7 @@ async function ensureGooglePromptCache( systemPrompt: params.systemPrompt, }); if (!created) { - appendGooglePromptCacheEntry(params.sessionManager, { + await appendGooglePromptCacheEntry(params.sessionManager, { status: "failed", timestamp: now, provider: params.provider, @@ -336,7 +341,7 @@ async function ensureGooglePromptCache( return null; } - appendGooglePromptCacheEntry(params.sessionManager, { + await appendGooglePromptCacheEntry(params.sessionManager, { status: "ready", timestamp: now, provider: params.provider, @@ -379,28 +384,28 @@ export async function prepareGooglePromptCacheStreamFn( return undefined; } - const cachedContent = await ensureGooglePromptCache( - { - apiKey, - cacheRetention: resolvedRetention, - model: params.model, - provider: params.provider, - sessionManager: params.sessionManager, - signal: params.signal, - systemPrompt, - }, - deps, - ); - if (!cachedContent) { - log.debug( - `google prompt cache unavailable for ${params.provider}/${params.modelId}; continuing without cachedContent`, - ); - return undefined; - } - const inner = params.streamFn; - return (model, context, options) => - streamWithPayloadPatch( + return async (model, context, options) => { + const cachedContent = await ensureGooglePromptCache( + { + apiKey, + cacheRetention: resolvedRetention, + model: params.model, + provider: params.provider, + sessionManager: params.sessionManager, + signal: params.signal, + systemPrompt, + }, + deps, + ); + if (!cachedContent) { + log.debug( + `google prompt cache unavailable for ${params.provider}/${params.modelId}; continuing without cachedContent`, + ); + return inner(model, context, options); + } + + return streamWithPayloadPatch( inner, model, buildManagedContextWithoutSystemPrompt(context), @@ -409,4 +414,5 @@ export async function prepareGooglePromptCacheStreamFn( payload.cachedContent = cachedContent; }, ); + }; } diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts new file mode 100644 index 00000000000..12d550dc822 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts @@ -0,0 +1,462 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js"; +import { + createEmbeddedAttemptSessionLockController, + EmbeddedAttemptSessionTakeoverError, + installPromptSubmissionLockRelease, + installSessionEventWriteLock, + installSessionExternalHookWriteLock, +} from "./attempt.session-lock.js"; + +const lockOptions = { + sessionFile: "/tmp/session.jsonl", + timeoutMs: 60_000, + staleMs: 1_800_000, + maxHoldMs: 300_000, +}; + +const tempDirs: string[] = []; + +afterEach(async () => { + for (const dir of tempDirs.splice(0)) { + await fs.rm(dir, { recursive: true, force: true }); + } +}); + +async function createTempSessionFile(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-attempt-session-lock-")); + tempDirs.push(dir); + const sessionFile = path.join(dir, "session.jsonl"); + await fs.writeFile(sessionFile, '{"type":"session"}\n', "utf8"); + return sessionFile; +} + +describe("embedded attempt session lock lifecycle", () => { + it("releases the coarse attempt lock before prompt submission and reacquires for cleanup", async () => { + const releases: string[] = []; + const acquireSessionWriteLock = vi + .fn() + .mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) }) + .mockResolvedValueOnce({ release: vi.fn(async () => releases.push("cleanup")) }); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions, + }); + + await controller.releaseForPrompt(); + const cleanupLock = await controller.acquireForCleanup(); + await cleanupLock.release(); + + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2); + expect(acquireSessionWriteLock).toHaveBeenNthCalledWith(1, lockOptions); + expect(acquireSessionWriteLock).toHaveBeenNthCalledWith(2, lockOptions); + expect(releases).toEqual(["prep", "cleanup"]); + }); + + it("runs post-prompt transcript writes under a short reacquired lock", async () => { + const events: string[] = []; + const acquireSessionWriteLock = vi + .fn() + .mockResolvedValueOnce({ release: vi.fn(async () => events.push("prep-release")) }) + .mockResolvedValueOnce({ release: vi.fn(async () => events.push("post-release")) }); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions, + }); + + await controller.releaseForPrompt(); + await controller.withSessionWriteLock(async () => { + events.push("post-write"); + }); + + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2); + expect(events).toEqual(["prep-release", "post-write", "post-release"]); + }); + + it("reuses its active post-prompt lock for nested session writes", async () => { + const events: string[] = []; + const sessionFile = await createTempSessionFile(); + const acquireSessionWriteLock = vi + .fn() + .mockResolvedValueOnce({ release: vi.fn(async () => events.push("prep-release")) }) + .mockResolvedValueOnce({ release: vi.fn(async () => events.push("post-release")) }) + .mockRejectedValueOnce( + new SessionWriteLockTimeoutError({ + timeoutMs: lockOptions.timeoutMs, + owner: "pid=789", + lockPath: `${sessionFile}.lock`, + }), + ); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await controller.releaseForPrompt(); + await controller.withSessionWriteLock(async () => { + events.push("outer-start"); + await fs.appendFile(sessionFile, '{"type":"message","id":"local"}\n', "utf8"); + await controller.withSessionWriteLock(async () => { + events.push("inner-write"); + }); + events.push("outer-end"); + }); + + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2); + expect(events).toEqual([ + "prep-release", + "outer-start", + "inner-write", + "outer-end", + "post-release", + ]); + }); + + it("drains queued Pi session events before reacquiring for cleanup", async () => { + const events: string[] = []; + let resolveQueue!: () => void; + const session = { + _agentEventQueue: new Promise((resolve) => { + resolveQueue = resolve; + }).then(() => { + events.push("events-drained"); + }), + }; + let acquireCount = 0; + const acquireSessionWriteLock = vi.fn(async () => { + acquireCount += 1; + events.push(`acquire-${acquireCount}`); + return { + release: vi.fn(async () => { + events.push("release"); + }), + }; + }); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions, + }); + await controller.releaseForPrompt(); + const cleanupLockPromise = controller.acquireForCleanup({ session }); + + await Promise.resolve(); + expect(events).toEqual(["acquire-1", "release"]); + + resolveQueue(); + const cleanupLock = await cleanupLockPromise; + await cleanupLock.release(); + + expect(events).toEqual(["acquire-1", "release", "events-drained", "acquire-2", "release"]); + }); + + it("rejects post-prompt writes when another owner advances the session file", async () => { + const sessionFile = await createTempSessionFile(); + const release = vi.fn(async () => {}); + const acquireSessionWriteLock = vi.fn(async () => ({ release })); + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { ...lockOptions, sessionFile }, + }); + + await controller.releaseForPrompt(); + await fs.appendFile(sessionFile, '{"type":"message","id":"takeover"}\n', "utf8"); + + await expect(controller.withSessionWriteLock(() => "late-write")).rejects.toBeInstanceOf( + EmbeddedAttemptSessionTakeoverError, + ); + expect(controller.hasSessionTakeover()).toBe(true); + + const cleanupLock = await controller.acquireForCleanup(); + await cleanupLock.release(); + + expect(release).toHaveBeenCalledTimes(2); + }); + + it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => { + const releases: string[] = []; + const acquireSessionWriteLock = vi + .fn() + .mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) }) + .mockRejectedValueOnce( + new SessionWriteLockTimeoutError({ + timeoutMs: lockOptions.timeoutMs, + owner: "pid=123", + lockPath: `${lockOptions.sessionFile}.lock`, + }), + ); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions, + }); + + await controller.releaseForPrompt(); + const cleanupLock = await controller.acquireForCleanup(); + await cleanupLock.release(); + + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2); + expect(controller.hasSessionTakeover()).toBe(true); + expect(releases).toEqual(["prep"]); + }); + + it("skips cleanup lock reacquisition after a post-prompt lock timeout", async () => { + const releases: string[] = []; + const acquireSessionWriteLock = vi + .fn() + .mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) }) + .mockRejectedValueOnce( + new SessionWriteLockTimeoutError({ + timeoutMs: lockOptions.timeoutMs, + owner: "pid=456", + lockPath: `${lockOptions.sessionFile}.lock`, + }), + ); + + const controller = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions, + }); + + await controller.releaseForPrompt(); + await expect(controller.withSessionWriteLock(() => "late-write")).rejects.toBeInstanceOf( + SessionWriteLockTimeoutError, + ); + const cleanupLock = await controller.acquireForCleanup(); + await cleanupLock.release(); + + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2); + expect(controller.hasSessionTakeover()).toBe(true); + expect(releases).toEqual(["prep"]); + }); + + it("wraps provider stream submission with queued transcript drain and lock release", async () => { + const events: string[] = []; + const streamFn = vi.fn(async (..._args: unknown[]) => { + events.push("stream"); + }); + const waitForSessionEvents = vi.fn(async () => { + events.push("drain"); + }); + const releaseForPrompt = vi.fn(async () => { + events.push("release"); + }); + const session = { agent: { streamFn } }; + + installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt }); + + await session.agent.streamFn("model", "context"); + + expect(waitForSessionEvents).toHaveBeenCalledWith(session); + expect(releaseForPrompt).toHaveBeenCalledTimes(1); + expect(streamFn).toHaveBeenCalledWith("model", "context"); + expect(events).toEqual(["drain", "release", "stream"]); + }); + + it("rewraps provider stream submission after the stream function is rebuilt", async () => { + const events: string[] = []; + const firstStreamFn = vi.fn(async (..._args: unknown[]) => { + events.push("first-stream"); + }); + const secondStreamFn = vi.fn(async (..._args: unknown[]) => { + events.push("second-stream"); + }); + const waitForSessionEvents = vi.fn(async () => { + events.push("drain"); + }); + const releaseForPrompt = vi.fn(async () => { + events.push("release"); + }); + const session = { agent: { streamFn: firstStreamFn } }; + + installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt }); + installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt }); + await session.agent.streamFn("first-model"); + + session.agent.streamFn = secondStreamFn; + installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt }); + await session.agent.streamFn("second-model"); + + expect(firstStreamFn).toHaveBeenCalledTimes(1); + expect(secondStreamFn).toHaveBeenCalledTimes(1); + expect(waitForSessionEvents).toHaveBeenCalledTimes(2); + expect(releaseForPrompt).toHaveBeenCalledTimes(2); + expect(events).toEqual([ + "drain", + "release", + "first-stream", + "drain", + "release", + "second-stream", + ]); + }); + + it("locks agent events that can reach transcript writers or registered extension hooks", async () => { + const releases: string[] = []; + const acquireSessionWriteLock = vi.fn(async (_options: typeof lockOptions) => ({ + release: vi.fn(async () => { + releases.push("released"); + }), + })); + const processed: Array = []; + const hasHandlers = vi.fn(() => false); + const session = { + _extensionRunner: { hasHandlers }, + _processAgentEvent: vi.fn(async (event: { type?: string }) => { + processed.push(event.type); + }), + }; + + installSessionEventWriteLock({ + session, + withSessionWriteLock: async (run) => { + const lock = await acquireSessionWriteLock(lockOptions); + try { + return await run(); + } finally { + await lock.release(); + } + }, + }); + + await session._processAgentEvent({ type: "message_update" }); + await session._processAgentEvent({ type: "tool_execution_end" }); + await session._processAgentEvent({ type: "message_end" }); + await session._processAgentEvent({ type: "agent_end" }); + await session._processAgentEvent({}); + + expect(processed).toEqual([ + "message_update", + "tool_execution_end", + "message_end", + "agent_end", + undefined, + ]); + expect(hasHandlers).toHaveBeenCalledWith("tool_execution_end"); + expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3); + expect(acquireSessionWriteLock).toHaveBeenCalledWith(lockOptions); + expect(releases).toEqual(["released", "released", "released"]); + }); + + it("locks Pi extension hooks that can mutate the session outside agent events", async () => { + const locked: string[] = []; + const called: string[] = []; + const hasHandlers = vi.fn( + (eventType: string) => + eventType === "tool_call" || + eventType === "tool_result" || + eventType === "before_provider_request", + ); + const session = { + _extensionRunner: { hasHandlers }, + compact: vi.fn(async () => called.push("compact")), + agent: { + beforeToolCall: vi.fn(async () => called.push("tool_call")), + afterToolCall: vi.fn(async () => called.push("tool_result")), + onPayload: vi.fn(async () => { + called.push("before_provider_request"); + return { ok: true }; + }), + onResponse: vi.fn(async () => called.push("after_provider_response")), + }, + }; + + installSessionExternalHookWriteLock({ + session, + withSessionWriteLock: async (run) => { + locked.push("lock"); + return await run(); + }, + }); + + await session.agent.beforeToolCall(); + await session.agent.afterToolCall(); + await expect(session.agent.onPayload()).resolves.toEqual({ ok: true }); + await session.agent.onResponse(); + await session.compact(); + + expect(called).toEqual([ + "tool_call", + "tool_result", + "before_provider_request", + "after_provider_response", + "compact", + ]); + expect(locked).toEqual(["lock", "lock", "lock", "lock"]); + expect(hasHandlers).toHaveBeenCalledWith("tool_result"); + expect(hasHandlers).toHaveBeenCalledWith("before_provider_request"); + expect(hasHandlers).toHaveBeenCalledWith("after_provider_response"); + }); + + it("fences tool calls even when no extension hook is registered", async () => { + const events: string[] = []; + const session = { + _extensionRunner: { + hasHandlers: vi.fn(() => false), + }, + agent: { + beforeToolCall: vi.fn(async () => { + events.push("tool_call"); + }), + }, + }; + + installSessionExternalHookWriteLock({ + session, + withSessionWriteLock: async (run) => { + events.push("lock"); + return await run(); + }, + }); + + await session.agent.beforeToolCall(); + + expect(events).toEqual(["lock", "tool_call"]); + expect(session._extensionRunner.hasHandlers).not.toHaveBeenCalledWith("tool_call"); + }); + + it("drains queued session events before locking a tool-call extension hook", async () => { + const events: string[] = []; + let resolveQueue!: () => void; + const session = { + _agentEventQueue: new Promise((resolve) => { + resolveQueue = resolve; + }).then(() => { + events.push("queue-drained"); + }), + _extensionRunner: { + hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"), + }, + agent: { + beforeToolCall: vi.fn(async () => { + events.push("hook-start"); + await session._agentEventQueue; + events.push("hook-end"); + }), + }, + }; + + installSessionExternalHookWriteLock({ + session, + withSessionWriteLock: async (run) => { + events.push("lock"); + return await run(); + }, + }); + + const hookPromise = session.agent.beforeToolCall(); + await Promise.resolve(); + expect(events).toEqual([]); + + resolveQueue(); + await hookPromise; + + expect(events).toEqual(["queue-drained", "lock", "hook-start", "hook-end"]); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.session-lock.ts b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts new file mode 100644 index 00000000000..dc981a2eed1 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.session-lock.ts @@ -0,0 +1,392 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import fs from "node:fs/promises"; +import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js"; +import type { acquireSessionWriteLock } from "../../session-write-lock.js"; + +type SessionLock = Awaited>; +type AcquireSessionWriteLock = typeof acquireSessionWriteLock; + +type LockOptions = { + sessionFile: string; + timeoutMs: number; + staleMs: number; + maxHoldMs: number; +}; + +type SessionEventProcessor = { + _processAgentEvent?: (event: unknown) => Promise; + _extensionRunner?: { + hasHandlers?: (eventType: string) => boolean; + }; + __openclawSessionEventWriteLockInstalled?: boolean; +}; + +type SessionEventQueueOwner = { + _agentEventQueue?: PromiseLike; +}; + +type SessionWithAgentPrompt = { + agent?: { + streamFn?: PromptReleaseStreamFn; + }; +}; + +type SessionWithExternalHooks = SessionEventProcessor & { + compact?: LockableFunction; + agent?: { + beforeToolCall?: LockableFunction; + afterToolCall?: LockableFunction; + onPayload?: LockableFunction; + onResponse?: LockableFunction; + }; +}; + +type PromptReleaseStreamFn = ((...args: unknown[]) => unknown) & { + __openclawSessionLockPromptReleaseInstalled?: boolean; +}; + +type LockableFunction = ((...args: unknown[]) => unknown) & { + __openclawSessionWriteLockInstalled?: boolean; +}; + +function sessionHasExtensionHandlers(session: SessionEventProcessor, eventType: string): boolean { + const hasHandlers = session._extensionRunner?.hasHandlers; + if (typeof hasHandlers !== "function") { + return false; + } + try { + return hasHandlers.call(session._extensionRunner, eventType); + } catch { + return true; + } +} + +function eventMayReachTranscriptWriters(session: SessionEventProcessor, event: unknown): boolean { + const type = (event as { type?: unknown } | null)?.type; + if (type === "message_update" || type === "message_end" || type === "agent_end") { + return true; + } + if (typeof type !== "string") { + return false; + } + return sessionHasExtensionHandlers(session, type); +} + +function installLockableFunction(params: { + owner: Record; + key: string; + shouldLock: () => boolean; + waitBeforeLock?: () => Promise; + withSessionWriteLock: (run: () => Promise | T) => Promise; +}): void { + const current = params.owner[params.key] as LockableFunction | undefined; + if (typeof current !== "function" || current.__openclawSessionWriteLockInstalled === true) { + return; + } + const wrapped: LockableFunction = async function lockedExternalHook( + this: unknown, + ...args: unknown[] + ) { + if (!params.shouldLock()) { + return await current.apply(this, args); + } + await params.waitBeforeLock?.(); + return await params.withSessionWriteLock(async () => await current.apply(this, args)); + }; + wrapped.__openclawSessionWriteLockInstalled = true; + params.owner[params.key] = wrapped; +} + +type SessionFileFingerprint = + | { exists: false } + | { + exists: true; + dev: bigint; + ino: bigint; + size: bigint; + mtimeNs: bigint; + ctimeNs: bigint; + }; + +function sameSessionFileFingerprint( + left: SessionFileFingerprint | undefined, + right: SessionFileFingerprint, +): boolean { + if (!left || left.exists !== right.exists) { + return false; + } + if (!left.exists || !right.exists) { + return true; + } + return ( + left.dev === right.dev && + left.ino === right.ino && + left.size === right.size && + left.mtimeNs === right.mtimeNs && + left.ctimeNs === right.ctimeNs + ); +} + +async function readSessionFileFingerprint(sessionFile: string): Promise { + try { + const stat = await fs.stat(sessionFile, { bigint: true }); + return { + exists: true, + dev: stat.dev, + ino: stat.ino, + size: stat.size, + mtimeNs: stat.mtimeNs, + ctimeNs: stat.ctimeNs, + }; + } catch (err) { + if ((err as NodeJS.ErrnoException).code === "ENOENT") { + return { exists: false }; + } + throw err; + } +} + +async function waitForSessionEventQueue(session: unknown): Promise { + const owner = session as SessionEventQueueOwner; + for (let attempts = 0; attempts < 5; attempts += 1) { + const queue = owner?._agentEventQueue; + if (!queue || typeof queue.then !== "function") { + return; + } + await Promise.resolve(queue).catch(() => {}); + if (owner?._agentEventQueue === queue) { + return; + } + } + const queue = owner?._agentEventQueue; + if (queue && typeof queue.then === "function") { + await Promise.resolve(queue).catch(() => {}); + } +} + +export class EmbeddedAttemptSessionTakeoverError extends Error { + constructor(sessionFile: string) { + super(`session file changed while embedded prompt lock was released: ${sessionFile}`); + this.name = "EmbeddedAttemptSessionTakeoverError"; + } +} + +export function installSessionEventWriteLock(params: { + session: unknown; + withSessionWriteLock: (run: () => Promise | T) => Promise; +}): void { + const session = params.session as SessionEventProcessor; + const original = session._processAgentEvent; + if (typeof original !== "function" || session.__openclawSessionEventWriteLockInstalled === true) { + return; + } + session.__openclawSessionEventWriteLockInstalled = true; + session._processAgentEvent = async function lockedProcessAgentEvent( + this: unknown, + event: unknown, + ) { + if (!eventMayReachTranscriptWriters(session, event)) { + return await original.call(this, event); + } + return await params.withSessionWriteLock(async () => await original.call(this, event)); + }; +} + +export function installSessionExternalHookWriteLock(params: { + session: unknown; + withSessionWriteLock: (run: () => Promise | T) => Promise; +}): void { + const session = params.session as SessionWithExternalHooks; + const agent = session.agent; + if (agent) { + installLockableFunction({ + owner: agent as Record, + key: "beforeToolCall", + shouldLock: () => true, + waitBeforeLock: () => waitForSessionEventQueue(session), + withSessionWriteLock: params.withSessionWriteLock, + }); + installLockableFunction({ + owner: agent as Record, + key: "afterToolCall", + shouldLock: () => sessionHasExtensionHandlers(session, "tool_result"), + waitBeforeLock: () => waitForSessionEventQueue(session), + withSessionWriteLock: params.withSessionWriteLock, + }); + installLockableFunction({ + owner: agent as Record, + key: "onPayload", + shouldLock: () => sessionHasExtensionHandlers(session, "before_provider_request"), + waitBeforeLock: () => waitForSessionEventQueue(session), + withSessionWriteLock: params.withSessionWriteLock, + }); + installLockableFunction({ + owner: agent as Record, + key: "onResponse", + shouldLock: () => sessionHasExtensionHandlers(session, "after_provider_response"), + waitBeforeLock: () => waitForSessionEventQueue(session), + withSessionWriteLock: params.withSessionWriteLock, + }); + } + installLockableFunction({ + owner: session as Record, + key: "compact", + shouldLock: () => true, + waitBeforeLock: () => waitForSessionEventQueue(session), + withSessionWriteLock: params.withSessionWriteLock, + }); +} + +export type EmbeddedAttemptSessionLockController = { + releaseForPrompt(): Promise; + waitForSessionEvents(session: unknown): Promise; + withSessionWriteLock(run: () => Promise | T): Promise; + acquireForCleanup(params?: { session?: unknown }): Promise; + hasSessionTakeover(): boolean; +}; + +export async function createEmbeddedAttemptSessionLockController(params: { + acquireSessionWriteLock: AcquireSessionWriteLock; + lockOptions: LockOptions; +}): Promise { + const acquireLock = async (): Promise => + await params.acquireSessionWriteLock({ + sessionFile: params.lockOptions.sessionFile, + timeoutMs: params.lockOptions.timeoutMs, + staleMs: params.lockOptions.staleMs, + maxHoldMs: params.lockOptions.maxHoldMs, + }); + + let heldLock: SessionLock | undefined = await acquireLock(); + const activeWriteLock = new AsyncLocalStorage(); + let fenceFingerprint: SessionFileFingerprint | undefined; + let fenceActive = false; + let takeoverDetected = false; + + async function acquireWriteLock(): Promise<{ lock: SessionLock; owned: boolean }> { + if (heldLock) { + return { lock: heldLock, owned: false }; + } + try { + return { lock: await acquireLock(), owned: true }; + } catch (err) { + if (isSessionWriteLockTimeoutError(err)) { + takeoverDetected = true; + } + throw err; + } + } + + async function assertSessionFileFence(): Promise { + if (!fenceActive) { + return; + } + const current = await readSessionFileFingerprint(params.lockOptions.sessionFile); + if (!sameSessionFileFingerprint(fenceFingerprint, current)) { + takeoverDetected = true; + throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile); + } + } + + async function refreshSessionFileFence(): Promise { + if (fenceActive && !takeoverDetected) { + fenceFingerprint = await readSessionFileFingerprint(params.lockOptions.sessionFile); + } + } + + const noopLock: SessionLock = { release: async () => {} }; + + return { + async releaseForPrompt(): Promise { + if (!heldLock) { + return; + } + const lock = heldLock; + heldLock = undefined; + fenceFingerprint = await readSessionFileFingerprint(params.lockOptions.sessionFile); + fenceActive = true; + await lock.release(); + }, + waitForSessionEvents: waitForSessionEventQueue, + async withSessionWriteLock(run: () => Promise | T): Promise { + if (takeoverDetected) { + throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile); + } + if (activeWriteLock.getStore()) { + return await run(); + } + const { lock, owned } = await acquireWriteLock(); + try { + await assertSessionFileFence(); + const runWithLock = async () => { + const result = await run(); + await refreshSessionFileFence(); + return result; + }; + if (owned) { + return await activeWriteLock.run(lock, runWithLock); + } + return await runWithLock(); + } finally { + if (owned) { + await lock.release(); + } + } + }, + async acquireForCleanup(cleanupParams?: { session?: unknown }): Promise { + if (cleanupParams?.session) { + await waitForSessionEventQueue(cleanupParams.session); + } + if (takeoverDetected) { + return noopLock; + } + try { + heldLock ??= await acquireLock(); + } catch (err) { + if (isSessionWriteLockTimeoutError(err)) { + takeoverDetected = true; + return noopLock; + } + throw err; + } + const cleanupLock = heldLock; + heldLock = undefined; + try { + await assertSessionFileFence(); + } catch (err) { + await cleanupLock.release(); + if (err instanceof EmbeddedAttemptSessionTakeoverError) { + return noopLock; + } + throw err; + } + return cleanupLock; + }, + hasSessionTakeover(): boolean { + return takeoverDetected; + }, + }; +} + +export function installPromptSubmissionLockRelease(params: { + session: unknown; + waitForSessionEvents: (session: unknown) => Promise; + releaseForPrompt: () => Promise; +}): void { + const agent = (params.session as SessionWithAgentPrompt).agent; + if (typeof agent?.streamFn !== "function") { + return; + } + const currentStreamFn = agent.streamFn; + if (currentStreamFn.__openclawSessionLockPromptReleaseInstalled === true) { + return; + } + const originalStreamFn = currentStreamFn.bind(agent); + const wrappedStreamFn: PromptReleaseStreamFn = async (...args: unknown[]) => { + await params.waitForSessionEvents(params.session); + await params.releaseForPrompt(); + return await originalStreamFn(...args); + }; + wrappedStreamFn.__openclawSessionLockPromptReleaseInstalled = true; + agent.streamFn = wrappedStreamFn; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts index c3abd3dafd6..4be7f87ad77 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts @@ -96,6 +96,26 @@ function expectFields(actual: Record, expected: Record { + const lockId = hoisted.acquireSessionWriteLockMock.mock.calls.length; + events.push(`acquire-${lockId}`); + return { + release: async () => { + events.push(`release-${lockId}`); + }, + }; + }); + return events; +} + +function expectInitialLockReleasedBeforePostTurnWrite(events: string[]) { + expect(events.indexOf("release-1")).toBeGreaterThan(events.indexOf("acquire-1")); + expect(events.indexOf("acquire-2")).toBeGreaterThan(events.indexOf("release-1")); + expect(events.indexOf("release-2")).toBeGreaterThan(events.indexOf("acquire-2")); +} + function createTestContextEngine(params: Partial): AttemptContextEngine { return { info: { @@ -773,6 +793,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { }); it("skips blank visible prompts with replay history before provider submission", async () => { + const lockEvents = trackSessionWriteLocks(); const sessionPrompt = vi.fn(async () => { throw new Error("blank prompt should not be submitted"); }); @@ -809,6 +830,35 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { "prompt skipped event", ); expect(requireRecord(skipped.data, "prompt skipped data").reason).toBe("blank_user_prompt"); + expectInitialLockReleasedBeforePostTurnWrite(lockEvents); + }); + + it("releases the initial session lock before before_agent_run block finalizers", async () => { + const lockEvents = trackSessionWriteLocks(); + const sessionPrompt = vi.fn(async () => { + throw new Error("blocked prompt should not be submitted"); + }); + const runBeforeAgentRun = vi.fn(async () => ({ + pluginId: "test-policy", + decision: { outcome: "block", reason: "Blocked by test policy." }, + })); + hoisted.getGlobalHookRunnerMock.mockReturnValue({ + hasHooks: vi.fn((name: string) => name === "before_agent_run"), + runBeforeAgentRun, + }); + + const result = await createContextEngineAttemptRunner({ + contextEngine: createContextEngineBootstrapAndAssemble(), + sessionKey, + tempPaths, + sessionPrompt, + }); + + expect(runBeforeAgentRun).toHaveBeenCalledTimes(1); + expect(sessionPrompt).not.toHaveBeenCalled(); + expect(result.finalPromptText).toBeUndefined(); + expect(result.promptErrorSource).toBe("hook:before_agent_run"); + expectInitialLockReleasedBeforePostTurnWrite(lockEvents); }); it("uses assembled context as the default precheck authority", async () => { @@ -846,6 +896,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { }); it("honors context engines that opt into preassembly overflow authority", async () => { + const lockEvents = trackSessionWriteLocks(); let sawPrompt = false; const hugeHistory = "large raw history ".repeat(2_000); @@ -878,6 +929,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => { expect(result.promptErrorSource).toBe("precheck"); expect(result.preflightRecovery?.route).toBe("compact_only"); expect(hoisted.preemptiveCompactionCalls.at(-1)).toHaveProperty("unwindowedMessages"); + expectInitialLockReleasedBeforePostTurnWrite(lockEvents); }); it("snapshots pre-assembly messages before assemble even when the engine windows in place", async () => { diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index cec4ff433ec..963b9a39a99 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -441,6 +441,7 @@ vi.mock("../../session-write-lock.js", () => ({ acquireSessionWriteLock: (params: Parameters[0]) => hoisted.acquireSessionWriteLockMock(params), resolveSessionWriteLockAcquireTimeoutMs: () => 60000, + resolveSessionWriteLockOptions: () => ({ timeoutMs: 60000, staleMs: 1_800_000, maxHoldMs: 1 }), resolveSessionLockMaxHoldFromTimeout: () => 1, })); @@ -811,7 +812,8 @@ export type MutableSession = { isCompacting: boolean; isStreaming: boolean; agent: { - streamFn?: unknown; + prompt?: (...args: unknown[]) => Promise; + streamFn?: (...args: unknown[]) => Promise; transport?: string; reset: () => void; state: { @@ -841,6 +843,22 @@ type SessionPromptOverride = ( options?: { images?: unknown[] }, ) => Promise; +type TestAgentStream = { + result: () => Promise; + [Symbol.asyncIterator]: () => AsyncIterator; +}; + +function createCompletedAssistantStream(): TestAgentStream { + return { + async result() { + return { role: "assistant", content: "done" }; + }, + [Symbol.asyncIterator]() { + return (async function* () {})(); + }, + }; +} + let runEmbeddedAttemptPromise: | Promise | undefined; @@ -967,12 +985,30 @@ export function createDefaultEmbeddedSession(params?: { options?: { images?: unknown[] }, ) => Promise; }): MutableSession { + let pendingPrompt: + | { + prompt: string; + options?: { images?: unknown[] }; + } + | undefined; const session: MutableSession = { sessionId: "embedded-session", messages: [...(params?.initialMessages ?? [])], isCompacting: false, isStreaming: false, agent: { + prompt: async (prompt, options) => { + pendingPrompt = { prompt: String(prompt), options: options as { images?: unknown[] } }; + await session.agent.streamFn?.(); + }, + streamFn: async () => { + if (params?.prompt && pendingPrompt) { + const currentPrompt = pendingPrompt; + pendingPrompt = undefined; + await params.prompt(session, currentPrompt.prompt, currentPrompt.options); + } + return createCompletedAssistantStream(); + }, reset: () => { session.messages = []; }, @@ -987,8 +1023,8 @@ export function createDefaultEmbeddedSession(params?: { }, setActiveToolsByName: () => {}, prompt: async (prompt, options) => { + await session.agent.prompt?.(prompt, options); if (params?.prompt) { - await params.prompt(session, prompt, options); return; } session.messages = [ diff --git a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.test.ts b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.test.ts index def3e85a935..5d123b77409 100644 --- a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.test.ts @@ -115,4 +115,25 @@ describe("cleanupEmbeddedAttemptResources", () => { expect(release).toHaveBeenCalledTimes(1); }); + + it("can skip stale session-manager flushing after session takeover", async () => { + const flushPendingToolResultsAfterIdle = vi.fn(async () => {}); + const dispose = vi.fn(); + const release = vi.fn(async () => {}); + + await cleanupEmbeddedAttemptResources({ + flushPendingToolResultsAfterIdle, + session: { + agent: {}, + dispose, + }, + sessionManager: {}, + sessionLock: { release }, + skipSessionFlush: true, + }); + + expect(flushPendingToolResultsAfterIdle).not.toHaveBeenCalled(); + expect(dispose).toHaveBeenCalledTimes(1); + expect(release).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts index da5e8f4d0b8..e83f02c3b00 100644 --- a/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts +++ b/src/agents/pi-embedded-runner/run/attempt.subscription-cleanup.ts @@ -66,6 +66,7 @@ export async function cleanupEmbeddedAttemptResources(params: { sessionLock: { release(): Promise | void }; aborted?: boolean; abortSettlePromise?: Promise | null; + skipSessionFlush?: boolean; runId?: string; sessionId?: string; }): Promise { @@ -85,14 +86,16 @@ export async function cleanupEmbeddedAttemptResources(params: { // PERF: When the run was aborted (user stop / timeout), skip the expensive // waitForIdle (up to 30 s) and flush pending tool results synchronously so // the session write-lock is released without leaving orphaned tool calls. - try { - await params.flushPendingToolResultsAfterIdle({ - agent: params.session?.agent as IdleAwareAgent | null | undefined, - sessionManager: params.sessionManager as ToolResultFlushManager | null | undefined, - ...(params.aborted ? { timeoutMs: 0 } : {}), - }); - } catch { - /* best-effort */ + if (!params.skipSessionFlush) { + try { + await params.flushPendingToolResultsAfterIdle({ + agent: params.session?.agent as IdleAwareAgent | null | undefined, + sessionManager: params.sessionManager as ToolResultFlushManager | null | undefined, + ...(params.aborted ? { timeoutMs: 0 } : {}), + }); + } catch { + /* best-effort */ + } } try { params.session?.dispose(); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index ad0ea92d25f..9e932132021 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import type { AgentMessage } from "@earendil-works/pi-agent-core"; +import type { AssistantMessage } from "@earendil-works/pi-ai"; import { createAgentSession, SessionManager } from "@earendil-works/pi-coding-agent"; import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js"; import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js"; @@ -157,7 +158,7 @@ import { import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } from "../../session-write-lock.js"; import { detectRuntimeShell } from "../../shell-utils.js"; import { @@ -220,6 +221,7 @@ import { collectPromptCacheToolNames, beginPromptCacheObservation, completePromptCacheObservation, + type PromptCacheBreak, type PromptCacheChange, } from "../prompt-cache-observability.js"; import { resolveCacheRetention } from "../prompt-cache-retention.js"; @@ -319,6 +321,12 @@ import { shouldWarnOnOrphanedUserRepair, shouldInjectHeartbeatPrompt, } from "./attempt.prompt-helpers.js"; +import { + createEmbeddedAttemptSessionLockController, + installPromptSubmissionLockRelease, + installSessionExternalHookWriteLock, + installSessionEventWriteLock, +} from "./attempt.session-lock.js"; import { createYieldAbortedResponse, persistSessionsYieldContextMessage, @@ -2010,19 +2018,21 @@ export async function runEmbeddedAttempt( let systemPromptText = systemPromptOverride(); prepStages.mark("system-prompt"); - // Keep the session lock scoped to transcript/session mutations. Cold plugin - // and tool setup can be slow, and holding the lock there blocks CLI fallback - // from taking over the same session when a gateway run stalls before model I/O. - const sessionLock = await acquireSessionWriteLock({ - sessionFile: params.sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), - maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ + const sessionWriteLockOptions = resolveSessionWriteLockOptions(params.config, { + maxHoldMsFallback: resolveSessionLockMaxHoldFromTimeout({ timeoutMs: resolveRunTimeoutWithCompactionGraceMs({ runTimeoutMs: params.timeoutMs, compactionTimeoutMs: resolveCompactionTimeoutMs(params.config), }), }), }); + const sessionLockController = await createEmbeddedAttemptSessionLockController({ + acquireSessionWriteLock, + lockOptions: { + sessionFile: params.sessionFile, + ...sessionWriteLockOptions, + }, + }); let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; @@ -2327,6 +2337,14 @@ export async function runEmbeddedAttempt( } session.setActiveToolsByName(sessionToolAllowlist); const activeSession = session; + installSessionEventWriteLock({ + session: activeSession, + withSessionWriteLock: (operation) => sessionLockController.withSessionWriteLock(operation), + }); + installSessionExternalHookWriteLock({ + session: activeSession, + withSessionWriteLock: (operation) => sessionLockController.withSessionWriteLock(operation), + }); prepStages.mark("agent-session"); if (isRawModelRun) { // Raw model probes should measure exactly the requested prompt against @@ -3224,11 +3242,13 @@ export async function runEmbeddedAttempt( }, abort: abortRun, }; - let lastAssistant: AgentMessage | undefined; + let lastAssistant: AssistantMessage | undefined; let currentAttemptAssistant: EmbeddedRunAttemptResult["currentAttemptAssistant"]; let attemptUsage: NormalizedUsage | undefined; - let cacheBreak: ReturnType = null; + let cacheBreak: PromptCacheBreak | null = null; let promptCache: EmbeddedRunAttemptResult["promptCache"]; + let lastCallUsage: NormalizedUsage | undefined; + let compactionOccurredThisAttempt = false; let finalPromptText: string | undefined; if (params.replyOperation) { params.replyOperation.attachBackend(queueHandle); @@ -3724,7 +3744,14 @@ export async function runEmbeddedAttempt( model: params.model, modelId: params.modelId, provider: params.provider, - sessionManager, + sessionManager: { + appendCustomEntry: async (customType, data) => { + await sessionLockController.withSessionWriteLock(() => { + activeSessionManager.appendCustomEntry(customType, data); + }); + }, + getEntries: () => activeSessionManager.getEntries(), + }, signal: runAbortController.signal, streamFn: activeSession.agent.streamFn, systemPrompt: systemPromptText, @@ -3732,6 +3759,12 @@ export async function runEmbeddedAttempt( if (googlePromptCacheStreamFn) { activeSession.agent.streamFn = googlePromptCacheStreamFn; } + installPromptSubmissionLockRelease({ + session: activeSession, + waitForSessionEvents: (sessionToDrain) => + sessionLockController.waitForSessionEvents(sessionToDrain), + releaseForPrompt: () => sessionLockController.releaseForPrompt(), + }); } // Detect and load images referenced in the visible prompt for vision-capable models. @@ -4027,12 +4060,18 @@ export async function runEmbeddedAttempt( runId: params.runId, sessionId: params.sessionId, }); - stripSessionsYieldArtifacts(activeSession); - if (yieldMessage) { - await persistSessionsYieldContextMessage(activeSession, yieldMessage); - } + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.withSessionWriteLock(async () => { + stripSessionsYieldArtifacts(activeSession); + if (yieldMessage) { + await persistSessionsYieldContextMessage(activeSession, yieldMessage); + } + }); } else if (isMidTurnPrecheckSignal(err)) { - handleMidTurnPrecheckRequest(err.request); + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.withSessionWriteLock(() => { + handleMidTurnPrecheckRequest(err.request); + }); } else { promptError = err; promptErrorSource = "prompt"; @@ -4046,17 +4085,23 @@ export async function runEmbeddedAttempt( if (pendingMidTurnPrecheckRequest) { const request = pendingMidTurnPrecheckRequest; pendingMidTurnPrecheckRequest = null; - removeTrailingMidTurnPrecheckAssistantError({ - activeSession, - sessionManager, + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.withSessionWriteLock(() => { + removeTrailingMidTurnPrecheckAssistantError({ + activeSession, + sessionManager: activeSessionManager, + }); + if (!preflightRecovery && promptErrorSource !== "precheck") { + promptError = null; + promptErrorSource = null; + handleMidTurnPrecheckRequest(request); + } }); - if (!preflightRecovery && promptErrorSource !== "precheck") { - promptError = null; - promptErrorSource = null; - handleMidTurnPrecheckRequest(request); - } } + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.releaseForPrompt(); + // Capture snapshot before compaction wait so we have complete messages if timeout occurs // Check compaction state before and after to avoid race condition where compaction starts during capture // Use session state (not subscription) for snapshot decisions - need instantaneous compaction status @@ -4112,117 +4157,122 @@ export async function runEmbeddedAttempt( } } - // Check if ANY compaction occurred during the entire attempt (prompt + retry). - // Using a cumulative count (> 0) instead of a delta check avoids missing - // compactions that complete during activeSession.prompt() before the delta - // baseline is sampled. - const compactionOccurredThisAttempt = getCompactionCount() > 0; - // Append cache-TTL timestamp AFTER prompt + compaction retry completes. - // Previously this was before the prompt, which caused a custom entry to be - // inserted between compaction and the next prompt — breaking the - // prepareCompaction() guard that checks the last entry type, leading to - // double-compaction. See: https://github.com/openclaw/openclaw/issues/9282 - // Skip when timed out during compaction — session state may be inconsistent. - // Also skip when compaction ran this attempt — appending a custom entry - // after compaction would break the guard again. See: #28491 - appendAttemptCacheTtlIfNeeded({ - sessionManager, - timedOutDuringCompaction, - compactionOccurredThisAttempt, - config: params.config, - provider: params.provider, - modelId: params.modelId, - modelApi: params.model.api, - isCacheTtlEligibleProvider, - }); + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.withSessionWriteLock(async () => { + // Check if ANY compaction occurred during the entire attempt (prompt + retry). + // Using a cumulative count (> 0) instead of a delta check avoids missing + // compactions that complete during activeSession.prompt() before the delta + // baseline is sampled. + compactionOccurredThisAttempt = getCompactionCount() > 0; + // Append cache-TTL timestamp AFTER prompt + compaction retry completes. + // Previously this was before the prompt, which caused a custom entry to be + // inserted between compaction and the next prompt — breaking the + // prepareCompaction() guard that checks the last entry type, leading to + // double-compaction. See: https://github.com/openclaw/openclaw/issues/9282 + // Skip when timed out during compaction — session state may be inconsistent. + // Also skip when compaction ran this attempt — appending a custom entry + // after compaction would break the guard again. See: #28491 + appendAttemptCacheTtlIfNeeded({ + sessionManager: activeSessionManager, + timedOutDuringCompaction, + compactionOccurredThisAttempt, + config: params.config, + provider: params.provider, + modelId: params.modelId, + modelApi: params.model.api, + isCacheTtlEligibleProvider, + }); - // If timeout occurred during compaction, use pre-compaction snapshot when available - // (compaction restructures messages but does not add user/assistant turns). - const snapshotSelection = selectCompactionTimeoutSnapshot({ - timedOutDuringCompaction, - preCompactionSnapshot, - preCompactionSessionId, - currentSnapshot: activeSession.messages.slice(), - currentSessionId: activeSession.sessionId, - }); - if (timedOutDuringCompaction) { - if (!isProbeSession) { - log.warn( - `using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`, - ); + // If timeout occurred during compaction, use pre-compaction snapshot when available + // (compaction restructures messages but does not add user/assistant turns). + const snapshotSelection = selectCompactionTimeoutSnapshot({ + timedOutDuringCompaction, + preCompactionSnapshot, + preCompactionSessionId, + currentSnapshot: activeSession.messages.slice(), + currentSessionId: activeSession.sessionId, + }); + if (timedOutDuringCompaction) { + if (!isProbeSession) { + log.warn( + `using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`, + ); + } } - } - messagesSnapshot = projectToolSearchTargetTranscriptMessages( - snapshotSelection.messagesSnapshot, - toolSearchTargetTranscriptProjections, - ); - sessionIdUsed = snapshotSelection.sessionIdUsed; + messagesSnapshot = projectToolSearchTargetTranscriptMessages( + snapshotSelection.messagesSnapshot, + toolSearchTargetTranscriptProjections, + ); + sessionIdUsed = snapshotSelection.sessionIdUsed; - lastAssistant = messagesSnapshot - .slice() - .toReversed() - .find((m) => m.role === "assistant"); - currentAttemptAssistant = findCurrentAttemptAssistantMessage({ - messagesSnapshot, - prePromptMessageCount, - }); - attemptUsage = getUsageTotals(); - cacheBreak = cacheObservabilityEnabled - ? completePromptCacheObservation({ - sessionId: params.sessionId, - sessionKey: params.sessionKey, - usage: attemptUsage, - }) - : null; - const lastCallUsage = normalizeUsage(currentAttemptAssistant?.usage); - const promptCacheObservation = - cacheObservabilityEnabled && - (cacheBreak || promptCacheChangesForTurn || typeof attemptUsage?.cacheRead === "number") - ? { - broke: Boolean(cacheBreak), - ...(typeof cacheBreak?.previousCacheRead === "number" - ? { previousCacheRead: cacheBreak.previousCacheRead } - : {}), - ...(typeof cacheBreak?.cacheRead === "number" - ? { cacheRead: cacheBreak.cacheRead } - : typeof attemptUsage?.cacheRead === "number" - ? { cacheRead: attemptUsage.cacheRead } + lastAssistant = messagesSnapshot + .slice() + .toReversed() + .find((message): message is AssistantMessage => message.role === "assistant"); + currentAttemptAssistant = findCurrentAttemptAssistantMessage({ + messagesSnapshot, + prePromptMessageCount, + }); + attemptUsage = getUsageTotals(); + cacheBreak = cacheObservabilityEnabled + ? completePromptCacheObservation({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + usage: attemptUsage, + }) + : null; + lastCallUsage = normalizeUsage(currentAttemptAssistant?.usage); + const promptCacheObservation = + cacheObservabilityEnabled && + (cacheBreak || promptCacheChangesForTurn || typeof attemptUsage?.cacheRead === "number") + ? { + broke: Boolean(cacheBreak), + ...(typeof cacheBreak?.previousCacheRead === "number" + ? { previousCacheRead: cacheBreak.previousCacheRead } : {}), - changes: cacheBreak?.changes ?? promptCacheChangesForTurn, - } - : undefined; - const fallbackLastCacheTouchAt = readLastCacheTtlTimestamp(sessionManager, { - provider: params.provider, - modelId: params.modelId, - }); - promptCache = buildContextEnginePromptCacheInfo({ - retention: effectivePromptCacheRetention, - lastCallUsage, - observation: promptCacheObservation, - lastCacheTouchAt: resolvePromptCacheTouchTimestamp({ + ...(typeof cacheBreak?.cacheRead === "number" + ? { cacheRead: cacheBreak.cacheRead } + : typeof attemptUsage?.cacheRead === "number" + ? { cacheRead: attemptUsage.cacheRead } + : {}), + changes: cacheBreak?.changes ?? promptCacheChangesForTurn, + } + : undefined; + const fallbackLastCacheTouchAt = readLastCacheTtlTimestamp(activeSessionManager, { + provider: params.provider, + modelId: params.modelId, + }); + promptCache = buildContextEnginePromptCacheInfo({ + retention: effectivePromptCacheRetention, lastCallUsage, - assistantTimestamp: currentAttemptAssistant?.timestamp, - fallbackLastCacheTouchAt, - }), + observation: promptCacheObservation, + lastCacheTouchAt: resolvePromptCacheTouchTimestamp({ + lastCallUsage, + assistantTimestamp: currentAttemptAssistant?.timestamp, + fallbackLastCacheTouchAt, + }), + }); + + if (promptError && promptErrorSource === "prompt" && !compactionOccurredThisAttempt) { + try { + activeSessionManager.appendCustomEntry("openclaw:prompt-error", { + timestamp: Date.now(), + runId: params.runId, + sessionId: params.sessionId, + provider: params.provider, + model: params.modelId, + api: params.model.api, + error: formatErrorMessage(promptError), + }); + } catch (entryErr) { + log.warn(`failed to persist prompt error entry: ${String(entryErr)}`); + } + } }); - if (promptError && promptErrorSource === "prompt" && !compactionOccurredThisAttempt) { - try { - sessionManager.appendCustomEntry("openclaw:prompt-error", { - timestamp: Date.now(), - runId: params.runId, - sessionId: params.sessionId, - provider: params.provider, - model: params.modelId, - api: params.model.api, - error: formatErrorMessage(promptError), - }); - } catch (entryErr) { - log.warn(`failed to persist prompt error entry: ${String(entryErr)}`); - } - } - - // Let the active context engine run its post-turn lifecycle. + // Let the active context engine run its post-turn lifecycle. These hooks + // may call runtime LLM capabilities, so only their transcript rewrite + // helper reacquires the session write lock. if (activeContextEngine) { const afterTurnRuntimeContext = buildAfterTurnRuntimeContextFromUsage({ attempt: params, @@ -4254,64 +4304,69 @@ export async function runEmbeddedAttempt( sessionFile: contextParams.sessionFile, reason: contextParams.reason, sessionManager: contextParams.sessionManager as never, + withSessionManagerRewriteLock: async (operation) => + await sessionLockController.withSessionWriteLock(operation), runtimeContext: contextParams.runtimeContext, config: params.config, agentId: sessionAgentId, }), - sessionManager, + sessionManager: activeSessionManager, config: params.config, warn: (message) => log.warn(message), }); } - if ( - shouldPersistCompletedBootstrapTurn({ - shouldRecordCompletedBootstrapTurn, - promptError, - aborted, - timedOutDuringCompaction, - compactionOccurredThisAttempt, - }) - ) { - try { - sessionManager.appendCustomEntry(FULL_BOOTSTRAP_COMPLETED_CUSTOM_TYPE, { - timestamp: Date.now(), - runId: params.runId, - sessionId: params.sessionId, - }); - } catch (entryErr) { - log.warn(`failed to persist bootstrap completion entry: ${String(entryErr)}`); - } - } - - if ( - compactionOccurredThisAttempt && - !promptError && - !aborted && - !timedOut && - !idleTimedOut && - !timedOutDuringCompaction && - shouldRotateCompactionTranscript(params.config) - ) { - try { - const rotation = await rotateTranscriptAfterCompaction({ - sessionManager, - sessionFile: params.sessionFile, - }); - if (rotation.rotated) { - sessionIdUsed = rotation.sessionId ?? sessionIdUsed; - sessionFileUsed = rotation.sessionFile ?? sessionFileUsed; - log.info( - `[compaction] rotated active transcript after automatic compaction ` + - `(sessionKey=${params.sessionKey ?? params.sessionId})`, - ); + await sessionLockController.waitForSessionEvents(activeSession); + await sessionLockController.withSessionWriteLock(async () => { + if ( + shouldPersistCompletedBootstrapTurn({ + shouldRecordCompletedBootstrapTurn, + promptError, + aborted, + timedOutDuringCompaction, + compactionOccurredThisAttempt, + }) + ) { + try { + activeSessionManager.appendCustomEntry(FULL_BOOTSTRAP_COMPLETED_CUSTOM_TYPE, { + timestamp: Date.now(), + runId: params.runId, + sessionId: params.sessionId, + }); + } catch (entryErr) { + log.warn(`failed to persist bootstrap completion entry: ${String(entryErr)}`); } - } catch (err) { - log.warn("[compaction] automatic transcript rotation failed", { - errorMessage: formatErrorMessage(err), - }); } - } + + if ( + compactionOccurredThisAttempt && + !promptError && + !aborted && + !timedOut && + !idleTimedOut && + !timedOutDuringCompaction && + shouldRotateCompactionTranscript(params.config) + ) { + try { + const rotation = await rotateTranscriptAfterCompaction({ + sessionManager: activeSessionManager, + sessionFile: params.sessionFile, + }); + if (rotation.rotated) { + sessionIdUsed = rotation.sessionId ?? sessionIdUsed; + sessionFileUsed = rotation.sessionFile ?? sessionFileUsed; + log.info( + `[compaction] rotated active transcript after automatic compaction ` + + `(sessionKey=${params.sessionKey ?? params.sessionId})`, + ); + } + } catch (err) { + log.warn("[compaction] automatic transcript rotation failed", { + errorMessage: formatErrorMessage(err), + }); + } + } + }); cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, @@ -4384,20 +4439,22 @@ export async function runEmbeddedAttempt( ) .map((entry) => ({ toolName: entry.toolName, meta: entry.meta })); if (cacheObservabilityEnabled) { - if (cacheBreak) { + const cacheBreakForLog = cacheBreak as PromptCacheBreak | null; + if (cacheBreakForLog) { const changeSummary = - cacheBreak.changes?.map((change) => `${change.code}(${change.detail})`).join(", ") ?? - "no tracked cache input change"; + cacheBreakForLog.changes + ?.map((change) => `${change.code}(${change.detail})`) + .join(", ") ?? "no tracked cache input change"; log.warn( - `[prompt-cache] cache read dropped ${cacheBreak.previousCacheRead} -> ${cacheBreak.cacheRead} ` + + `[prompt-cache] cache read dropped ${cacheBreakForLog.previousCacheRead} -> ${cacheBreakForLog.cacheRead} ` + `for ${params.provider}/${params.modelId} via ${streamStrategy}; ${changeSummary}`, ); cacheTrace?.recordStage("cache:result", { options: { - previousCacheRead: cacheBreak.previousCacheRead, - cacheRead: cacheBreak.cacheRead, + previousCacheRead: cacheBreakForLog.previousCacheRead, + cacheRead: cacheBreakForLog.cacheRead, changes: - cacheBreak.changes?.map((change) => ({ + cacheBreakForLog.changes?.map((change) => ({ code: change.code, detail: change.detail, })) ?? undefined, @@ -4727,6 +4784,7 @@ export async function runEmbeddedAttempt( timedOut || idleTimedOut || timedOutDuringCompaction; + const cleanupSessionLock = await sessionLockController.acquireForCleanup({ session }); await cleanupEmbeddedAttemptResources({ removeToolResultContextGuard, flushPendingToolResultsAfterIdle, @@ -4734,11 +4792,12 @@ export async function runEmbeddedAttempt( sessionManager, bundleMcpRuntime, bundleLspRuntime, - sessionLock, + sessionLock: cleanupSessionLock, // PERF: If the run was aborted (user stop, timeout, etc.), skip the idle wait // and flush pending results synchronously so we can release the session lock ASAP. aborted: cleanupAborted, abortSettlePromise: cleanupAborted ? buildAbortSettlePromise() : null, + skipSessionFlush: sessionLockController.hasSessionTakeover(), runId: params.runId, sessionId: params.sessionId, }); diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.ts b/src/agents/pi-embedded-runner/tool-result-truncation.ts index d15d2b9217b..4758827f9a1 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.ts @@ -9,7 +9,7 @@ import { resolveAgentContextLimits } from "../agent-scope.js"; import { acquireSessionWriteLock, type SessionWriteLockAcquireTimeoutConfig, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } from "../session-write-lock.js"; import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js"; import { log } from "./logger.js"; @@ -777,7 +777,7 @@ export async function truncateOversizedToolResultsInSession(params: { try { sessionLock = await acquireSessionWriteLock({ sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + ...resolveSessionWriteLockOptions(params.config), }); const state = await readTranscriptFileState(sessionFile); return await truncateOversizedToolResultsInTranscriptState({ diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts index 7ca70777136..e0a7b552c4c 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts @@ -343,7 +343,9 @@ describe("rewriteTranscriptEntriesInSessionFile", () => { expect(result.changed).toBe(true); expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ sessionFile, + staleMs: 1_800_000, timeoutMs: 60_000, + maxHoldMs: 300_000, }); expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ sessionFile, sessionKey: "agent:main:test" }); diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.ts b/src/agents/pi-embedded-runner/transcript-rewrite.ts index 11186608060..ddc091bb55b 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.ts @@ -11,7 +11,7 @@ import { getRawSessionAppendMessage } from "../session-raw-append-message.js"; import { acquireSessionWriteLock, type SessionWriteLockAcquireTimeoutConfig, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } from "../session-write-lock.js"; import { log } from "./logger.js"; import { @@ -366,7 +366,7 @@ export async function rewriteTranscriptEntriesInSessionFile(params: { try { sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + ...resolveSessionWriteLockOptions(params.config), }); const state = await readTranscriptFileState(params.sessionFile); const result = rewriteTranscriptEntriesInState({ diff --git a/src/agents/session-write-lock.test.ts b/src/agents/session-write-lock.test.ts index 45f356f9842..7e45559032d 100644 --- a/src/agents/session-write-lock.test.ts +++ b/src/agents/session-write-lock.test.ts @@ -10,6 +10,7 @@ let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLock let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest; let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout; let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs; +let resolveSessionWriteLockOptions: typeof import("./session-write-lock.js").resolveSessionWriteLockOptions; async function expectLockRemovedOnlyAfterFinalRelease(params: { lockPath: string; @@ -146,6 +147,7 @@ describe("acquireSessionWriteLock", () => { resetSessionWriteLockStateForTest, resolveSessionLockMaxHoldFromTimeout, resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } = await import("./session-write-lock.js")); }); @@ -369,6 +371,91 @@ describe("acquireSessionWriteLock", () => { ).toBe(60_000); }); + it("resolves session write-lock stale and max-hold policy", () => { + expect( + resolveSessionWriteLockOptions({ + session: { + writeLock: { + acquireTimeoutMs: 90_000, + staleMs: 45_000, + maxHoldMs: 30_000, + }, + }, + }), + ).toEqual({ + timeoutMs: 90_000, + staleMs: 45_000, + maxHoldMs: 30_000, + }); + }); + + it("lets session write-lock env override config for emergency tuning", () => { + expect( + resolveSessionWriteLockOptions( + { + session: { + writeLock: { + acquireTimeoutMs: 90_000, + staleMs: 45_000, + maxHoldMs: 30_000, + }, + }, + }, + { + env: { + OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS: "120000", + OPENCLAW_SESSION_WRITE_LOCK_STALE_MS: "60000", + OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS: "50000", + }, + }, + ), + ).toEqual({ + timeoutMs: 120_000, + staleMs: 60_000, + maxHoldMs: 50_000, + }); + }); + + it("uses resolved stale policy when cleaning stale lock files", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-policy-")); + const sessionsDir = path.join(root, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + const nowMs = Date.now(); + const lockPath = path.join(sessionsDir, "configured-live.jsonl.lock"); + + try { + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + createdAt: new Date(nowMs - 45_000).toISOString(), + }), + "utf8", + ); + + const configOnly = await cleanStaleLockFiles({ + sessionsDir, + config: { session: { writeLock: { staleMs: 30_000 } } }, + nowMs, + removeStale: false, + readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"], + }); + expect(configOnly.locks[0]?.stale).toBe(true); + + const envOverride = await cleanStaleLockFiles({ + sessionsDir, + config: { session: { writeLock: { staleMs: 30_000 } } }, + env: { OPENCLAW_SESSION_WRITE_LOCK_STALE_MS: "60000" }, + nowMs, + removeStale: false, + readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"], + }); + expect(envOverride.locks[0]?.stale).toBe(false); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + it("clamps max hold for effectively no-timeout runs", () => { expect( resolveSessionLockMaxHoldFromTimeout({ diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts index a548be75929..98165c972e0 100644 --- a/src/agents/session-write-lock.ts +++ b/src/agents/session-write-lock.ts @@ -36,8 +36,8 @@ type CleanupSignal = (typeof CLEANUP_SIGNALS)[number]; const CLEANUP_STATE_KEY = Symbol.for("openclaw.sessionWriteLockCleanupState"); const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState"); -const DEFAULT_STALE_MS = 30 * 60 * 1000; -const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000; +export const DEFAULT_SESSION_WRITE_LOCK_STALE_MS = 30 * 60 * 1000; +export const DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS = 5 * 60 * 1000; export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000; const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000; const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000; @@ -74,18 +74,113 @@ export type SessionWriteLockAcquireTimeoutConfig = { session?: { writeLock?: { acquireTimeoutMs?: number; + staleMs?: number; + maxHoldMs?: number; }; }; }; +type SessionWriteLockMsKey = "acquireTimeoutMs" | "staleMs" | "maxHoldMs"; + +const SESSION_WRITE_LOCK_ENV: Record = { + acquireTimeoutMs: "OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS", + staleMs: "OPENCLAW_SESSION_WRITE_LOCK_STALE_MS", + maxHoldMs: "OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS", +}; + +function readPositiveMsEnv( + env: NodeJS.ProcessEnv, + key: string, + opts: { allowInfinity?: boolean } = {}, +): number | undefined { + const raw = env[key]?.trim(); + if (!raw) { + return undefined; + } + const value = Number(raw); + return parsePositiveMs(value, opts); +} + +function parsePositiveMs( + value: number | undefined, + opts: { allowInfinity?: boolean } = {}, +): number | undefined { + if (typeof value !== "number" || Number.isNaN(value) || value <= 0) { + return undefined; + } + if (value === Number.POSITIVE_INFINITY) { + return opts.allowInfinity ? value : undefined; + } + if (!Number.isFinite(value)) { + return undefined; + } + return value; +} + +function resolveSessionWriteLockMs(params: { + config?: SessionWriteLockAcquireTimeoutConfig; + env?: NodeJS.ProcessEnv; + key: SessionWriteLockMsKey; + fallback: number; + allowInfinity?: boolean; +}): number { + const opts = { allowInfinity: params.allowInfinity }; + return ( + readPositiveMsEnv(params.env ?? process.env, SESSION_WRITE_LOCK_ENV[params.key], opts) ?? + parsePositiveMs(params.config?.session?.writeLock?.[params.key], opts) ?? + params.fallback + ); +} + export function resolveSessionWriteLockAcquireTimeoutMs( config?: SessionWriteLockAcquireTimeoutConfig, + env?: NodeJS.ProcessEnv, ): number { - return resolvePositiveMs( - config?.session?.writeLock?.acquireTimeoutMs, - DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS, - { allowInfinity: true }, - ); + return resolveSessionWriteLockMs({ + config, + env, + key: "acquireTimeoutMs", + fallback: DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS, + allowInfinity: true, + }); +} + +export function resolveSessionWriteLockStaleMs( + config?: SessionWriteLockAcquireTimeoutConfig, + env?: NodeJS.ProcessEnv, +): number { + return resolveSessionWriteLockMs({ + config, + env, + key: "staleMs", + fallback: DEFAULT_SESSION_WRITE_LOCK_STALE_MS, + }); +} + +export function resolveSessionWriteLockMaxHoldMs( + config?: SessionWriteLockAcquireTimeoutConfig, + params: { env?: NodeJS.ProcessEnv; fallback?: number } = {}, +): number { + return resolveSessionWriteLockMs({ + config, + env: params.env, + key: "maxHoldMs", + fallback: params.fallback ?? DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS, + }); +} + +export function resolveSessionWriteLockOptions( + config?: SessionWriteLockAcquireTimeoutConfig, + params: { env?: NodeJS.ProcessEnv; maxHoldMsFallback?: number } = {}, +): { timeoutMs: number; staleMs: number; maxHoldMs: number } { + return { + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(config, params.env), + staleMs: resolveSessionWriteLockStaleMs(config, params.env), + maxHoldMs: resolveSessionWriteLockMaxHoldMs(config, { + env: params.env, + fallback: params.maxHoldMsFallback, + }), + }; } function resolveCleanupState(): CleanupState { @@ -137,7 +232,7 @@ export function resolveSessionLockMaxHoldFromTimeout(params: { graceMs?: number; minMs?: number; }): number { - const minMs = resolvePositiveMs(params.minMs, DEFAULT_MAX_HOLD_MS); + const minMs = resolvePositiveMs(params.minMs, DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS); const timeoutMs = resolvePositiveMs(params.timeoutMs, minMs, { allowInfinity: true }); if (timeoutMs === Number.POSITIVE_INFINITY) { return MAX_LOCK_HOLD_MS; @@ -159,7 +254,9 @@ async function runLockWatchdogCheck(nowMs = Date.now()): Promise { let released = 0; for (const held of SESSION_LOCKS.heldEntries()) { const maxHoldMs = - typeof held.metadata.maxHoldMs === "number" ? held.metadata.maxHoldMs : DEFAULT_MAX_HOLD_MS; + typeof held.metadata.maxHoldMs === "number" + ? held.metadata.maxHoldMs + : DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS; const heldForMs = nowMs - held.acquiredAt; if (heldForMs <= maxHoldMs) { continue; @@ -547,6 +644,8 @@ function inspectLockPayloadForSession(params: { export async function cleanStaleLockFiles(params: { sessionsDir: string; + config?: SessionWriteLockAcquireTimeoutConfig; + env?: NodeJS.ProcessEnv; staleMs?: number; removeStale?: boolean; nowMs?: number; @@ -557,7 +656,10 @@ export async function cleanStaleLockFiles(params: { }; }): Promise<{ locks: SessionLockInspection[]; cleaned: SessionLockInspection[] }> { const sessionsDir = path.resolve(params.sessionsDir); - const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); + const staleMs = resolvePositiveMs( + params.staleMs, + resolveSessionWriteLockStaleMs(params.config, params.env), + ); const removeStale = params.removeStale !== false; const nowMs = params.nowMs ?? Date.now(); const ownerProcessArgsReader = params.readOwnerProcessArgs ?? readProcessArgsSync; @@ -622,11 +724,12 @@ export async function acquireSessionWriteLock(params: { }> { registerCleanupHandlers(); const allowReentrant = params.allowReentrant ?? false; - const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), { + const defaultOptions = resolveSessionWriteLockOptions(); + const timeoutMs = resolvePositiveMs(params.timeoutMs, defaultOptions.timeoutMs, { allowInfinity: true, }); - const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); - const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS); + const staleMs = resolvePositiveMs(params.staleMs, defaultOptions.staleMs); + const maxHoldMs = resolvePositiveMs(params.maxHoldMs, defaultOptions.maxHoldMs); const sessionFile = path.resolve(params.sessionFile); const sessionDir = path.dirname(sessionFile); const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile); diff --git a/src/commands/doctor-session-locks.test.ts b/src/commands/doctor-session-locks.test.ts index 73c49a529e8..e7fc2d9916d 100644 --- a/src/commands/doctor-session-locks.test.ts +++ b/src/commands/doctor-session-locks.test.ts @@ -104,6 +104,30 @@ describe("noteSessionLockHealth", () => { await expect(fs.access(freshLock)).resolves.toBeUndefined(); }); + it("uses configured stale threshold when repairing lock files", async () => { + const sessionsDir = state.sessionsDir(); + await fs.mkdir(sessionsDir, { recursive: true }); + + const configuredStaleLock = path.join(sessionsDir, "configured-stale.jsonl.lock"); + await fs.writeFile( + configuredStaleLock, + JSON.stringify({ pid: process.pid, createdAt: new Date(Date.now() - 45_000).toISOString() }), + "utf8", + ); + + await noteSessionLockHealth({ + shouldRepair: true, + config: { session: { writeLock: { staleMs: 30_000 } } }, + readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"], + }); + + expect(note).toHaveBeenCalledTimes(1); + const [message] = firstNoteCall(); + expect(message).toContain("stale=yes (too-old)"); + expect(message).toContain("[removed]"); + await expectPathMissing(configuredStaleLock); + }); + it("removes fresh live locks when the owner is not an OpenClaw process", async () => { const sessionsDir = state.sessionsDir(); await fs.mkdir(sessionsDir, { recursive: true }); diff --git a/src/commands/doctor-session-locks.ts b/src/commands/doctor-session-locks.ts index c7377783c3f..4db1ab5a52e 100644 --- a/src/commands/doctor-session-locks.ts +++ b/src/commands/doctor-session-locks.ts @@ -1,15 +1,15 @@ import { resolveAgentSessionDirs } from "../agents/session-dirs.js"; import { cleanStaleLockFiles, + resolveSessionWriteLockStaleMs, type SessionLockInspection, type SessionLockOwnerProcessArgsReader, + type SessionWriteLockAcquireTimeoutConfig, } from "../agents/session-write-lock.js"; import { resolveStateDir } from "../config/paths.js"; import { note } from "../terminal/note.js"; import { shortenHomePath } from "../utils.js"; -const DEFAULT_STALE_MS = 30 * 60 * 1000; - function formatAge(ageMs: number | null): string { if (ageMs === null) { return "unknown"; @@ -41,11 +41,13 @@ function formatLockLine(lock: SessionLockInspection): string { export async function noteSessionLockHealth(params?: { shouldRepair?: boolean; + config?: SessionWriteLockAcquireTimeoutConfig; + env?: NodeJS.ProcessEnv; staleMs?: number; readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader; }) { const shouldRepair = params?.shouldRepair === true; - const staleMs = params?.staleMs ?? DEFAULT_STALE_MS; + const staleMs = params?.staleMs ?? resolveSessionWriteLockStaleMs(params?.config, params?.env); let sessionDirs: string[] = []; try { sessionDirs = await resolveAgentSessionDirs(resolveStateDir(process.env)); diff --git a/src/config/schema.help.quality.test.ts b/src/config/schema.help.quality.test.ts index 988ba182c83..52d716690b2 100644 --- a/src/config/schema.help.quality.test.ts +++ b/src/config/schema.help.quality.test.ts @@ -708,10 +708,18 @@ describe("config help copy quality", () => { expect(/raw|unnormalized/i.test(rawKeyPrefix)).toBe(true); }); - it("documents session write-lock acquire timeout defaults", () => { + it("documents session write-lock policy defaults", () => { const acquireTimeout = FIELD_HELP["session.writeLock.acquireTimeoutMs"]; expect(acquireTimeout.includes("60000")).toBe(true); expect(/transcript|lock/i.test(acquireTimeout)).toBe(true); + + const stale = FIELD_HELP["session.writeLock.staleMs"]; + expect(stale.includes("1800000")).toBe(true); + expect(stale.includes("OPENCLAW_SESSION_WRITE_LOCK_STALE_MS")).toBe(true); + + const maxHold = FIELD_HELP["session.writeLock.maxHoldMs"]; + expect(maxHold.includes("300000")).toBe(true); + expect(maxHold.includes("OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS")).toBe(true); }); it("documents session maintenance duration/size examples and deprecations", () => { diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index ca9faf028d6..f7a47e0cdb2 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -1610,9 +1610,13 @@ export const FIELD_HELP: Record = { "session.sendPolicy.rules[].match.rawKeyPrefix": "Matches the raw, unnormalized session-key prefix for exact full-key policy targeting. Use this when normalized keyPrefix is too broad and you need agent-prefixed or transport-specific precision.", "session.writeLock": - "Groups session transcript write-lock acquisition controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default wait.", + "Groups session transcript write-lock controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default policies.", "session.writeLock.acquireTimeoutMs": - "Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; raise for slow disks or long prep/cleanup, lower only when quick failure is preferred.", + "Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; env override: OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS.", + "session.writeLock.staleMs": + "Milliseconds before an existing session transcript lock can be treated as stale and reclaimed. Default: 1800000; env override: OPENCLAW_SESSION_WRITE_LOCK_STALE_MS.", + "session.writeLock.maxHoldMs": + "Milliseconds a held in-process session transcript lock may remain held before the watchdog releases it. Default: 300000; env override: OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS.", "session.agentToAgent": "Groups controls for inter-agent session exchanges, including loop prevention limits on reply chaining. Keep defaults unless you run advanced agent-to-agent automation with strict turn caps.", "session.agentToAgent.maxPingPongTurns": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 942c5ad7623..21b1ca87c23 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -791,6 +791,8 @@ export const FIELD_LABELS: Record = { "session.sendPolicy.rules[].match.rawKeyPrefix": "Session Send Rule Raw Key Prefix", "session.writeLock": "Session Write Lock", "session.writeLock.acquireTimeoutMs": "Session Write Lock Acquire Timeout", + "session.writeLock.staleMs": "Session Write Lock Stale Timeout", + "session.writeLock.maxHoldMs": "Session Write Lock Max Hold", "session.agentToAgent": "Session Agent-to-Agent", "session.agentToAgent.maxPingPongTurns": "Agent-to-Agent Ping-Pong Turns", "session.threadBindings": "Session Thread Bindings", diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index 498c38763a1..2ef32514df5 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -5,7 +5,7 @@ import { StringDecoder } from "node:string_decoder"; import type { AgentMessage } from "@earendil-works/pi-agent-core"; import { acquireSessionWriteLock, - resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, } from "../../agents/session-write-lock.js"; import { redactTranscriptMessage } from "../../agents/transcript-redact.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; @@ -264,7 +264,7 @@ async function appendSessionTranscriptMessageLocked( ): Promise<{ messageId: string; message: TMessage }> { const lock = await acquireSessionWriteLock({ sessionFile: params.transcriptPath, - timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + ...resolveSessionWriteLockOptions(params.config), allowReentrant: true, }); try { diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 5c3cc20f791..3ce5700c987 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -219,6 +219,10 @@ export type SessionConfig = { export type SessionWriteLockConfig = { /** How long to wait while acquiring a session transcript write lock. Default: 60000. */ acquireTimeoutMs?: number; + /** When an existing lock can be treated as stale and reclaimed. Default: 1800000. */ + staleMs?: number; + /** Maximum in-process hold time before the watchdog releases the lock. Default: 300000. */ + maxHoldMs?: number; }; export type SessionMaintenanceMode = "enforce" | "warn"; diff --git a/src/config/zod-schema.session-maintenance-extensions.test.ts b/src/config/zod-schema.session-maintenance-extensions.test.ts index ab71751f0a4..d0430b24be9 100644 --- a/src/config/zod-schema.session-maintenance-extensions.test.ts +++ b/src/config/zod-schema.session-maintenance-extensions.test.ts @@ -6,12 +6,14 @@ describe("SessionSchema maintenance extensions", () => { const result = SessionSchema.safeParse({ writeLock: { acquireTimeoutMs: 60_000, + staleMs: 1_800_000, + maxHoldMs: 300_000, }, }); expect(result.success).toBe(true); }); - it("rejects invalid session write-lock acquire timeout values", () => { + it("rejects invalid session write-lock timeout values", () => { expect(() => SessionSchema.parse({ writeLock: { @@ -19,6 +21,22 @@ describe("SessionSchema maintenance extensions", () => { }, }), ).toThrow(/acquireTimeoutMs|number/i); + + expect(() => + SessionSchema.parse({ + writeLock: { + staleMs: 0, + }, + }), + ).toThrow(/staleMs|number/i); + + expect(() => + SessionSchema.parse({ + writeLock: { + maxHoldMs: 0, + }, + }), + ).toThrow(/maxHoldMs|number/i); }); it("accepts valid maintenance extensions", () => { diff --git a/src/config/zod-schema.session.ts b/src/config/zod-schema.session.ts index 475253ba881..d627778003f 100644 --- a/src/config/zod-schema.session.ts +++ b/src/config/zod-schema.session.ts @@ -59,6 +59,8 @@ export const SessionSchema = z writeLock: z .object({ acquireTimeoutMs: z.number().int().positive().optional(), + staleMs: z.number().int().positive().optional(), + maxHoldMs: z.number().int().positive().optional(), }) .strict() .optional(), diff --git a/src/flows/doctor-health-contributions.ts b/src/flows/doctor-health-contributions.ts index e23ab3ec7eb..8c92b7f5f16 100644 --- a/src/flows/doctor-health-contributions.ts +++ b/src/flows/doctor-health-contributions.ts @@ -344,7 +344,11 @@ async function runCodexSessionRouteHealth(ctx: DoctorHealthFlowContext): Promise async function runSessionLocksHealth(ctx: DoctorHealthFlowContext): Promise { const { noteSessionLockHealth } = await import("../commands/doctor-session-locks.js"); - await noteSessionLockHealth({ shouldRepair: ctx.prompter.shouldRepair }); + await noteSessionLockHealth({ + shouldRepair: ctx.prompter.shouldRepair, + config: ctx.cfg, + env: ctx.env, + }); } async function runSessionTranscriptsHealth(ctx: DoctorHealthFlowContext): Promise { diff --git a/src/gateway/server-startup-post-attach.ts b/src/gateway/server-startup-post-attach.ts index 4b1d7b4902e..332041ca299 100644 --- a/src/gateway/server-startup-post-attach.ts +++ b/src/gateway/server-startup-post-attach.ts @@ -23,7 +23,6 @@ import type { refreshLatestUpdateRestartSentinel } from "./server-restart-sentin import type { logGatewayStartup } from "./server-startup-log.js"; import type { startGatewayTailscaleExposure } from "./server-tailscale.js"; -const SESSION_LOCK_STALE_MS = 30 * 60 * 1000; const ACP_BACKEND_READY_TIMEOUT_MS = 5_000; const ACP_BACKEND_READY_POLL_MS = 50; const PRIMARY_MODEL_PREWARM_TIMEOUT_MS = 5_000; @@ -563,7 +562,7 @@ export async function startGatewaySidecars(params: { for (const sessionsDir of sessionDirs) { const result = await cleanStaleLockFiles({ sessionsDir, - staleMs: SESSION_LOCK_STALE_MS, + config: params.cfg, removeStale: true, log: { warn: (message) => params.log.warn(message) }, }); diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index b705ce441b4..cc5349243d9 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -162,6 +162,7 @@ export { isSubagentSessionKey } from "../routing/session-key.js"; export { acquireSessionWriteLock, resolveSessionWriteLockAcquireTimeoutMs, + resolveSessionWriteLockOptions, type SessionWriteLockAcquireTimeoutConfig, } from "../agents/session-write-lock.js"; export { appendSessionTranscriptMessage } from "../config/sessions/transcript-append.js";