From f7ed29e11812eb3d0d5697979fcb2c099623114f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 14:54:48 +0100 Subject: [PATCH] fix: thread session write-lock timeout config --- CHANGELOG.md | 1 + docs/concepts/agent-loop.md | 3 +- .../session-management-compaction.md | 5 ++ extensions/codex/src/app-server/compact.ts | 1 + .../codex/src/app-server/run-attempt.ts | 3 ++ .../codex/src/app-server/transcript-mirror.ts | 5 +- src/agents/agent-command.ts | 2 + .../command/attempt-execution.cli.test.ts | 2 + src/agents/command/attempt-execution.ts | 13 ++++- src/agents/command/cli-compaction.ts | 1 + .../harness/context-engine-lifecycle.ts | 7 +++ .../pi-embedded-runner/compact.queued.ts | 1 + src/agents/pi-embedded-runner/compact.ts | 2 + .../context-engine-maintenance.test.ts | 48 +++++++++++++++++-- .../context-engine-maintenance.ts | 12 +++++ src/agents/pi-embedded-runner/run.ts | 3 ++ src/agents/pi-embedded-runner/run/attempt.ts | 5 ++ .../tool-result-truncation.ts | 13 ++++- .../transcript-rewrite.test.ts | 1 + .../pi-embedded-runner/transcript-rewrite.ts | 8 +++- src/agents/session-write-lock.test.ts | 16 +++++++ src/agents/session-write-lock.ts | 23 ++++++++- .../reply/dispatch-acp-transcript.runtime.ts | 1 + src/config/config.schema-regressions.test.ts | 12 +++++ src/config/schema.help.quality.test.ts | 6 +++ src/config/schema.help.ts | 4 ++ src/config/schema.labels.ts | 2 + src/config/sessions/transcript-append.ts | 9 +++- src/config/sessions/transcript.ts | 5 ++ src/config/types.base.ts | 7 +++ ...ema.session-maintenance-extensions.test.ts | 20 ++++++++ src/config/zod-schema.session.ts | 6 +++ .../server-methods/chat-transcript-inject.ts | 3 ++ src/gateway/server-methods/chat.ts | 11 ++++- src/infra/outbound/deliver.test.ts | 7 ++- src/infra/outbound/deliver.ts | 1 + .../outbound/outbound-send-service.test.ts | 2 +- src/infra/outbound/outbound-send-service.ts | 1 + src/plugin-sdk/agent-harness-runtime.ts | 6 ++- 39 files changed, 258 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dd987a5c49..27f10972267 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Sessions/transcripts: use one `session.writeLock.acquireTimeoutMs` policy for session transcript lock acquisitions and raise the default wait to 60 seconds, avoiding user-visible lock timeouts during legitimate slow prep, cleanup, compaction, and mirror work. Fixes #75894. Thanks @shandutta. - Control UI: contain the standalone iOS PWA viewport with safe-area-aware document locking, so Add-to-Home-Screen launches cannot scroll past the device bounds. Refs #76072. Thanks @kvncrw. - Agents/restart recovery: match cleaned transcript locks by exact transcript lock paths plus the canonical session fallback, so interrupted main sessions using topic-suffixed transcripts resume after gateway restart. Refs #76052. Thanks @anyech. - Agents/runtime: cache the stable system-prompt prefix and reuse prompt-report tool schema stats during dispatch prep, reducing repeated CPU work before streaming starts. Fixes #75999; supersedes #76061. Thanks @zackchiutw and @STLI69. diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index 002554092d5..a81a0ba3a28 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -50,7 +50,8 @@ wired end-to-end. See [Command Queue](/concepts/queue). - Transcript writes are also protected by a session write lock on the session file. The lock is process-aware and file-based, so it catches writers that bypass the in-process queue or come from - another process. + another process. Session transcript writers wait up to `session.writeLock.acquireTimeoutMs` + before reporting the session as busy; the default is `60000` ms. - Session write locks are non-reentrant by default. If a helper intentionally nests acquisition of the same lock while preserving one logical writer, it must opt in explicitly with `allowReentrant: true`. diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 82040d6a8ef..4b1e8dfa240 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -94,6 +94,11 @@ configured age, count, or disk budget. OpenClaw no longer creates automatic `sessions.json.bak.*` rotation backups during Gateway writes. The legacy `session.maintenance.rotateBytes` key is ignored and `openclaw doctor --fix` removes it from older configs. +Transcript mutations use a session write lock on the transcript file. Lock acquisition waits up to +`session.writeLock.acquireTimeoutMs` before surfacing a busy-session error; the default is `60000` +ms. Raise this only when legitimate prep, cleanup, compaction, or transcript mirror work contends +longer on slow machines. Stale-lock detection and maximum hold warnings remain separate policies. + Enforcement order for disk budget cleanup (`mode: "enforce"`): 1. Remove oldest archived, orphan transcript, or orphan trajectory artifacts first. diff --git a/extensions/codex/src/app-server/compact.ts b/extensions/codex/src/app-server/compact.ts index 651cfbe7f52..7b222ea565d 100644 --- a/extensions/codex/src/app-server/compact.ts +++ b/extensions/codex/src/app-server/compact.ts @@ -74,6 +74,7 @@ export async function maybeCompactCodexAppServerSession( sessionFile: params.sessionFile, reason: "compaction", runtimeContext: params.contextEngineRuntimeContext, + config: params.config, }); } catch (error) { embeddedAgentLog.warn( diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index bfd765f6190..a4b99559948 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -430,6 +430,7 @@ export async function runCodexAppServerAttempt( tokenBudget: params.contextTokenBudget, }), runMaintenance: runHarnessContextEngineMaintenance, + config: params.config, warn: (message) => embeddedAgentLog.warn(message), }); historyMessages = @@ -1178,6 +1179,7 @@ export async function runCodexAppServerAttempt( promptCache: result.promptCache, }), runMaintenance: runHarnessContextEngineMaintenance, + config: params.config, warn: (message) => embeddedAgentLog.warn(message), }); } @@ -1638,6 +1640,7 @@ async function mirrorTranscriptBestEffort(params: { sessionKey: params.sessionKey, messages: params.result.messagesSnapshot, idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`, + config: params.params.config, }); } catch (error) { embeddedAgentLog.warn("failed to mirror codex app-server transcript", { error }); diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index e606ac00fe9..8742ab4d8e4 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -6,8 +6,10 @@ import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-c import { acquireSessionWriteLock, emitSessionTranscriptUpdate, + resolveSessionWriteLockAcquireTimeoutMs, runAgentHarnessBeforeMessageWriteHook, type AgentMessage, + type SessionWriteLockAcquireTimeoutConfig, } from "openclaw/plugin-sdk/agent-harness-runtime"; const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; @@ -25,6 +27,7 @@ export async function mirrorCodexAppServerTranscript(params: { agentId?: string; messages: AgentMessage[]; idempotencyScope?: string; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { const messages = params.messages.filter( (message) => message.role === "user" || message.role === "assistant", @@ -36,7 +39,7 @@ export async function mirrorCodexAppServerTranscript(params: { await fs.mkdir(path.dirname(params.sessionFile), { recursive: true }); const lock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, - timeoutMs: 10_000, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), }); try { const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile); diff --git a/src/agents/agent-command.ts b/src/agents/agent-command.ts index fc1b90a90cd..e23fabe7518 100644 --- a/src/agents/agent-command.ts +++ b/src/agents/agent-command.ts @@ -584,6 +584,7 @@ async function agentCommandInternal( sessionAgentId, threadId: opts.threadId, sessionCwd: resolveAcpSessionCwd(acpResolution.meta) ?? workspaceDir, + config: cfg, }); } catch (error) { log.warn( @@ -1208,6 +1209,7 @@ async function agentCommandInternal( sessionAgentId, threadId: opts.threadId, sessionCwd: workspaceDir, + config: cfg, }); sessionEntry = await ( await loadCliCompactionRuntime() diff --git a/src/agents/command/attempt-execution.cli.test.ts b/src/agents/command/attempt-execution.cli.test.ts index f118f4cfe88..4b56dc3cac6 100644 --- a/src/agents/command/attempt-execution.cli.test.ts +++ b/src/agents/command/attempt-execution.cli.test.ts @@ -387,6 +387,7 @@ describe("CLI attempt execution", () => { storePath, sessionAgentId: "main", sessionCwd: tmpDir, + config: {}, }); const sessionFile = updatedEntry?.sessionFile; @@ -443,6 +444,7 @@ describe("CLI attempt execution", () => { storePath, sessionAgentId: "main", sessionCwd: tmpDir, + config: {}, }); const messages = await readSessionMessages(updatedEntry?.sessionFile ?? ""); diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index e478fe12e78..1ef26a11f0d 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -21,7 +21,10 @@ import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model- import { isCliProvider } from "../model-selection.js"; import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js"; import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js"; -import { acquireSessionWriteLock } from "../session-write-lock.js"; +import { + acquireSessionWriteLock, + resolveSessionWriteLockAcquireTimeoutMs, +} from "../session-write-lock.js"; import { buildWorkspaceSkillSnapshot } from "../skills.js"; import { buildUsageWithNoCost } from "../stream-message-shared.js"; import { @@ -76,6 +79,7 @@ type PersistTextTurnTranscriptParams = { sessionAgentId: string; threadId?: string | number; sessionCwd: string; + config: OpenClawConfig; assistant: { api: string; provider: string; @@ -193,7 +197,7 @@ async function persistTextTurnTranscript( }); const lock = await acquireSessionWriteLock({ sessionFile, - timeoutMs: 10_000, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), allowReentrant: true, }); try { @@ -202,6 +206,7 @@ async function persistTextTurnTranscript( transcriptPath: sessionFile, sessionId: params.sessionId, cwd: params.sessionCwd, + config: params.config, message: { role: "user", content: promptText, @@ -215,6 +220,7 @@ async function persistTextTurnTranscript( transcriptPath: sessionFile, sessionId: params.sessionId, cwd: params.sessionCwd, + config: params.config, message: { role: "assistant", content: [{ type: "text", text: replyText }], @@ -264,6 +270,7 @@ export async function persistAcpTurnTranscript(params: { sessionAgentId: string; threadId?: string | number; sessionCwd: string; + config: OpenClawConfig; }): Promise { return await persistTextTurnTranscript({ ...params, @@ -287,6 +294,7 @@ export async function persistCliTurnTranscript(params: { sessionAgentId: string; threadId?: string | number; sessionCwd: string; + config: OpenClawConfig; }): Promise { const replyText = resolveCliTranscriptReplyText(params.result); const provider = params.result.meta.agentMeta?.provider?.trim() ?? "cli"; @@ -304,6 +312,7 @@ export async function persistCliTurnTranscript(params: { sessionAgentId: params.sessionAgentId, threadId: params.threadId, sessionCwd: params.sessionCwd, + config: params.config, assistant: { api: "cli", provider, diff --git a/src/agents/command/cli-compaction.ts b/src/agents/command/cli-compaction.ts index 7167f6852e4..3cd0c702b41 100644 --- a/src/agents/command/cli-compaction.ts +++ b/src/agents/command/cli-compaction.ts @@ -166,6 +166,7 @@ async function compactCliTranscript(params: { reason: "compaction", sessionManager: params.sessionManager, runtimeContext, + config: params.cfg, }); return true; } diff --git a/src/agents/harness/context-engine-lifecycle.ts b/src/agents/harness/context-engine-lifecycle.ts index 2754d1934de..e0a6c1e650c 100644 --- a/src/agents/harness/context-engine-lifecycle.ts +++ b/src/agents/harness/context-engine-lifecycle.ts @@ -6,6 +6,7 @@ import { buildAfterTurnRuntimeContext, buildAfterTurnRuntimeContextFromUsage, } from "../pi-embedded-runner/run/attempt.prompt-helpers.js"; +import type { SessionWriteLockAcquireTimeoutConfig } from "../session-write-lock.js"; export type HarnessContextEngine = ContextEngine; @@ -21,6 +22,7 @@ export async function bootstrapHarnessContextEngine(params: { sessionManager?: unknown; runtimeContext?: ContextEngineRuntimeContext; runMaintenance?: typeof runHarnessContextEngineMaintenance; + config?: SessionWriteLockAcquireTimeoutConfig; warn: (message: string) => void; }): Promise { if ( @@ -45,6 +47,7 @@ export async function bootstrapHarnessContextEngine(params: { reason: "bootstrap", sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, + config: params.config, }); } catch (bootstrapErr) { params.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`); @@ -97,6 +100,7 @@ export async function finalizeHarnessContextEngineTurn(params: { runtimeContext?: ContextEngineRuntimeContext; runMaintenance?: typeof runHarnessContextEngineMaintenance; sessionManager?: unknown; + config?: SessionWriteLockAcquireTimeoutConfig; warn: (message: string) => void; }) { if (!params.contextEngine) { @@ -165,6 +169,7 @@ export async function finalizeHarnessContextEngineTurn(params: { reason: "turn", sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, + config: params.config, }); } @@ -201,6 +206,7 @@ export async function runHarnessContextEngineMaintenance(params: { sessionManager?: unknown; runtimeContext?: ContextEngineRuntimeContext; executionMode?: "foreground" | "background"; + config?: SessionWriteLockAcquireTimeoutConfig; }) { return await runContextEngineMaintenance({ contextEngine: params.contextEngine, @@ -213,6 +219,7 @@ export async function runHarnessContextEngineMaintenance(params: { >[0]["sessionManager"], runtimeContext: params.runtimeContext, executionMode: params.executionMode, + config: params.config, }); } diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index c35cc4adb45..64b5c25bc34 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -230,6 +230,7 @@ export async function compactEmbeddedPiSession( sessionFile: postCompactionSessionFile, reason: "compaction", runtimeContext, + config: params.config, }); } if (engineOwnsCompaction && result.ok && result.compacted) { diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index c9290525666..71d7390f583 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -90,6 +90,7 @@ import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js"; import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, + resolveSessionWriteLockAcquireTimeoutMs, } from "../session-write-lock.js"; import { detectRuntimeShell } from "../shell-utils.js"; import { @@ -904,6 +905,7 @@ async function compactEmbeddedPiSessionDirectOnce( const compactionTimeoutMs = resolveCompactionTimeoutMs(params.config); const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ timeoutMs: compactionTimeoutMs, }), diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts index ec600053d40..3b9ea9dcdb9 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.test.ts @@ -115,6 +115,7 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => { sessionFile: "/tmp/session.jsonl", sessionId: "session-1", sessionKey: "agent:main:session-1", + config: undefined, request: { replacements: [ { entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } }, @@ -357,6 +358,7 @@ describe("runContextEngineMaintenance", () => { reason: "turn", executionMode: "background", sessionManager, + config: { session: { writeLock: { acquireTimeoutMs: 75_000 } } }, }); expect(rewriteTranscriptEntriesInSessionManagerMock).not.toHaveBeenCalled(); @@ -364,6 +366,7 @@ describe("runContextEngineMaintenance", () => { sessionFile: "/tmp/session-background-file-rewrite.jsonl", sessionId: "session-background-file-rewrite", sessionKey: "agent:main:session-background-file-rewrite", + config: { session: { writeLock: { acquireTimeoutMs: 75_000 } } }, request: { replacements: [ { @@ -397,11 +400,27 @@ describe("runContextEngineMaintenance", () => { }); await Promise.resolve(); - const maintain = vi.fn(async (_params?: unknown) => ({ - changed: false, - bytesFreed: 0, - rewrittenEntries: 0, - })); + const maintain = vi.fn(async (params?: unknown) => { + await ( + params as { runtimeContext?: ContextEngineRuntimeContext } | undefined + )?.runtimeContext?.rewriteTranscriptEntries?.({ + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 2, + }), + }, + ], + }); + return { + changed: false, + bytesFreed: 0, + rewrittenEntries: 0, + }; + }); const backgroundEngine = { info: { @@ -429,6 +448,7 @@ describe("runContextEngineMaintenance", () => { tokenBudget: 2048, currentTokenCount: 1536, }, + config: { session: { writeLock: { acquireTimeoutMs: 91_000 } } }, }); expect(result).toBeUndefined(); @@ -461,6 +481,24 @@ describe("runContextEngineMaintenance", () => { currentTokenCount: 1536, }), }); + expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({ + sessionFile: "/tmp/session.jsonl", + sessionId: "session-1", + sessionKey, + config: { session: { writeLock: { acquireTimeoutMs: 91_000 } } }, + request: { + replacements: [ + { + entryId: "entry-1", + message: castAgentMessage({ + role: "assistant", + content: [{ type: "text", text: "done" }], + timestamp: 2, + }), + }, + ], + }, + }); const completedTask = getTaskById(queuedTasks[0].taskId); expect(completedTask).toMatchObject({ diff --git a/src/agents/pi-embedded-runner/context-engine-maintenance.ts b/src/agents/pi-embedded-runner/context-engine-maintenance.ts index 54903920e88..949dfc357c8 100644 --- a/src/agents/pi-embedded-runner/context-engine-maintenance.ts +++ b/src/agents/pi-embedded-runner/context-engine-maintenance.ts @@ -22,6 +22,7 @@ import { updateTaskNotifyPolicyForOwner, } from "../../tasks/task-owner-access.js"; import { findActiveSessionTask } from "../session-async-task-status.js"; +import type { SessionWriteLockAcquireTimeoutConfig } from "../session-write-lock.js"; import { resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { @@ -45,6 +46,7 @@ type DeferredTurnMaintenanceScheduleParams = { sessionFile: string; sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; + config?: SessionWriteLockAcquireTimeoutConfig; }; type DeferredTurnMaintenanceRunState = { @@ -275,6 +277,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { runtimeContext?: ContextEngineRuntimeContext; allowDeferredCompactionExecution?: boolean; deferTranscriptRewriteToSessionLane?: boolean; + config?: SessionWriteLockAcquireTimeoutConfig; }): ContextEngineRuntimeContext { return { ...params.runtimeContext, @@ -291,6 +294,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: { sessionFile: params.sessionFile, sessionId: params.sessionId, sessionKey: params.sessionKey, + config: params.config, request, }); const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId); @@ -314,6 +318,7 @@ async function executeContextEngineMaintenance(params: { sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; executionMode: "foreground" | "background"; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { if (typeof params.contextEngine.maintain !== "function") { return undefined; @@ -330,6 +335,7 @@ async function executeContextEngineMaintenance(params: { runtimeContext: params.runtimeContext, allowDeferredCompactionExecution: params.executionMode === "background", deferTranscriptRewriteToSessionLane: params.executionMode === "background", + config: params.config, }), }); if (result.changed) { @@ -350,6 +356,7 @@ async function runDeferredTurnMaintenanceWorker(params: { sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; runId: string; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { let surfacedUserNotice = false; let longRunningTimer: ReturnType | null = null; @@ -428,6 +435,7 @@ async function runDeferredTurnMaintenanceWorker(params: { reason: "turn", sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, + config: params.config, executionMode: "background", }); if (longRunningTimer) { @@ -550,6 +558,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule sessionFile: params.sessionFile, sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, + config: params.config, runId: task.runId!, }), ); @@ -606,6 +615,7 @@ export async function runContextEngineMaintenance(params: { sessionManager?: Parameters[0]["sessionManager"]; runtimeContext?: ContextEngineRuntimeContext; executionMode?: "foreground" | "background"; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { if (typeof params.contextEngine?.maintain !== "function") { return undefined; @@ -626,6 +636,7 @@ export async function runContextEngineMaintenance(params: { sessionFile: params.sessionFile, sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, + config: params.config, }); } catch (err) { log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`); @@ -643,6 +654,7 @@ export async function runContextEngineMaintenance(params: { sessionManager: params.sessionManager, runtimeContext: params.runtimeContext, executionMode, + config: params.config, }); } catch (err) { log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 2011980dcb0..99bf4e3f6f4 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1480,6 +1480,7 @@ export async function runEmbeddedPiAgent( sessionFile: activeSessionFile, reason: "compaction", runtimeContext: overflowCompactionRuntimeContext, + config: params.config, }); } } catch (compactErr) { @@ -1513,6 +1514,7 @@ export async function runEmbeddedPiAgent( }), sessionId: activeSessionId, sessionKey: params.sessionKey, + config: params.config, }); if (truncResult.truncated) { log.info( @@ -1564,6 +1566,7 @@ export async function runEmbeddedPiAgent( maxCharsOverride: toolResultMaxChars, sessionId: activeSessionId, sessionKey: params.sessionKey, + config: params.config, }); if (truncResult.truncated) { log.info( diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 8ae7d8718bc..02be76253f3 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -139,6 +139,7 @@ import { import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, + resolveSessionWriteLockAcquireTimeoutMs, } from "../../session-write-lock.js"; import { detectRuntimeShell } from "../../shell-utils.js"; import { @@ -1381,6 +1382,7 @@ export async function runEmbeddedAttempt( // from taking over the same session when a gateway run stalls before model I/O. const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), maxHoldMs: resolveSessionLockMaxHoldFromTimeout({ timeoutMs: resolveRunTimeoutWithCompactionGraceMs({ runTimeoutMs: params.timeoutMs, @@ -1453,6 +1455,7 @@ export async function runEmbeddedAttempt( reason: contextParams.reason, sessionManager: contextParams.sessionManager as never, runtimeContext: contextParams.runtimeContext, + config: params.config, }), warn: (message) => log.warn(message), }); @@ -3244,8 +3247,10 @@ export async function runEmbeddedAttempt( reason: contextParams.reason, sessionManager: contextParams.sessionManager as never, runtimeContext: contextParams.runtimeContext, + config: params.config, }), sessionManager, + config: params.config, warn: (message) => log.warn(message), }); } diff --git a/src/agents/pi-embedded-runner/tool-result-truncation.ts b/src/agents/pi-embedded-runner/tool-result-truncation.ts index aec21afc018..ba5fa8d219a 100644 --- a/src/agents/pi-embedded-runner/tool-result-truncation.ts +++ b/src/agents/pi-embedded-runner/tool-result-truncation.ts @@ -6,7 +6,11 @@ import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js"; import { resolveAgentContextLimits } from "../agent-scope.js"; -import { acquireSessionWriteLock } from "../session-write-lock.js"; +import { + acquireSessionWriteLock, + type SessionWriteLockAcquireTimeoutConfig, + resolveSessionWriteLockAcquireTimeoutMs, +} from "../session-write-lock.js"; import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js"; import { log } from "./logger.js"; import { @@ -679,6 +683,7 @@ async function truncateOversizedToolResultsInTranscriptState(params: { maxCharsOverride?: number; sessionId?: string; sessionKey?: string; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> { const { state, contextWindowTokens } = params; const maxChars = Math.max( @@ -758,12 +763,16 @@ export async function truncateOversizedToolResultsInSession(params: { maxCharsOverride?: number; sessionId?: string; sessionKey?: string; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> { const { sessionFile, contextWindowTokens } = params; let sessionLock: Awaited> | undefined; try { - sessionLock = await acquireSessionWriteLock({ sessionFile }); + sessionLock = await acquireSessionWriteLock({ + sessionFile, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), + }); const state = await readTranscriptFileState(sessionFile); return await truncateOversizedToolResultsInTranscriptState({ state, diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts index d1ff459375c..bbb76340ce5 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.test.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.test.ts @@ -327,6 +327,7 @@ describe("rewriteTranscriptEntriesInSessionFile", () => { expect(result.changed).toBe(true); expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ sessionFile, + timeoutMs: 60_000, }); expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ sessionFile }); diff --git a/src/agents/pi-embedded-runner/transcript-rewrite.ts b/src/agents/pi-embedded-runner/transcript-rewrite.ts index 011e711a1c8..3209d432dd9 100644 --- a/src/agents/pi-embedded-runner/transcript-rewrite.ts +++ b/src/agents/pi-embedded-runner/transcript-rewrite.ts @@ -8,7 +8,11 @@ import type { import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { getRawSessionAppendMessage } from "../session-raw-append-message.js"; -import { acquireSessionWriteLock } from "../session-write-lock.js"; +import { + acquireSessionWriteLock, + type SessionWriteLockAcquireTimeoutConfig, + resolveSessionWriteLockAcquireTimeoutMs, +} from "../session-write-lock.js"; import { log } from "./logger.js"; import { persistTranscriptStateMutation, @@ -356,11 +360,13 @@ export async function rewriteTranscriptEntriesInSessionFile(params: { sessionId?: string; sessionKey?: string; request: TranscriptRewriteRequest; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { let sessionLock: Awaited> | undefined; try { sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), }); const state = await readTranscriptFileState(params.sessionFile); const result = rewriteTranscriptEntriesInState({ diff --git a/src/agents/session-write-lock.test.ts b/src/agents/session-write-lock.test.ts index fae3e8b4e0b..9893047a113 100644 --- a/src/agents/session-write-lock.test.ts +++ b/src/agents/session-write-lock.test.ts @@ -9,6 +9,7 @@ let acquireSessionWriteLock: typeof import("./session-write-lock.js").acquireSes let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLockFiles; let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest; let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout; +let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs; vi.mock("../shared/pid-alive.js", async () => { const original = @@ -133,6 +134,7 @@ describe("acquireSessionWriteLock", () => { cleanStaleLockFiles, resetSessionWriteLockStateForTest, resolveSessionLockMaxHoldFromTimeout, + resolveSessionWriteLockAcquireTimeoutMs, } = await import("./session-write-lock.js")); }); @@ -335,6 +337,20 @@ describe("acquireSessionWriteLock", () => { expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 1_000, minMs: 5_000 })).toBe(121_000); }); + it("resolves the session write-lock acquire timeout", () => { + expect(resolveSessionWriteLockAcquireTimeoutMs()).toBe(60_000); + expect( + resolveSessionWriteLockAcquireTimeoutMs({ + session: { writeLock: { acquireTimeoutMs: 90_000 } }, + }), + ).toBe(90_000); + expect( + resolveSessionWriteLockAcquireTimeoutMs({ + session: { writeLock: { acquireTimeoutMs: 0 } }, + }), + ).toBe(60_000); + }); + it("clamps max hold for effectively no-timeout runs", () => { expect( resolveSessionLockMaxHoldFromTimeout({ diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts index 10298681246..e15e812c4e0 100644 --- a/src/agents/session-write-lock.ts +++ b/src/agents/session-write-lock.ts @@ -48,6 +48,7 @@ const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState"); const DEFAULT_STALE_MS = 30 * 60 * 1000; const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000; +export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000; const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000; const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000; // A payload-less lock can be left behind if shutdown lands between open("wx") @@ -74,6 +75,24 @@ type LockInspectionDetails = Pick< const HELD_LOCKS = resolveProcessScopedMap(HELD_LOCKS_KEY); +export type SessionWriteLockAcquireTimeoutConfig = { + session?: { + writeLock?: { + acquireTimeoutMs?: number; + }; + }; +}; + +export function resolveSessionWriteLockAcquireTimeoutMs( + config?: SessionWriteLockAcquireTimeoutConfig, +): number { + return resolvePositiveMs( + config?.session?.writeLock?.acquireTimeoutMs, + DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS, + { allowInfinity: true }, + ); +} + function resolveCleanupState(): CleanupState { const proc = process as NodeJS.Process & { [CLEANUP_STATE_KEY]?: CleanupState; @@ -563,7 +582,9 @@ export async function acquireSessionWriteLock(params: { }> { registerCleanupHandlers(); const allowReentrant = params.allowReentrant ?? false; - const timeoutMs = resolvePositiveMs(params.timeoutMs, 10_000, { allowInfinity: true }); + const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), { + allowInfinity: true, + }); const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS); const sessionFile = path.resolve(params.sessionFile); diff --git a/src/auto-reply/reply/dispatch-acp-transcript.runtime.ts b/src/auto-reply/reply/dispatch-acp-transcript.runtime.ts index 1c517475f76..9d80dc0c035 100644 --- a/src/auto-reply/reply/dispatch-acp-transcript.runtime.ts +++ b/src/auto-reply/reply/dispatch-acp-transcript.runtime.ts @@ -52,5 +52,6 @@ export async function persistAcpDispatchTranscript(params: { sessionAgentId, threadId: params.threadId, sessionCwd: resolveAcpSessionCwd(params.meta) ?? process.cwd(), + config: params.cfg, }); } diff --git a/src/config/config.schema-regressions.test.ts b/src/config/config.schema-regressions.test.ts index 34f03e7c614..6bfade346c8 100644 --- a/src/config/config.schema-regressions.test.ts +++ b/src/config/config.schema-regressions.test.ts @@ -2,6 +2,18 @@ import { describe, expect, it } from "vitest"; import { validateConfigObject } from "./validation.js"; describe("config schema regressions", () => { + it("accepts session write-lock acquire timeout", () => { + const res = validateConfigObject({ + session: { + writeLock: { + acquireTimeoutMs: 60_000, + }, + }, + }); + + expect(res.ok).toBe(true); + }); + it('accepts memorySearch fallback "voyage"', () => { const res = validateConfigObject({ agents: { diff --git a/src/config/schema.help.quality.test.ts b/src/config/schema.help.quality.test.ts index 3646d5ef4fe..ee95f2abb71 100644 --- a/src/config/schema.help.quality.test.ts +++ b/src/config/schema.help.quality.test.ts @@ -689,6 +689,12 @@ describe("config help copy quality", () => { expect(/raw|unnormalized/i.test(rawKeyPrefix)).toBe(true); }); + it("documents session write-lock acquire timeout defaults", () => { + const acquireTimeout = FIELD_HELP["session.writeLock.acquireTimeoutMs"]; + expect(acquireTimeout.includes("60000")).toBe(true); + expect(/transcript|lock/i.test(acquireTimeout)).toBe(true); + }); + it("documents session maintenance duration/size examples and deprecations", () => { const pruneAfter = FIELD_HELP["session.maintenance.pruneAfter"]; expect(pruneAfter.includes("30d")).toBe(true); diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index ea4b5548a85..636d2026eab 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -1447,6 +1447,10 @@ export const FIELD_HELP: Record = { "Matches a normalized session-key prefix after internal key normalization steps in policy consumers. Use this for general prefix controls, and prefer rawKeyPrefix when exact full-key matching is required.", "session.sendPolicy.rules[].match.rawKeyPrefix": "Matches the raw, unnormalized session-key prefix for exact full-key policy targeting. Use this when normalized keyPrefix is too broad and you need agent-prefixed or transport-specific precision.", + "session.writeLock": + "Groups session transcript write-lock acquisition controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default wait.", + "session.writeLock.acquireTimeoutMs": + "Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; raise for slow disks or long prep/cleanup, lower only when quick failure is preferred.", "session.agentToAgent": "Groups controls for inter-agent session exchanges, including loop prevention limits on reply chaining. Keep defaults unless you run advanced agent-to-agent automation with strict turn caps.", "session.agentToAgent.maxPingPongTurns": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 4684a0dac0b..8b5587dade0 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -716,6 +716,8 @@ export const FIELD_LABELS: Record = { "session.sendPolicy.rules[].match.chatType": "Session Send Rule Chat Type", "session.sendPolicy.rules[].match.keyPrefix": "Session Send Rule Key Prefix", "session.sendPolicy.rules[].match.rawKeyPrefix": "Session Send Rule Raw Key Prefix", + "session.writeLock": "Session Write Lock", + "session.writeLock.acquireTimeoutMs": "Session Write Lock Acquire Timeout", "session.agentToAgent": "Session Agent-to-Agent", "session.agentToAgent.maxPingPongTurns": "Agent-to-Agent Ping-Pong Turns", "session.threadBindings": "Session Thread Bindings", diff --git a/src/config/sessions/transcript-append.ts b/src/config/sessions/transcript-append.ts index f7a2d8e49cc..8b505183728 100644 --- a/src/config/sessions/transcript-append.ts +++ b/src/config/sessions/transcript-append.ts @@ -3,7 +3,11 @@ import fs from "node:fs/promises"; import path from "node:path"; import { StringDecoder } from "node:string_decoder"; import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; -import { acquireSessionWriteLock } from "../../agents/session-write-lock.js"; +import { + acquireSessionWriteLock, + type SessionWriteLockAcquireTimeoutConfig, + resolveSessionWriteLockAcquireTimeoutMs, +} from "../../agents/session-write-lock.js"; const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024; const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024; @@ -188,10 +192,11 @@ export async function appendSessionTranscriptMessage(params: { sessionId?: string; cwd?: string; useRawWhenLinear?: boolean; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise<{ messageId: string }> { const lock = await acquireSessionWriteLock({ sessionFile: params.transcriptPath, - timeoutMs: 10_000, + timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config), allowReentrant: true, }); try { diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index b0d65b89a75..a3d2ea63481 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import path from "node:path"; import type { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { SessionWriteLockAcquireTimeoutConfig } from "../../agents/session-write-lock.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { extractAssistantVisibleText } from "../../shared/chat-message-content.js"; @@ -164,6 +165,7 @@ export async function appendAssistantMessageToSessionTranscript(params: { /** Optional override for store path (mostly for tests). */ storePath?: string; updateMode?: SessionTranscriptUpdateMode; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { const sessionKey = params.sessionKey.trim(); if (!sessionKey) { @@ -184,6 +186,7 @@ export async function appendAssistantMessageToSessionTranscript(params: { storePath: params.storePath, idempotencyKey: params.idempotencyKey, updateMode: params.updateMode, + config: params.config, message: { role: "assistant" as const, content: [{ type: "text", text: mirrorText }], @@ -217,6 +220,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { idempotencyKey?: string; storePath?: string; updateMode?: SessionTranscriptUpdateMode; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { const sessionKey = params.sessionKey.trim(); if (!sessionKey) { @@ -283,6 +287,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { const { messageId } = await appendSessionTranscriptMessage({ transcriptPath: sessionFile, message, + config: params.config, }); switch (params.updateMode ?? "inline") { diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 8523fe53bc3..29cc00e9007 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -183,6 +183,8 @@ export type SessionConfig = { typingMode?: TypingMode; mainKey?: string; sendPolicy?: SessionSendPolicyConfig; + /** Session transcript write-lock acquisition policy. */ + writeLock?: SessionWriteLockConfig; agentToAgent?: { /** Max ping-pong turns between requester/target (0–5). Default: 5. */ maxPingPongTurns?: number; @@ -193,6 +195,11 @@ export type SessionConfig = { maintenance?: SessionMaintenanceConfig; }; +export type SessionWriteLockConfig = { + /** How long to wait while acquiring a session transcript write lock. Default: 60000. */ + acquireTimeoutMs?: number; +}; + export type SessionMaintenanceMode = "enforce" | "warn"; export type SessionMaintenanceConfig = { diff --git a/src/config/zod-schema.session-maintenance-extensions.test.ts b/src/config/zod-schema.session-maintenance-extensions.test.ts index 6efe8b39907..a30590bc655 100644 --- a/src/config/zod-schema.session-maintenance-extensions.test.ts +++ b/src/config/zod-schema.session-maintenance-extensions.test.ts @@ -2,6 +2,26 @@ import { describe, expect, it } from "vitest"; import { SessionSchema } from "./zod-schema.session.js"; describe("SessionSchema maintenance extensions", () => { + it("accepts session write-lock acquire timeout", () => { + expect(() => + SessionSchema.parse({ + writeLock: { + acquireTimeoutMs: 60_000, + }, + }), + ).not.toThrow(); + }); + + it("rejects invalid session write-lock acquire timeout values", () => { + expect(() => + SessionSchema.parse({ + writeLock: { + acquireTimeoutMs: 0, + }, + }), + ).toThrow(/acquireTimeoutMs|number/i); + }); + it("accepts valid maintenance extensions", () => { expect(() => SessionSchema.parse({ diff --git a/src/config/zod-schema.session.ts b/src/config/zod-schema.session.ts index 191d58f07c0..98c6e801044 100644 --- a/src/config/zod-schema.session.ts +++ b/src/config/zod-schema.session.ts @@ -55,6 +55,12 @@ export const SessionSchema = z typingMode: TypingModeSchema.optional(), mainKey: z.string().optional(), sendPolicy: SessionSendPolicySchema.optional(), + writeLock: z + .object({ + acquireTimeoutMs: z.number().int().positive().optional(), + }) + .strict() + .optional(), agentToAgent: z .object({ maxPingPongTurns: z.number().int().min(0).max(5).optional(), diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index 94c761ee35d..ef5299b322a 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -1,4 +1,5 @@ import type { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { SessionWriteLockAcquireTimeoutConfig } from "../../agents/session-write-lock.js"; import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js"; import { formatErrorMessage } from "../../infra/errors.js"; import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; @@ -51,6 +52,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: { idempotencyKey?: string; abortMeta?: GatewayInjectedAbortMeta; now?: number; + config?: SessionWriteLockAcquireTimeoutConfig; }): Promise { const now = params.now ?? Date.now(); const usage = { @@ -106,6 +108,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: { message: messageBody, now, useRawWhenLinear: true, + config: params.config, }); emitSessionTranscriptUpdate({ sessionFile: params.transcriptPath, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 9555554f4ea..662c4fd2367 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -956,6 +956,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: { sessionKey: string; message: string; savedImages: SavedMedia[]; + cfg: OpenClawConfig; }) { const mediaFields = resolveChatSendTranscriptMediaFields(params.savedImages); if (!("MediaPath" in mediaFields)) { @@ -990,6 +991,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: { await rewriteTranscriptEntriesInSessionFile({ sessionFile: params.transcriptPath, sessionKey: params.sessionKey, + config: params.cfg, request: { replacements: [ { @@ -1319,6 +1321,7 @@ async function appendAssistantTranscriptMessage(params: { origin: AbortOrigin; runId: string; }; + cfg?: OpenClawConfig; }): Promise { const transcriptPath = resolveTranscriptPath({ sessionId: params.sessionId, @@ -1357,6 +1360,7 @@ async function appendAssistantTranscriptMessage(params: { content: params.content, idempotencyKey: params.idempotencyKey, abortMeta: params.abortMeta, + config: params.cfg, }); } @@ -1393,7 +1397,7 @@ async function persistAbortedPartials(params: { if (params.snapshots.length === 0) { return; } - const { storePath, entry } = loadSessionEntry(params.sessionKey); + const { cfg, storePath, entry } = loadSessionEntry(params.sessionKey); for (const snapshot of params.snapshots) { const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId; const appended = await appendAssistantTranscriptMessage({ @@ -1403,6 +1407,7 @@ async function persistAbortedPartials(params: { sessionFile: entry?.sessionFile, createIfMissing: true, idempotencyKey: `${snapshot.runId}:assistant`, + cfg, abortMeta: { aborted: true, origin: snapshot.abortOrigin, @@ -2221,6 +2226,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionKey, message: parsedMessage, savedImages: await persistedImagesPromise, + cfg, }); }; const appendWebchatAgentMediaTranscriptIfNeeded = async (payload: ReplyPayload) => { @@ -2284,6 +2290,7 @@ export const chatHandlers: GatewayRequestHandlers = { agentId, createIfMissing: true, idempotencyKey: `${clientRunId}:assistant-media`, + cfg, }); if (appended.ok) { if (appended.messageId && assistantContent?.length) { @@ -2490,6 +2497,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionFile: latestEntry?.sessionFile, agentId, createIfMissing: true, + cfg, }); if (appended.ok) { if (appended.messageId && assistantContent?.length) { @@ -2646,6 +2654,7 @@ export const chatHandlers: GatewayRequestHandlers = { sessionFile: entry?.sessionFile, agentId: resolveSessionAgentId({ sessionKey, config: cfg }), createIfMissing: true, + cfg, }); if (!appended.ok || !appended.messageId || !appended.message) { respond( diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 60abf0a418e..18e5c539c28 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -973,8 +973,9 @@ describe("deliverOutboundPayloads", () => { "fmt:hello **boss**:2", ]); + const cfg = { channels: { line: {} } } as OpenClawConfig; await deliverOutboundPayloads({ - cfg: { channels: { line: {} } } as OpenClawConfig, + cfg, channel: "line", to: "U123", payloads: [{ text: "photo", mediaUrl: "file:///tmp/f.png" }], @@ -1716,8 +1717,9 @@ describe("deliverOutboundPayloads", () => { ); mocks.appendAssistantMessageToSessionTranscript.mockClear(); + const cfg = { channels: { line: {} } } as OpenClawConfig; await deliverOutboundPayloads({ - cfg: { channels: { line: {} } } as OpenClawConfig, + cfg, channel: "line", to: "U123", payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }], @@ -1733,6 +1735,7 @@ describe("deliverOutboundPayloads", () => { expect.objectContaining({ text: "report.pdf", idempotencyKey: "idem-deliver-1", + config: cfg, }), ); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index ab2f659a845..b5976da781d 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -1299,6 +1299,7 @@ async function deliverOutboundPayloadsCore( sessionKey: params.mirror.sessionKey, text: mirrorText, idempotencyKey: params.mirror.idempotencyKey, + config: params.cfg, }); } } diff --git a/src/infra/outbound/outbound-send-service.test.ts b/src/infra/outbound/outbound-send-service.test.ts index bda0afd6737..790b48e1bfe 100644 --- a/src/infra/outbound/outbound-send-service.test.ts +++ b/src/infra/outbound/outbound-send-service.test.ts @@ -116,7 +116,7 @@ describe("executeSendAction", () => { }>, ) { expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith( - expect.objectContaining(expected), + expect.objectContaining({ ...expected, config: {} }), ); } diff --git a/src/infra/outbound/outbound-send-service.ts b/src/infra/outbound/outbound-send-service.ts index 64ed1d78a0c..ad254b8dc9b 100644 --- a/src/infra/outbound/outbound-send-service.ts +++ b/src/infra/outbound/outbound-send-service.ts @@ -159,6 +159,7 @@ export async function executeSendAction(params: { text: mirrorText, mediaUrls: mirrorMediaUrls, idempotencyKey: params.ctx.mirror.idempotencyKey, + config: params.ctx.cfg, }); }, }); diff --git a/src/plugin-sdk/agent-harness-runtime.ts b/src/plugin-sdk/agent-harness-runtime.ts index 60ac91280fb..fda963b4bbb 100644 --- a/src/plugin-sdk/agent-harness-runtime.ts +++ b/src/plugin-sdk/agent-harness-runtime.ts @@ -106,7 +106,11 @@ export { export { normalizeProviderToolSchemas } from "../agents/pi-embedded-runner/tool-schema-runtime.js"; export { resolveSandboxContext } from "../agents/sandbox.js"; export { isSubagentSessionKey } from "../routing/session-key.js"; -export { acquireSessionWriteLock } from "../agents/session-write-lock.js"; +export { + acquireSessionWriteLock, + resolveSessionWriteLockAcquireTimeoutMs, + type SessionWriteLockAcquireTimeoutConfig, +} from "../agents/session-write-lock.js"; export { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; export { isToolWrappedWithBeforeToolCallHook,