mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:30:43 +00:00
fix: thread session write-lock timeout config
This commit is contained in:
@@ -43,6 +43,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Sessions/transcripts: use one `session.writeLock.acquireTimeoutMs` policy for session transcript lock acquisitions and raise the default wait to 60 seconds, avoiding user-visible lock timeouts during legitimate slow prep, cleanup, compaction, and mirror work. Fixes #75894. Thanks @shandutta.
|
||||
- Control UI: contain the standalone iOS PWA viewport with safe-area-aware document locking, so Add-to-Home-Screen launches cannot scroll past the device bounds. Refs #76072. Thanks @kvncrw.
|
||||
- Agents/restart recovery: match cleaned transcript locks by exact transcript lock paths plus the canonical session fallback, so interrupted main sessions using topic-suffixed transcripts resume after gateway restart. Refs #76052. Thanks @anyech.
|
||||
- Agents/runtime: cache the stable system-prompt prefix and reuse prompt-report tool schema stats during dispatch prep, reducing repeated CPU work before streaming starts. Fixes #75999; supersedes #76061. Thanks @zackchiutw and @STLI69.
|
||||
|
||||
@@ -50,7 +50,8 @@ wired end-to-end.
|
||||
See [Command Queue](/concepts/queue).
|
||||
- Transcript writes are also protected by a session write lock on the session file. The lock is
|
||||
process-aware and file-based, so it catches writers that bypass the in-process queue or come from
|
||||
another process.
|
||||
another process. Session transcript writers wait up to `session.writeLock.acquireTimeoutMs`
|
||||
before reporting the session as busy; the default is `60000` ms.
|
||||
- Session write locks are non-reentrant by default. If a helper intentionally nests acquisition of
|
||||
the same lock while preserving one logical writer, it must opt in explicitly with
|
||||
`allowReentrant: true`.
|
||||
|
||||
@@ -94,6 +94,11 @@ configured age, count, or disk budget.
|
||||
|
||||
OpenClaw no longer creates automatic `sessions.json.bak.*` rotation backups during Gateway writes. The legacy `session.maintenance.rotateBytes` key is ignored and `openclaw doctor --fix` removes it from older configs.
|
||||
|
||||
Transcript mutations use a session write lock on the transcript file. Lock acquisition waits up to
|
||||
`session.writeLock.acquireTimeoutMs` before surfacing a busy-session error; the default is `60000`
|
||||
ms. Raise this only when legitimate prep, cleanup, compaction, or transcript mirror work contends
|
||||
longer on slow machines. Stale-lock detection and maximum hold warnings remain separate policies.
|
||||
|
||||
Enforcement order for disk budget cleanup (`mode: "enforce"`):
|
||||
|
||||
1. Remove oldest archived, orphan transcript, or orphan trajectory artifacts first.
|
||||
|
||||
@@ -74,6 +74,7 @@ export async function maybeCompactCodexAppServerSession(
|
||||
sessionFile: params.sessionFile,
|
||||
reason: "compaction",
|
||||
runtimeContext: params.contextEngineRuntimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
} catch (error) {
|
||||
embeddedAgentLog.warn(
|
||||
|
||||
@@ -430,6 +430,7 @@ export async function runCodexAppServerAttempt(
|
||||
tokenBudget: params.contextTokenBudget,
|
||||
}),
|
||||
runMaintenance: runHarnessContextEngineMaintenance,
|
||||
config: params.config,
|
||||
warn: (message) => embeddedAgentLog.warn(message),
|
||||
});
|
||||
historyMessages =
|
||||
@@ -1178,6 +1179,7 @@ export async function runCodexAppServerAttempt(
|
||||
promptCache: result.promptCache,
|
||||
}),
|
||||
runMaintenance: runHarnessContextEngineMaintenance,
|
||||
config: params.config,
|
||||
warn: (message) => embeddedAgentLog.warn(message),
|
||||
});
|
||||
}
|
||||
@@ -1638,6 +1640,7 @@ async function mirrorTranscriptBestEffort(params: {
|
||||
sessionKey: params.sessionKey,
|
||||
messages: params.result.messagesSnapshot,
|
||||
idempotencyScope: `codex-app-server:${params.threadId}:${params.turnId}`,
|
||||
config: params.params.config,
|
||||
});
|
||||
} catch (error) {
|
||||
embeddedAgentLog.warn("failed to mirror codex app-server transcript", { error });
|
||||
|
||||
@@ -6,8 +6,10 @@ import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-c
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
emitSessionTranscriptUpdate,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
runAgentHarnessBeforeMessageWriteHook,
|
||||
type AgentMessage,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "openclaw/plugin-sdk/agent-harness-runtime";
|
||||
|
||||
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
|
||||
@@ -25,6 +27,7 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
agentId?: string;
|
||||
messages: AgentMessage[];
|
||||
idempotencyScope?: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<void> {
|
||||
const messages = params.messages.filter(
|
||||
(message) => message.role === "user" || message.role === "assistant",
|
||||
@@ -36,7 +39,7 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
await fs.mkdir(path.dirname(params.sessionFile), { recursive: true });
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: 10_000,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
});
|
||||
try {
|
||||
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
|
||||
|
||||
@@ -584,6 +584,7 @@ async function agentCommandInternal(
|
||||
sessionAgentId,
|
||||
threadId: opts.threadId,
|
||||
sessionCwd: resolveAcpSessionCwd(acpResolution.meta) ?? workspaceDir,
|
||||
config: cfg,
|
||||
});
|
||||
} catch (error) {
|
||||
log.warn(
|
||||
@@ -1208,6 +1209,7 @@ async function agentCommandInternal(
|
||||
sessionAgentId,
|
||||
threadId: opts.threadId,
|
||||
sessionCwd: workspaceDir,
|
||||
config: cfg,
|
||||
});
|
||||
sessionEntry = await (
|
||||
await loadCliCompactionRuntime()
|
||||
|
||||
@@ -387,6 +387,7 @@ describe("CLI attempt execution", () => {
|
||||
storePath,
|
||||
sessionAgentId: "main",
|
||||
sessionCwd: tmpDir,
|
||||
config: {},
|
||||
});
|
||||
|
||||
const sessionFile = updatedEntry?.sessionFile;
|
||||
@@ -443,6 +444,7 @@ describe("CLI attempt execution", () => {
|
||||
storePath,
|
||||
sessionAgentId: "main",
|
||||
sessionCwd: tmpDir,
|
||||
config: {},
|
||||
});
|
||||
|
||||
const messages = await readSessionMessages(updatedEntry?.sessionFile ?? "");
|
||||
|
||||
@@ -21,7 +21,10 @@ import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-
|
||||
import { isCliProvider } from "../model-selection.js";
|
||||
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
|
||||
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
|
||||
import { acquireSessionWriteLock } from "../session-write-lock.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../session-write-lock.js";
|
||||
import { buildWorkspaceSkillSnapshot } from "../skills.js";
|
||||
import { buildUsageWithNoCost } from "../stream-message-shared.js";
|
||||
import {
|
||||
@@ -76,6 +79,7 @@ type PersistTextTurnTranscriptParams = {
|
||||
sessionAgentId: string;
|
||||
threadId?: string | number;
|
||||
sessionCwd: string;
|
||||
config: OpenClawConfig;
|
||||
assistant: {
|
||||
api: string;
|
||||
provider: string;
|
||||
@@ -193,7 +197,7 @@ async function persistTextTurnTranscript(
|
||||
});
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile,
|
||||
timeoutMs: 10_000,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
allowReentrant: true,
|
||||
});
|
||||
try {
|
||||
@@ -202,6 +206,7 @@ async function persistTextTurnTranscript(
|
||||
transcriptPath: sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
cwd: params.sessionCwd,
|
||||
config: params.config,
|
||||
message: {
|
||||
role: "user",
|
||||
content: promptText,
|
||||
@@ -215,6 +220,7 @@ async function persistTextTurnTranscript(
|
||||
transcriptPath: sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
cwd: params.sessionCwd,
|
||||
config: params.config,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: replyText }],
|
||||
@@ -264,6 +270,7 @@ export async function persistAcpTurnTranscript(params: {
|
||||
sessionAgentId: string;
|
||||
threadId?: string | number;
|
||||
sessionCwd: string;
|
||||
config: OpenClawConfig;
|
||||
}): Promise<SessionEntry | undefined> {
|
||||
return await persistTextTurnTranscript({
|
||||
...params,
|
||||
@@ -287,6 +294,7 @@ export async function persistCliTurnTranscript(params: {
|
||||
sessionAgentId: string;
|
||||
threadId?: string | number;
|
||||
sessionCwd: string;
|
||||
config: OpenClawConfig;
|
||||
}): Promise<SessionEntry | undefined> {
|
||||
const replyText = resolveCliTranscriptReplyText(params.result);
|
||||
const provider = params.result.meta.agentMeta?.provider?.trim() ?? "cli";
|
||||
@@ -304,6 +312,7 @@ export async function persistCliTurnTranscript(params: {
|
||||
sessionAgentId: params.sessionAgentId,
|
||||
threadId: params.threadId,
|
||||
sessionCwd: params.sessionCwd,
|
||||
config: params.config,
|
||||
assistant: {
|
||||
api: "cli",
|
||||
provider,
|
||||
|
||||
@@ -166,6 +166,7 @@ async function compactCliTranscript(params: {
|
||||
reason: "compaction",
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext,
|
||||
config: params.cfg,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
buildAfterTurnRuntimeContext,
|
||||
buildAfterTurnRuntimeContextFromUsage,
|
||||
} from "../pi-embedded-runner/run/attempt.prompt-helpers.js";
|
||||
import type { SessionWriteLockAcquireTimeoutConfig } from "../session-write-lock.js";
|
||||
|
||||
export type HarnessContextEngine = ContextEngine;
|
||||
|
||||
@@ -21,6 +22,7 @@ export async function bootstrapHarnessContextEngine(params: {
|
||||
sessionManager?: unknown;
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
runMaintenance?: typeof runHarnessContextEngineMaintenance;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
warn: (message: string) => void;
|
||||
}): Promise<void> {
|
||||
if (
|
||||
@@ -45,6 +47,7 @@ export async function bootstrapHarnessContextEngine(params: {
|
||||
reason: "bootstrap",
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
} catch (bootstrapErr) {
|
||||
params.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`);
|
||||
@@ -97,6 +100,7 @@ export async function finalizeHarnessContextEngineTurn(params: {
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
runMaintenance?: typeof runHarnessContextEngineMaintenance;
|
||||
sessionManager?: unknown;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
warn: (message: string) => void;
|
||||
}) {
|
||||
if (!params.contextEngine) {
|
||||
@@ -165,6 +169,7 @@ export async function finalizeHarnessContextEngineTurn(params: {
|
||||
reason: "turn",
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -201,6 +206,7 @@ export async function runHarnessContextEngineMaintenance(params: {
|
||||
sessionManager?: unknown;
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
executionMode?: "foreground" | "background";
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}) {
|
||||
return await runContextEngineMaintenance({
|
||||
contextEngine: params.contextEngine,
|
||||
@@ -213,6 +219,7 @@ export async function runHarnessContextEngineMaintenance(params: {
|
||||
>[0]["sessionManager"],
|
||||
runtimeContext: params.runtimeContext,
|
||||
executionMode: params.executionMode,
|
||||
config: params.config,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -230,6 +230,7 @@ export async function compactEmbeddedPiSession(
|
||||
sessionFile: postCompactionSessionFile,
|
||||
reason: "compaction",
|
||||
runtimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
}
|
||||
if (engineOwnsCompaction && result.ok && result.compacted) {
|
||||
|
||||
@@ -90,6 +90,7 @@ import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../session-write-lock.js";
|
||||
import { detectRuntimeShell } from "../shell-utils.js";
|
||||
import {
|
||||
@@ -904,6 +905,7 @@ async function compactEmbeddedPiSessionDirectOnce(
|
||||
const compactionTimeoutMs = resolveCompactionTimeoutMs(params.config);
|
||||
const sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
maxHoldMs: resolveSessionLockMaxHoldFromTimeout({
|
||||
timeoutMs: compactionTimeoutMs,
|
||||
}),
|
||||
|
||||
@@ -115,6 +115,7 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => {
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
config: undefined,
|
||||
request: {
|
||||
replacements: [
|
||||
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
|
||||
@@ -357,6 +358,7 @@ describe("runContextEngineMaintenance", () => {
|
||||
reason: "turn",
|
||||
executionMode: "background",
|
||||
sessionManager,
|
||||
config: { session: { writeLock: { acquireTimeoutMs: 75_000 } } },
|
||||
});
|
||||
|
||||
expect(rewriteTranscriptEntriesInSessionManagerMock).not.toHaveBeenCalled();
|
||||
@@ -364,6 +366,7 @@ describe("runContextEngineMaintenance", () => {
|
||||
sessionFile: "/tmp/session-background-file-rewrite.jsonl",
|
||||
sessionId: "session-background-file-rewrite",
|
||||
sessionKey: "agent:main:session-background-file-rewrite",
|
||||
config: { session: { writeLock: { acquireTimeoutMs: 75_000 } } },
|
||||
request: {
|
||||
replacements: [
|
||||
{
|
||||
@@ -397,11 +400,27 @@ describe("runContextEngineMaintenance", () => {
|
||||
});
|
||||
await Promise.resolve();
|
||||
|
||||
const maintain = vi.fn(async (_params?: unknown) => ({
|
||||
changed: false,
|
||||
bytesFreed: 0,
|
||||
rewrittenEntries: 0,
|
||||
}));
|
||||
const maintain = vi.fn(async (params?: unknown) => {
|
||||
await (
|
||||
params as { runtimeContext?: ContextEngineRuntimeContext } | undefined
|
||||
)?.runtimeContext?.rewriteTranscriptEntries?.({
|
||||
replacements: [
|
||||
{
|
||||
entryId: "entry-1",
|
||||
message: castAgentMessage({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "done" }],
|
||||
timestamp: 2,
|
||||
}),
|
||||
},
|
||||
],
|
||||
});
|
||||
return {
|
||||
changed: false,
|
||||
bytesFreed: 0,
|
||||
rewrittenEntries: 0,
|
||||
};
|
||||
});
|
||||
|
||||
const backgroundEngine = {
|
||||
info: {
|
||||
@@ -429,6 +448,7 @@ describe("runContextEngineMaintenance", () => {
|
||||
tokenBudget: 2048,
|
||||
currentTokenCount: 1536,
|
||||
},
|
||||
config: { session: { writeLock: { acquireTimeoutMs: 91_000 } } },
|
||||
});
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
@@ -461,6 +481,24 @@ describe("runContextEngineMaintenance", () => {
|
||||
currentTokenCount: 1536,
|
||||
}),
|
||||
});
|
||||
expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionId: "session-1",
|
||||
sessionKey,
|
||||
config: { session: { writeLock: { acquireTimeoutMs: 91_000 } } },
|
||||
request: {
|
||||
replacements: [
|
||||
{
|
||||
entryId: "entry-1",
|
||||
message: castAgentMessage({
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "done" }],
|
||||
timestamp: 2,
|
||||
}),
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
const completedTask = getTaskById(queuedTasks[0].taskId);
|
||||
expect(completedTask).toMatchObject({
|
||||
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
updateTaskNotifyPolicyForOwner,
|
||||
} from "../../tasks/task-owner-access.js";
|
||||
import { findActiveSessionTask } from "../session-async-task-status.js";
|
||||
import type { SessionWriteLockAcquireTimeoutConfig } from "../session-write-lock.js";
|
||||
import { resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
@@ -45,6 +46,7 @@ type DeferredTurnMaintenanceScheduleParams = {
|
||||
sessionFile: string;
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
};
|
||||
|
||||
type DeferredTurnMaintenanceRunState = {
|
||||
@@ -275,6 +277,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
allowDeferredCompactionExecution?: boolean;
|
||||
deferTranscriptRewriteToSessionLane?: boolean;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): ContextEngineRuntimeContext {
|
||||
return {
|
||||
...params.runtimeContext,
|
||||
@@ -291,6 +294,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
config: params.config,
|
||||
request,
|
||||
});
|
||||
const rewriteSessionKey = normalizeSessionKey(params.sessionKey ?? params.sessionId);
|
||||
@@ -314,6 +318,7 @@ async function executeContextEngineMaintenance(params: {
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
executionMode: "foreground" | "background";
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<ContextEngineMaintenanceResult | undefined> {
|
||||
if (typeof params.contextEngine.maintain !== "function") {
|
||||
return undefined;
|
||||
@@ -330,6 +335,7 @@ async function executeContextEngineMaintenance(params: {
|
||||
runtimeContext: params.runtimeContext,
|
||||
allowDeferredCompactionExecution: params.executionMode === "background",
|
||||
deferTranscriptRewriteToSessionLane: params.executionMode === "background",
|
||||
config: params.config,
|
||||
}),
|
||||
});
|
||||
if (result.changed) {
|
||||
@@ -350,6 +356,7 @@ async function runDeferredTurnMaintenanceWorker(params: {
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
runId: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<void> {
|
||||
let surfacedUserNotice = false;
|
||||
let longRunningTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
@@ -428,6 +435,7 @@ async function runDeferredTurnMaintenanceWorker(params: {
|
||||
reason: "turn",
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
config: params.config,
|
||||
executionMode: "background",
|
||||
});
|
||||
if (longRunningTimer) {
|
||||
@@ -550,6 +558,7 @@ function scheduleDeferredTurnMaintenance(params: DeferredTurnMaintenanceSchedule
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
config: params.config,
|
||||
runId: task.runId!,
|
||||
}),
|
||||
);
|
||||
@@ -606,6 +615,7 @@ export async function runContextEngineMaintenance(params: {
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
executionMode?: "foreground" | "background";
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<ContextEngineMaintenanceResult | undefined> {
|
||||
if (typeof params.contextEngine?.maintain !== "function") {
|
||||
return undefined;
|
||||
@@ -626,6 +636,7 @@ export async function runContextEngineMaintenance(params: {
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`failed to schedule deferred context engine maintenance: ${String(err)}`);
|
||||
@@ -643,6 +654,7 @@ export async function runContextEngineMaintenance(params: {
|
||||
sessionManager: params.sessionManager,
|
||||
runtimeContext: params.runtimeContext,
|
||||
executionMode,
|
||||
config: params.config,
|
||||
});
|
||||
} catch (err) {
|
||||
log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`);
|
||||
|
||||
@@ -1480,6 +1480,7 @@ export async function runEmbeddedPiAgent(
|
||||
sessionFile: activeSessionFile,
|
||||
reason: "compaction",
|
||||
runtimeContext: overflowCompactionRuntimeContext,
|
||||
config: params.config,
|
||||
});
|
||||
}
|
||||
} catch (compactErr) {
|
||||
@@ -1513,6 +1514,7 @@ export async function runEmbeddedPiAgent(
|
||||
}),
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
config: params.config,
|
||||
});
|
||||
if (truncResult.truncated) {
|
||||
log.info(
|
||||
@@ -1564,6 +1566,7 @@ export async function runEmbeddedPiAgent(
|
||||
maxCharsOverride: toolResultMaxChars,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
config: params.config,
|
||||
});
|
||||
if (truncResult.truncated) {
|
||||
log.info(
|
||||
|
||||
@@ -139,6 +139,7 @@ import {
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../../session-write-lock.js";
|
||||
import { detectRuntimeShell } from "../../shell-utils.js";
|
||||
import {
|
||||
@@ -1381,6 +1382,7 @@ export async function runEmbeddedAttempt(
|
||||
// from taking over the same session when a gateway run stalls before model I/O.
|
||||
const sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
maxHoldMs: resolveSessionLockMaxHoldFromTimeout({
|
||||
timeoutMs: resolveRunTimeoutWithCompactionGraceMs({
|
||||
runTimeoutMs: params.timeoutMs,
|
||||
@@ -1453,6 +1455,7 @@ export async function runEmbeddedAttempt(
|
||||
reason: contextParams.reason,
|
||||
sessionManager: contextParams.sessionManager as never,
|
||||
runtimeContext: contextParams.runtimeContext,
|
||||
config: params.config,
|
||||
}),
|
||||
warn: (message) => log.warn(message),
|
||||
});
|
||||
@@ -3244,8 +3247,10 @@ export async function runEmbeddedAttempt(
|
||||
reason: contextParams.reason,
|
||||
sessionManager: contextParams.sessionManager as never,
|
||||
runtimeContext: contextParams.runtimeContext,
|
||||
config: params.config,
|
||||
}),
|
||||
sessionManager,
|
||||
config: params.config,
|
||||
warn: (message) => log.warn(message),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -6,7 +6,11 @@ import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
|
||||
import { resolveAgentContextLimits } from "../agent-scope.js";
|
||||
import { acquireSessionWriteLock } from "../session-write-lock.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../session-write-lock.js";
|
||||
import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
@@ -679,6 +683,7 @@ async function truncateOversizedToolResultsInTranscriptState(params: {
|
||||
maxCharsOverride?: number;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> {
|
||||
const { state, contextWindowTokens } = params;
|
||||
const maxChars = Math.max(
|
||||
@@ -758,12 +763,16 @@ export async function truncateOversizedToolResultsInSession(params: {
|
||||
maxCharsOverride?: number;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<{ truncated: boolean; truncatedCount: number; reason?: string }> {
|
||||
const { sessionFile, contextWindowTokens } = params;
|
||||
let sessionLock: Awaited<ReturnType<typeof acquireSessionWriteLock>> | undefined;
|
||||
|
||||
try {
|
||||
sessionLock = await acquireSessionWriteLock({ sessionFile });
|
||||
sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
});
|
||||
const state = await readTranscriptFileState(sessionFile);
|
||||
return await truncateOversizedToolResultsInTranscriptState({
|
||||
state,
|
||||
|
||||
@@ -327,6 +327,7 @@ describe("rewriteTranscriptEntriesInSessionFile", () => {
|
||||
expect(result.changed).toBe(true);
|
||||
expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({
|
||||
sessionFile,
|
||||
timeoutMs: 60_000,
|
||||
});
|
||||
expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(listener).toHaveBeenCalledWith({ sessionFile });
|
||||
|
||||
@@ -8,7 +8,11 @@ import type {
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import { getRawSessionAppendMessage } from "../session-raw-append-message.js";
|
||||
import { acquireSessionWriteLock } from "../session-write-lock.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../session-write-lock.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
persistTranscriptStateMutation,
|
||||
@@ -356,11 +360,13 @@ export async function rewriteTranscriptEntriesInSessionFile(params: {
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
request: TranscriptRewriteRequest;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<TranscriptRewriteResult> {
|
||||
let sessionLock: Awaited<ReturnType<typeof acquireSessionWriteLock>> | undefined;
|
||||
try {
|
||||
sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
});
|
||||
const state = await readTranscriptFileState(params.sessionFile);
|
||||
const result = rewriteTranscriptEntriesInState({
|
||||
|
||||
@@ -9,6 +9,7 @@ let acquireSessionWriteLock: typeof import("./session-write-lock.js").acquireSes
|
||||
let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLockFiles;
|
||||
let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest;
|
||||
let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout;
|
||||
let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs;
|
||||
|
||||
vi.mock("../shared/pid-alive.js", async () => {
|
||||
const original =
|
||||
@@ -133,6 +134,7 @@ describe("acquireSessionWriteLock", () => {
|
||||
cleanStaleLockFiles,
|
||||
resetSessionWriteLockStateForTest,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} = await import("./session-write-lock.js"));
|
||||
});
|
||||
|
||||
@@ -335,6 +337,20 @@ describe("acquireSessionWriteLock", () => {
|
||||
expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 1_000, minMs: 5_000 })).toBe(121_000);
|
||||
});
|
||||
|
||||
it("resolves the session write-lock acquire timeout", () => {
|
||||
expect(resolveSessionWriteLockAcquireTimeoutMs()).toBe(60_000);
|
||||
expect(
|
||||
resolveSessionWriteLockAcquireTimeoutMs({
|
||||
session: { writeLock: { acquireTimeoutMs: 90_000 } },
|
||||
}),
|
||||
).toBe(90_000);
|
||||
expect(
|
||||
resolveSessionWriteLockAcquireTimeoutMs({
|
||||
session: { writeLock: { acquireTimeoutMs: 0 } },
|
||||
}),
|
||||
).toBe(60_000);
|
||||
});
|
||||
|
||||
it("clamps max hold for effectively no-timeout runs", () => {
|
||||
expect(
|
||||
resolveSessionLockMaxHoldFromTimeout({
|
||||
|
||||
@@ -48,6 +48,7 @@ const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState");
|
||||
|
||||
const DEFAULT_STALE_MS = 30 * 60 * 1000;
|
||||
const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000;
|
||||
export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000;
|
||||
const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000;
|
||||
const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000;
|
||||
// A payload-less lock can be left behind if shutdown lands between open("wx")
|
||||
@@ -74,6 +75,24 @@ type LockInspectionDetails = Pick<
|
||||
|
||||
const HELD_LOCKS = resolveProcessScopedMap<HeldLock>(HELD_LOCKS_KEY);
|
||||
|
||||
export type SessionWriteLockAcquireTimeoutConfig = {
|
||||
session?: {
|
||||
writeLock?: {
|
||||
acquireTimeoutMs?: number;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
export function resolveSessionWriteLockAcquireTimeoutMs(
|
||||
config?: SessionWriteLockAcquireTimeoutConfig,
|
||||
): number {
|
||||
return resolvePositiveMs(
|
||||
config?.session?.writeLock?.acquireTimeoutMs,
|
||||
DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS,
|
||||
{ allowInfinity: true },
|
||||
);
|
||||
}
|
||||
|
||||
function resolveCleanupState(): CleanupState {
|
||||
const proc = process as NodeJS.Process & {
|
||||
[CLEANUP_STATE_KEY]?: CleanupState;
|
||||
@@ -563,7 +582,9 @@ export async function acquireSessionWriteLock(params: {
|
||||
}> {
|
||||
registerCleanupHandlers();
|
||||
const allowReentrant = params.allowReentrant ?? false;
|
||||
const timeoutMs = resolvePositiveMs(params.timeoutMs, 10_000, { allowInfinity: true });
|
||||
const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), {
|
||||
allowInfinity: true,
|
||||
});
|
||||
const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS);
|
||||
const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS);
|
||||
const sessionFile = path.resolve(params.sessionFile);
|
||||
|
||||
@@ -52,5 +52,6 @@ export async function persistAcpDispatchTranscript(params: {
|
||||
sessionAgentId,
|
||||
threadId: params.threadId,
|
||||
sessionCwd: resolveAcpSessionCwd(params.meta) ?? process.cwd(),
|
||||
config: params.cfg,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2,6 +2,18 @@ import { describe, expect, it } from "vitest";
|
||||
import { validateConfigObject } from "./validation.js";
|
||||
|
||||
describe("config schema regressions", () => {
|
||||
it("accepts session write-lock acquire timeout", () => {
|
||||
const res = validateConfigObject({
|
||||
session: {
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 60_000,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.ok).toBe(true);
|
||||
});
|
||||
|
||||
it('accepts memorySearch fallback "voyage"', () => {
|
||||
const res = validateConfigObject({
|
||||
agents: {
|
||||
|
||||
@@ -689,6 +689,12 @@ describe("config help copy quality", () => {
|
||||
expect(/raw|unnormalized/i.test(rawKeyPrefix)).toBe(true);
|
||||
});
|
||||
|
||||
it("documents session write-lock acquire timeout defaults", () => {
|
||||
const acquireTimeout = FIELD_HELP["session.writeLock.acquireTimeoutMs"];
|
||||
expect(acquireTimeout.includes("60000")).toBe(true);
|
||||
expect(/transcript|lock/i.test(acquireTimeout)).toBe(true);
|
||||
});
|
||||
|
||||
it("documents session maintenance duration/size examples and deprecations", () => {
|
||||
const pruneAfter = FIELD_HELP["session.maintenance.pruneAfter"];
|
||||
expect(pruneAfter.includes("30d")).toBe(true);
|
||||
|
||||
@@ -1447,6 +1447,10 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"Matches a normalized session-key prefix after internal key normalization steps in policy consumers. Use this for general prefix controls, and prefer rawKeyPrefix when exact full-key matching is required.",
|
||||
"session.sendPolicy.rules[].match.rawKeyPrefix":
|
||||
"Matches the raw, unnormalized session-key prefix for exact full-key policy targeting. Use this when normalized keyPrefix is too broad and you need agent-prefixed or transport-specific precision.",
|
||||
"session.writeLock":
|
||||
"Groups session transcript write-lock acquisition controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default wait.",
|
||||
"session.writeLock.acquireTimeoutMs":
|
||||
"Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; raise for slow disks or long prep/cleanup, lower only when quick failure is preferred.",
|
||||
"session.agentToAgent":
|
||||
"Groups controls for inter-agent session exchanges, including loop prevention limits on reply chaining. Keep defaults unless you run advanced agent-to-agent automation with strict turn caps.",
|
||||
"session.agentToAgent.maxPingPongTurns":
|
||||
|
||||
@@ -716,6 +716,8 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"session.sendPolicy.rules[].match.chatType": "Session Send Rule Chat Type",
|
||||
"session.sendPolicy.rules[].match.keyPrefix": "Session Send Rule Key Prefix",
|
||||
"session.sendPolicy.rules[].match.rawKeyPrefix": "Session Send Rule Raw Key Prefix",
|
||||
"session.writeLock": "Session Write Lock",
|
||||
"session.writeLock.acquireTimeoutMs": "Session Write Lock Acquire Timeout",
|
||||
"session.agentToAgent": "Session Agent-to-Agent",
|
||||
"session.agentToAgent.maxPingPongTurns": "Agent-to-Agent Ping-Pong Turns",
|
||||
"session.threadBindings": "Session Thread Bindings",
|
||||
|
||||
@@ -3,7 +3,11 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { StringDecoder } from "node:string_decoder";
|
||||
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
|
||||
import { acquireSessionWriteLock } from "../../agents/session-write-lock.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../../agents/session-write-lock.js";
|
||||
|
||||
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
|
||||
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
|
||||
@@ -188,10 +192,11 @@ export async function appendSessionTranscriptMessage(params: {
|
||||
sessionId?: string;
|
||||
cwd?: string;
|
||||
useRawWhenLinear?: boolean;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<{ messageId: string }> {
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.transcriptPath,
|
||||
timeoutMs: 10_000,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
allowReentrant: true,
|
||||
});
|
||||
try {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import type { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import type { SessionWriteLockAcquireTimeoutConfig } from "../../agents/session-write-lock.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
import { extractAssistantVisibleText } from "../../shared/chat-message-content.js";
|
||||
@@ -164,6 +165,7 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
/** Optional override for store path (mostly for tests). */
|
||||
storePath?: string;
|
||||
updateMode?: SessionTranscriptUpdateMode;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<SessionTranscriptAppendResult> {
|
||||
const sessionKey = params.sessionKey.trim();
|
||||
if (!sessionKey) {
|
||||
@@ -184,6 +186,7 @@ export async function appendAssistantMessageToSessionTranscript(params: {
|
||||
storePath: params.storePath,
|
||||
idempotencyKey: params.idempotencyKey,
|
||||
updateMode: params.updateMode,
|
||||
config: params.config,
|
||||
message: {
|
||||
role: "assistant" as const,
|
||||
content: [{ type: "text", text: mirrorText }],
|
||||
@@ -217,6 +220,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
idempotencyKey?: string;
|
||||
storePath?: string;
|
||||
updateMode?: SessionTranscriptUpdateMode;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<SessionTranscriptAppendResult> {
|
||||
const sessionKey = params.sessionKey.trim();
|
||||
if (!sessionKey) {
|
||||
@@ -283,6 +287,7 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
const { messageId } = await appendSessionTranscriptMessage({
|
||||
transcriptPath: sessionFile,
|
||||
message,
|
||||
config: params.config,
|
||||
});
|
||||
|
||||
switch (params.updateMode ?? "inline") {
|
||||
|
||||
@@ -183,6 +183,8 @@ export type SessionConfig = {
|
||||
typingMode?: TypingMode;
|
||||
mainKey?: string;
|
||||
sendPolicy?: SessionSendPolicyConfig;
|
||||
/** Session transcript write-lock acquisition policy. */
|
||||
writeLock?: SessionWriteLockConfig;
|
||||
agentToAgent?: {
|
||||
/** Max ping-pong turns between requester/target (0–5). Default: 5. */
|
||||
maxPingPongTurns?: number;
|
||||
@@ -193,6 +195,11 @@ export type SessionConfig = {
|
||||
maintenance?: SessionMaintenanceConfig;
|
||||
};
|
||||
|
||||
export type SessionWriteLockConfig = {
|
||||
/** How long to wait while acquiring a session transcript write lock. Default: 60000. */
|
||||
acquireTimeoutMs?: number;
|
||||
};
|
||||
|
||||
export type SessionMaintenanceMode = "enforce" | "warn";
|
||||
|
||||
export type SessionMaintenanceConfig = {
|
||||
|
||||
@@ -2,6 +2,26 @@ import { describe, expect, it } from "vitest";
|
||||
import { SessionSchema } from "./zod-schema.session.js";
|
||||
|
||||
describe("SessionSchema maintenance extensions", () => {
|
||||
it("accepts session write-lock acquire timeout", () => {
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 60_000,
|
||||
},
|
||||
}),
|
||||
).not.toThrow();
|
||||
});
|
||||
|
||||
it("rejects invalid session write-lock acquire timeout values", () => {
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 0,
|
||||
},
|
||||
}),
|
||||
).toThrow(/acquireTimeoutMs|number/i);
|
||||
});
|
||||
|
||||
it("accepts valid maintenance extensions", () => {
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
|
||||
@@ -55,6 +55,12 @@ export const SessionSchema = z
|
||||
typingMode: TypingModeSchema.optional(),
|
||||
mainKey: z.string().optional(),
|
||||
sendPolicy: SessionSendPolicySchema.optional(),
|
||||
writeLock: z
|
||||
.object({
|
||||
acquireTimeoutMs: z.number().int().positive().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
agentToAgent: z
|
||||
.object({
|
||||
maxPingPongTurns: z.number().int().min(0).max(5).optional(),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import type { SessionWriteLockAcquireTimeoutConfig } from "../../agents/session-write-lock.js";
|
||||
import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js";
|
||||
@@ -51,6 +52,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: {
|
||||
idempotencyKey?: string;
|
||||
abortMeta?: GatewayInjectedAbortMeta;
|
||||
now?: number;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
}): Promise<GatewayInjectedTranscriptAppendResult> {
|
||||
const now = params.now ?? Date.now();
|
||||
const usage = {
|
||||
@@ -106,6 +108,7 @@ export async function appendInjectedAssistantMessageToTranscript(params: {
|
||||
message: messageBody,
|
||||
now,
|
||||
useRawWhenLinear: true,
|
||||
config: params.config,
|
||||
});
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: params.transcriptPath,
|
||||
|
||||
@@ -956,6 +956,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: {
|
||||
sessionKey: string;
|
||||
message: string;
|
||||
savedImages: SavedMedia[];
|
||||
cfg: OpenClawConfig;
|
||||
}) {
|
||||
const mediaFields = resolveChatSendTranscriptMediaFields(params.savedImages);
|
||||
if (!("MediaPath" in mediaFields)) {
|
||||
@@ -990,6 +991,7 @@ async function rewriteChatSendUserTurnMediaPaths(params: {
|
||||
await rewriteTranscriptEntriesInSessionFile({
|
||||
sessionFile: params.transcriptPath,
|
||||
sessionKey: params.sessionKey,
|
||||
config: params.cfg,
|
||||
request: {
|
||||
replacements: [
|
||||
{
|
||||
@@ -1319,6 +1321,7 @@ async function appendAssistantTranscriptMessage(params: {
|
||||
origin: AbortOrigin;
|
||||
runId: string;
|
||||
};
|
||||
cfg?: OpenClawConfig;
|
||||
}): Promise<TranscriptAppendResult> {
|
||||
const transcriptPath = resolveTranscriptPath({
|
||||
sessionId: params.sessionId,
|
||||
@@ -1357,6 +1360,7 @@ async function appendAssistantTranscriptMessage(params: {
|
||||
content: params.content,
|
||||
idempotencyKey: params.idempotencyKey,
|
||||
abortMeta: params.abortMeta,
|
||||
config: params.cfg,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1393,7 +1397,7 @@ async function persistAbortedPartials(params: {
|
||||
if (params.snapshots.length === 0) {
|
||||
return;
|
||||
}
|
||||
const { storePath, entry } = loadSessionEntry(params.sessionKey);
|
||||
const { cfg, storePath, entry } = loadSessionEntry(params.sessionKey);
|
||||
for (const snapshot of params.snapshots) {
|
||||
const sessionId = entry?.sessionId ?? snapshot.sessionId ?? snapshot.runId;
|
||||
const appended = await appendAssistantTranscriptMessage({
|
||||
@@ -1403,6 +1407,7 @@ async function persistAbortedPartials(params: {
|
||||
sessionFile: entry?.sessionFile,
|
||||
createIfMissing: true,
|
||||
idempotencyKey: `${snapshot.runId}:assistant`,
|
||||
cfg,
|
||||
abortMeta: {
|
||||
aborted: true,
|
||||
origin: snapshot.abortOrigin,
|
||||
@@ -2221,6 +2226,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionKey,
|
||||
message: parsedMessage,
|
||||
savedImages: await persistedImagesPromise,
|
||||
cfg,
|
||||
});
|
||||
};
|
||||
const appendWebchatAgentMediaTranscriptIfNeeded = async (payload: ReplyPayload) => {
|
||||
@@ -2284,6 +2290,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
agentId,
|
||||
createIfMissing: true,
|
||||
idempotencyKey: `${clientRunId}:assistant-media`,
|
||||
cfg,
|
||||
});
|
||||
if (appended.ok) {
|
||||
if (appended.messageId && assistantContent?.length) {
|
||||
@@ -2490,6 +2497,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionFile: latestEntry?.sessionFile,
|
||||
agentId,
|
||||
createIfMissing: true,
|
||||
cfg,
|
||||
});
|
||||
if (appended.ok) {
|
||||
if (appended.messageId && assistantContent?.length) {
|
||||
@@ -2646,6 +2654,7 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
sessionFile: entry?.sessionFile,
|
||||
agentId: resolveSessionAgentId({ sessionKey, config: cfg }),
|
||||
createIfMissing: true,
|
||||
cfg,
|
||||
});
|
||||
if (!appended.ok || !appended.messageId || !appended.message) {
|
||||
respond(
|
||||
|
||||
@@ -973,8 +973,9 @@ describe("deliverOutboundPayloads", () => {
|
||||
"fmt:hello **boss**:2",
|
||||
]);
|
||||
|
||||
const cfg = { channels: { line: {} } } as OpenClawConfig;
|
||||
await deliverOutboundPayloads({
|
||||
cfg: { channels: { line: {} } } as OpenClawConfig,
|
||||
cfg,
|
||||
channel: "line",
|
||||
to: "U123",
|
||||
payloads: [{ text: "photo", mediaUrl: "file:///tmp/f.png" }],
|
||||
@@ -1716,8 +1717,9 @@ describe("deliverOutboundPayloads", () => {
|
||||
);
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockClear();
|
||||
|
||||
const cfg = { channels: { line: {} } } as OpenClawConfig;
|
||||
await deliverOutboundPayloads({
|
||||
cfg: { channels: { line: {} } } as OpenClawConfig,
|
||||
cfg,
|
||||
channel: "line",
|
||||
to: "U123",
|
||||
payloads: [{ text: "caption", mediaUrl: "https://example.com/files/report.pdf?sig=1" }],
|
||||
@@ -1733,6 +1735,7 @@ describe("deliverOutboundPayloads", () => {
|
||||
expect.objectContaining({
|
||||
text: "report.pdf",
|
||||
idempotencyKey: "idem-deliver-1",
|
||||
config: cfg,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -1299,6 +1299,7 @@ async function deliverOutboundPayloadsCore(
|
||||
sessionKey: params.mirror.sessionKey,
|
||||
text: mirrorText,
|
||||
idempotencyKey: params.mirror.idempotencyKey,
|
||||
config: params.cfg,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,7 +116,7 @@ describe("executeSendAction", () => {
|
||||
}>,
|
||||
) {
|
||||
expect(mocks.appendAssistantMessageToSessionTranscript).toHaveBeenCalledWith(
|
||||
expect.objectContaining(expected),
|
||||
expect.objectContaining({ ...expected, config: {} }),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -159,6 +159,7 @@ export async function executeSendAction(params: {
|
||||
text: mirrorText,
|
||||
mediaUrls: mirrorMediaUrls,
|
||||
idempotencyKey: params.ctx.mirror.idempotencyKey,
|
||||
config: params.ctx.cfg,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
@@ -106,7 +106,11 @@ export {
|
||||
export { normalizeProviderToolSchemas } from "../agents/pi-embedded-runner/tool-schema-runtime.js";
|
||||
export { resolveSandboxContext } from "../agents/sandbox.js";
|
||||
export { isSubagentSessionKey } from "../routing/session-key.js";
|
||||
export { acquireSessionWriteLock } from "../agents/session-write-lock.js";
|
||||
export {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "../agents/session-write-lock.js";
|
||||
export { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
export {
|
||||
isToolWrappedWithBeforeToolCallHook,
|
||||
|
||||
Reference in New Issue
Block a user