mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 15:14:45 +00:00
Release embedded session write lock before model I/O (#82891)
Summary: - The PR narrows embedded PI session transcript write-lock scope, adds stale/max-hold config plumbing, and updates affected transcript, doctor, gateway, SDK, Codex mirroring, docs, and regression-test surfaces. - Reproducibility: yes. Current main source still holds the embedded session write lock from early attempt set ... cksmith Testbox contention proof on unmodified main; I did not rerun the live repro in this read-only pass. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(agents): narrow context engine session lock - PR branch already contained follow-up commit before automerge: fix session lock runner build types - PR branch already contained follow-up commit before automerge: Release embedded session write lock before model I/O - PR branch already contained follow-up commit before automerge: fix(clawsweeper): address review for automerge-openclaw-openclaw-8289… Validation: - ClawSweeper review passed for head4c6dd7ed6e. - Required merge gates passed before the squash merge. Prepared head SHA:4c6dd7ed6eReview: https://github.com/openclaw/openclaw/pull/82891#issuecomment-4469282923 Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
This commit is contained in:
@@ -227,6 +227,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Update/installers: override npm `min-release-age` quarantine for OpenClaw-managed package installs, so `openclaw update`, plugin updates, and hosted installer scripts can install the requested latest release immediately.
|
||||
- Agents/sessions: preserve fresh post-compaction token snapshots across stale usage updates, preventing repeated auto-compaction after every message. Fixes #82576. (#82578) Thanks @njuboy11.
|
||||
- Agents/replies: preserve active inbound reply context at the LLM boundary so Discord referenced-message turns do not answer from stale session history. Fixes #82608. (#82801) Thanks @joshavant.
|
||||
- Agents/sessions: expose session transcript lock stale and max-hold tuning, and release the embedded run's coarse transcript lock before model I/O while locking persistence and cleanup separately. Fixes #13744. Thanks @amknight.
|
||||
- Agents/OpenAI Responses: log redacted diagnostics for detail-less `response.failed` events while preserving failed response ids, so operators can correlate provider-side failures. Fixes #82558.
|
||||
- Agents/OpenRouter: strip non-replayable Anthropic/xAI reasoning provenance tags from follow-up requests, preventing poisoned thinking signatures from breaking second turns. Fixes #82335. (#82380) Thanks @hclsys.
|
||||
- Providers/xAI: send configurable reasoning effort only for Grok 4.3, preserving xAI's default low reasoning while omitting unsupported controls for Grok 4.20 reasoning models. (#81227) Thanks @jason-allen-oneal.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
1a4ff6c148f4c28eb2c07c77025c6ba13ed9f56d23bbb221fc6dd83781fda671 config-baseline.json
|
||||
a2663c4aed132ae968e8e6ef84566d22063143f8b093e839e1063393135842f5 config-baseline.core.json
|
||||
4b52f0bff12148f4695150a45c91d4b9bda2d1bfbc1162a79a2bb2cf62c3c1eb config-baseline.json
|
||||
73e11d9d5c5b27d8d075202f59b9f19537ded361ea761ed0aef78dc9446bc82f config-baseline.core.json
|
||||
fe4f1cb00d7d1dee9746779ec3cf14236e5f672c91502268a12ad6e467a2c4ad config-baseline.channel.json
|
||||
e9049ce0154f484f44bb0ac174a44198269256044da5ba62a6e107e78bfd7a70 config-baseline.plugin.json
|
||||
|
||||
@@ -97,7 +97,11 @@ OpenClaw no longer creates automatic `sessions.json.bak.*` rotation backups duri
|
||||
Transcript mutations use a session write lock on the transcript file. Lock acquisition waits up to
|
||||
`session.writeLock.acquireTimeoutMs` before surfacing a busy-session error; the default is `60000`
|
||||
ms. Raise this only when legitimate prep, cleanup, compaction, or transcript mirror work contends
|
||||
longer on slow machines. Stale-lock detection and maximum hold warnings remain separate policies.
|
||||
longer on slow machines. `session.writeLock.staleMs` controls when an existing lock can be
|
||||
reclaimed as stale; the default is `1800000` ms. `session.writeLock.maxHoldMs` controls the
|
||||
in-process watchdog release threshold; the default is `300000` ms. Emergency env overrides are
|
||||
`OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS`, `OPENCLAW_SESSION_WRITE_LOCK_STALE_MS`, and
|
||||
`OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS`.
|
||||
|
||||
Enforcement order for disk budget cleanup (`mode: "enforce"`):
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
acquireSessionWriteLock,
|
||||
appendSessionTranscriptMessage,
|
||||
emitSessionTranscriptUpdate,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
runAgentHarnessBeforeMessageWriteHook,
|
||||
type AgentMessage,
|
||||
type EmbeddedRunAttemptParams,
|
||||
@@ -128,7 +128,7 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
});
|
||||
try {
|
||||
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
|
||||
|
||||
@@ -30,10 +30,7 @@ import { isCliProvider } from "../model-selection.js";
|
||||
import { resolveOpenAIRuntimeProviderForPi } from "../openai-codex-routing.js";
|
||||
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
|
||||
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
} from "../session-write-lock.js";
|
||||
import { acquireSessionWriteLock, resolveSessionWriteLockOptions } from "../session-write-lock.js";
|
||||
import { buildWorkspaceSkillSnapshot } from "../skills.js";
|
||||
import { buildUsageWithNoCost } from "../stream-message-shared.js";
|
||||
import {
|
||||
@@ -228,7 +225,7 @@ async function persistTextTurnTranscript(
|
||||
});
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
allowReentrant: true,
|
||||
});
|
||||
try {
|
||||
|
||||
@@ -461,6 +461,11 @@ export async function loadCompactHooksHarness(): Promise<{
|
||||
acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })),
|
||||
resolveSessionLockMaxHoldFromTimeout: vi.fn(() => 0),
|
||||
resolveSessionWriteLockAcquireTimeoutMs: vi.fn(() => 60_000),
|
||||
resolveSessionWriteLockOptions: vi.fn(() => ({
|
||||
timeoutMs: 60_000,
|
||||
staleMs: 1_800_000,
|
||||
maxHoldMs: 300_000,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.doMock("../../context-engine/init.js", () => ({
|
||||
|
||||
@@ -99,7 +99,7 @@ import { sanitizeToolUseResultPairing } from "../session-transcript-repair.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} from "../session-write-lock.js";
|
||||
import { detectRuntimeShell } from "../shell-utils.js";
|
||||
import {
|
||||
@@ -956,9 +956,10 @@ async function compactEmbeddedPiSessionDirectOnce(
|
||||
const compactionTimeoutMs = resolveCompactionTimeoutMs(params.config);
|
||||
const sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
maxHoldMs: resolveSessionLockMaxHoldFromTimeout({
|
||||
timeoutMs: compactionTimeoutMs,
|
||||
...resolveSessionWriteLockOptions(params.config, {
|
||||
maxHoldMsFallback: resolveSessionLockMaxHoldFromTimeout({
|
||||
timeoutMs: compactionTimeoutMs,
|
||||
}),
|
||||
}),
|
||||
});
|
||||
try {
|
||||
|
||||
@@ -182,6 +182,50 @@ describe("buildContextEngineMaintenanceRuntimeContext", () => {
|
||||
expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("wraps active session manager rewrites in the supplied lock", async () => {
|
||||
const events: string[] = [];
|
||||
const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters<
|
||||
typeof buildContextEngineMaintenanceRuntimeContext
|
||||
>[0]["sessionManager"];
|
||||
rewriteTranscriptEntriesInSessionManagerMock.mockImplementationOnce((_params?: unknown) => {
|
||||
events.push("rewrite");
|
||||
return {
|
||||
changed: true,
|
||||
bytesFreed: 77,
|
||||
rewrittenEntries: 1,
|
||||
};
|
||||
});
|
||||
const runtimeContext = buildContextEngineMaintenanceRuntimeContext({
|
||||
sessionId: "session-1",
|
||||
sessionKey: "agent:main:session-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionManager,
|
||||
withSessionManagerRewriteLock: async (operation) => {
|
||||
events.push("lock-start");
|
||||
try {
|
||||
return await operation();
|
||||
} finally {
|
||||
events.push("lock-end");
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await runtimeContext.rewriteTranscriptEntries?.({
|
||||
replacements: [
|
||||
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
|
||||
],
|
||||
});
|
||||
|
||||
expect(events).toEqual(["lock-start", "rewrite", "lock-end"]);
|
||||
expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({
|
||||
sessionManager,
|
||||
replacements: [
|
||||
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
|
||||
],
|
||||
});
|
||||
expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("defers file rewrites onto the session lane when requested", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
@@ -419,6 +463,69 @@ describe("runContextEngineMaintenance", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("locks foreground maintenance rewrites that use the active session manager", async () => {
|
||||
const events: string[] = [];
|
||||
const maintain = vi.fn(async (params?: unknown) => {
|
||||
events.push("maintain-start");
|
||||
await (
|
||||
params as { runtimeContext?: ContextEngineRuntimeContext } | undefined
|
||||
)?.runtimeContext?.rewriteTranscriptEntries?.({
|
||||
replacements: [
|
||||
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
|
||||
],
|
||||
});
|
||||
events.push("maintain-end");
|
||||
return {
|
||||
changed: false,
|
||||
bytesFreed: 0,
|
||||
rewrittenEntries: 0,
|
||||
};
|
||||
});
|
||||
const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters<
|
||||
typeof buildContextEngineMaintenanceRuntimeContext
|
||||
>[0]["sessionManager"];
|
||||
rewriteTranscriptEntriesInSessionManagerMock.mockImplementationOnce((_params?: unknown) => {
|
||||
events.push("rewrite");
|
||||
return {
|
||||
changed: true,
|
||||
bytesFreed: 77,
|
||||
rewrittenEntries: 1,
|
||||
};
|
||||
});
|
||||
|
||||
await runContextEngineMaintenance({
|
||||
contextEngine: {
|
||||
info: { id: "test", name: "Test Engine" },
|
||||
ingest: async () => ({ ingested: true }),
|
||||
assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }),
|
||||
compact: async () => ({ ok: true, compacted: false }),
|
||||
maintain,
|
||||
},
|
||||
sessionId: "session-foreground-manager-rewrite",
|
||||
sessionKey: "agent:main:session-foreground-manager-rewrite",
|
||||
sessionFile: "/tmp/session-foreground-manager-rewrite.jsonl",
|
||||
reason: "turn",
|
||||
sessionManager,
|
||||
withSessionManagerRewriteLock: async (operation) => {
|
||||
events.push("lock-start");
|
||||
try {
|
||||
return await operation();
|
||||
} finally {
|
||||
events.push("lock-end");
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
expect(events).toEqual(["maintain-start", "lock-start", "rewrite", "lock-end", "maintain-end"]);
|
||||
expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({
|
||||
sessionManager,
|
||||
replacements: [
|
||||
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
|
||||
],
|
||||
});
|
||||
expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("defers turn maintenance to a hidden background task when enabled", async () => {
|
||||
await withStateDirEnv("openclaw-turn-maintenance-", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
@@ -60,6 +60,8 @@ type DeferredTurnMaintenanceRunState = {
|
||||
|
||||
const activeDeferredTurnMaintenanceRuns = new Map<string, DeferredTurnMaintenanceRunState>();
|
||||
|
||||
type SessionManagerRewriteLock = <T>(operation: () => Promise<T> | T) => Promise<T>;
|
||||
|
||||
type DeferredTurnMaintenanceSignal = "SIGINT" | "SIGTERM";
|
||||
type DeferredTurnMaintenanceProcessLike = Pick<NodeJS.Process, "on" | "off"> &
|
||||
Partial<Pick<NodeJS.Process, "listenerCount" | "kill" | "pid">> & {
|
||||
@@ -277,6 +279,7 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
sessionKey?: string;
|
||||
sessionFile: string;
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
withSessionManagerRewriteLock?: SessionManagerRewriteLock;
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
agentId?: string;
|
||||
allowDeferredCompactionExecution?: boolean;
|
||||
@@ -297,10 +300,15 @@ export function buildContextEngineMaintenanceRuntimeContext(params: {
|
||||
...(params.allowDeferredCompactionExecution ? { allowDeferredCompactionExecution: true } : {}),
|
||||
rewriteTranscriptEntries: async (request) => {
|
||||
if (params.sessionManager) {
|
||||
return rewriteTranscriptEntriesInSessionManager({
|
||||
sessionManager: params.sessionManager,
|
||||
replacements: request.replacements,
|
||||
});
|
||||
const sessionManager = params.sessionManager;
|
||||
const rewriteSessionManagerEntries = () =>
|
||||
rewriteTranscriptEntriesInSessionManager({
|
||||
sessionManager,
|
||||
replacements: request.replacements,
|
||||
});
|
||||
return params.withSessionManagerRewriteLock
|
||||
? await params.withSessionManagerRewriteLock(rewriteSessionManagerEntries)
|
||||
: rewriteSessionManagerEntries();
|
||||
}
|
||||
const rewriteTranscriptEntriesInFile = async () =>
|
||||
await rewriteTranscriptEntriesInSessionFile({
|
||||
@@ -329,6 +337,7 @@ async function executeContextEngineMaintenance(params: {
|
||||
sessionFile: string;
|
||||
reason: "bootstrap" | "compaction" | "turn";
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
withSessionManagerRewriteLock?: SessionManagerRewriteLock;
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
agentId?: string;
|
||||
executionMode: "foreground" | "background";
|
||||
@@ -346,6 +355,8 @@ async function executeContextEngineMaintenance(params: {
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionManager: params.executionMode === "background" ? undefined : params.sessionManager,
|
||||
withSessionManagerRewriteLock:
|
||||
params.executionMode === "background" ? undefined : params.withSessionManagerRewriteLock,
|
||||
runtimeContext: params.runtimeContext,
|
||||
agentId: params.agentId,
|
||||
allowDeferredCompactionExecution: params.executionMode === "background",
|
||||
@@ -636,6 +647,7 @@ export async function runContextEngineMaintenance(params: {
|
||||
sessionFile: string;
|
||||
reason: "bootstrap" | "compaction" | "turn";
|
||||
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
|
||||
withSessionManagerRewriteLock?: SessionManagerRewriteLock;
|
||||
runtimeContext?: ContextEngineRuntimeContext;
|
||||
agentId?: string;
|
||||
executionMode?: "foreground" | "background";
|
||||
@@ -681,6 +693,7 @@ export async function runContextEngineMaintenance(params: {
|
||||
sessionFile: params.sessionFile,
|
||||
reason: params.reason,
|
||||
sessionManager: params.sessionManager,
|
||||
withSessionManagerRewriteLock: params.withSessionManagerRewriteLock,
|
||||
runtimeContext: params.runtimeContext,
|
||||
agentId: params.agentId,
|
||||
executionMode,
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { StreamFn } from "@earendil-works/pi-agent-core";
|
||||
import type { Model } from "@earendil-works/pi-ai";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { prepareGooglePromptCacheStreamFn } from "./google-prompt-cache.js";
|
||||
import { EmbeddedAttemptSessionTakeoverError } from "./run/attempt.session-lock.js";
|
||||
|
||||
type SessionCustomEntry = {
|
||||
type: "custom";
|
||||
@@ -13,6 +14,11 @@ type SessionCustomEntry = {
|
||||
data: unknown;
|
||||
};
|
||||
|
||||
type TestGooglePromptCacheSessionManager = {
|
||||
appendCustomEntry(customType: string, data: unknown): void | Promise<void>;
|
||||
getEntries(): SessionCustomEntry[];
|
||||
};
|
||||
|
||||
function makeSessionManager(entries: SessionCustomEntry[] = []) {
|
||||
let counter = 0;
|
||||
return {
|
||||
@@ -27,7 +33,6 @@ function makeSessionManager(entries: SessionCustomEntry[] = []) {
|
||||
customType,
|
||||
data,
|
||||
});
|
||||
return id;
|
||||
},
|
||||
getEntries() {
|
||||
return entries;
|
||||
@@ -117,7 +122,7 @@ function streamOptions(streamFn: { mock: { calls: unknown[][] } }, callIndex = 0
|
||||
function preparePromptCacheStream(params: {
|
||||
fetchMock: ReturnType<typeof vi.fn>;
|
||||
now: number;
|
||||
sessionManager: ReturnType<typeof makeSessionManager>;
|
||||
sessionManager: TestGooglePromptCacheSessionManager;
|
||||
streamFn: StreamFn;
|
||||
}) {
|
||||
return prepareGooglePromptCacheStreamFn(
|
||||
@@ -159,20 +164,23 @@ describe("google prompt cache", () => {
|
||||
});
|
||||
|
||||
expect(wrapped).toBeTypeOf("function");
|
||||
void wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{
|
||||
systemPrompt: "Follow policy.",
|
||||
messages: [],
|
||||
tools: [
|
||||
{
|
||||
name: "lookup",
|
||||
description: "Look up a value",
|
||||
parameters: { type: "object" },
|
||||
},
|
||||
],
|
||||
} as never,
|
||||
{ temperature: 0.2 } as never,
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
await Promise.resolve(
|
||||
wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{
|
||||
systemPrompt: "Follow policy.",
|
||||
messages: [],
|
||||
tools: [
|
||||
{
|
||||
name: "lookup",
|
||||
description: "Look up a value",
|
||||
parameters: { type: "object" },
|
||||
},
|
||||
],
|
||||
} as never,
|
||||
{ temperature: 0.2 } as never,
|
||||
),
|
||||
);
|
||||
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
@@ -230,12 +238,19 @@ describe("google prompt cache", () => {
|
||||
expireTime: new Date(now + 3_600_000).toISOString(),
|
||||
});
|
||||
|
||||
await preparePromptCacheStream({
|
||||
const firstWrapped = await preparePromptCacheStream({
|
||||
fetchMock,
|
||||
now,
|
||||
sessionManager,
|
||||
streamFn: vi.fn(() => "first" as never),
|
||||
});
|
||||
await Promise.resolve(
|
||||
firstWrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
),
|
||||
);
|
||||
|
||||
fetchMock.mockClear();
|
||||
const { streamFn: innerStreamFn, getCapturedPayload } = createCapturingStreamFn("second");
|
||||
@@ -246,10 +261,12 @@ describe("google prompt cache", () => {
|
||||
streamFn: innerStreamFn,
|
||||
});
|
||||
|
||||
void wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
await Promise.resolve(
|
||||
wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
),
|
||||
);
|
||||
|
||||
expect(fetchMock).not.toHaveBeenCalled();
|
||||
@@ -259,6 +276,40 @@ describe("google prompt cache", () => {
|
||||
expect(getCapturedPayload()?.cachedContent).toBe("cachedContents/system-cache-2");
|
||||
});
|
||||
|
||||
it("propagates session takeover errors from cache entry persistence", async () => {
|
||||
const now = 2_500_000;
|
||||
const takeoverError = new EmbeddedAttemptSessionTakeoverError("/tmp/session.jsonl");
|
||||
const sessionManager = {
|
||||
appendCustomEntry: vi.fn(async () => {
|
||||
throw takeoverError;
|
||||
}),
|
||||
getEntries: vi.fn(() => []),
|
||||
};
|
||||
const fetchMock = createCacheFetchMock({
|
||||
name: "cachedContents/system-cache-takeover",
|
||||
expireTime: new Date(now + 3_600_000).toISOString(),
|
||||
});
|
||||
const innerStreamFn = vi.fn(() => "stream" as never);
|
||||
|
||||
const wrapped = await preparePromptCacheStream({
|
||||
fetchMock,
|
||||
now,
|
||||
sessionManager,
|
||||
streamFn: innerStreamFn,
|
||||
});
|
||||
|
||||
await expect(
|
||||
Promise.resolve(
|
||||
wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
),
|
||||
),
|
||||
).rejects.toBe(takeoverError);
|
||||
expect(innerStreamFn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("refreshes an about-to-expire cache entry instead of creating a new one", async () => {
|
||||
const now = 3_000_000;
|
||||
const expireSoon = new Date(now + 60_000).toISOString();
|
||||
@@ -297,10 +348,12 @@ describe("google prompt cache", () => {
|
||||
streamFn: innerStreamFn,
|
||||
});
|
||||
|
||||
void wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
await Promise.resolve(
|
||||
wrapped?.(
|
||||
makeGoogleModel(),
|
||||
{ systemPrompt: "Follow policy.", messages: [] } as never,
|
||||
{} as never,
|
||||
),
|
||||
);
|
||||
|
||||
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
@@ -5,11 +5,13 @@ import { parseGeminiAuth } from "../../infra/gemini-auth.js";
|
||||
import { normalizeGoogleApiBaseUrl } from "../../infra/google-api-base-url.js";
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
import { buildGuardedModelFetch } from "../provider-transport-fetch.js";
|
||||
import { isSessionWriteLockTimeoutError } from "../session-write-lock-error.js";
|
||||
import { stableStringify } from "../stable-stringify.js";
|
||||
import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js";
|
||||
import { mergeTransportHeaders, sanitizeTransportPayloadText } from "../transport-stream-shared.js";
|
||||
import { log } from "./logger.js";
|
||||
import { isGooglePromptCacheEligible, resolveCacheRetention } from "./prompt-cache-retention.js";
|
||||
import { EmbeddedAttemptSessionTakeoverError } from "./run/attempt.session-lock.js";
|
||||
import { streamWithPayloadPatch } from "./stream-payload-utils.js";
|
||||
|
||||
const GOOGLE_PROMPT_CACHE_CUSTOM_TYPE = "openclaw.google-prompt-cache";
|
||||
@@ -21,7 +23,7 @@ type CacheRetention = "short" | "long";
|
||||
type CustomEntryLike = { type?: unknown; customType?: unknown; data?: unknown };
|
||||
|
||||
type GooglePromptCacheSessionManager = {
|
||||
appendCustomEntry(customType: string, data?: unknown): unknown;
|
||||
appendCustomEntry(customType: string, data?: unknown): void | Promise<void>;
|
||||
getEntries(): CustomEntryLike[];
|
||||
};
|
||||
type GooglePromptCacheModel = Model<Api> & {
|
||||
@@ -159,13 +161,16 @@ function readLatestGooglePromptCacheEntry(
|
||||
return null;
|
||||
}
|
||||
|
||||
function appendGooglePromptCacheEntry(
|
||||
async function appendGooglePromptCacheEntry(
|
||||
sessionManager: GooglePromptCacheSessionManager,
|
||||
entry: GooglePromptCacheEntry,
|
||||
): void {
|
||||
): Promise<void> {
|
||||
try {
|
||||
sessionManager.appendCustomEntry(GOOGLE_PROMPT_CACHE_CUSTOM_TYPE, entry);
|
||||
} catch {
|
||||
await sessionManager.appendCustomEntry(GOOGLE_PROMPT_CACHE_CUSTOM_TYPE, entry);
|
||||
} catch (err) {
|
||||
if (err instanceof EmbeddedAttemptSessionTakeoverError || isSessionWriteLockTimeoutError(err)) {
|
||||
throw err;
|
||||
}
|
||||
// ignore persistence failures
|
||||
}
|
||||
}
|
||||
@@ -293,7 +298,7 @@ async function ensureGooglePromptCache(
|
||||
signal: params.signal,
|
||||
}).catch(() => null);
|
||||
if (refreshed) {
|
||||
appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
await appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
status: "ready",
|
||||
timestamp: now,
|
||||
provider: params.provider,
|
||||
@@ -322,7 +327,7 @@ async function ensureGooglePromptCache(
|
||||
systemPrompt: params.systemPrompt,
|
||||
});
|
||||
if (!created) {
|
||||
appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
await appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
status: "failed",
|
||||
timestamp: now,
|
||||
provider: params.provider,
|
||||
@@ -336,7 +341,7 @@ async function ensureGooglePromptCache(
|
||||
return null;
|
||||
}
|
||||
|
||||
appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
await appendGooglePromptCacheEntry(params.sessionManager, {
|
||||
status: "ready",
|
||||
timestamp: now,
|
||||
provider: params.provider,
|
||||
@@ -379,28 +384,28 @@ export async function prepareGooglePromptCacheStreamFn(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const cachedContent = await ensureGooglePromptCache(
|
||||
{
|
||||
apiKey,
|
||||
cacheRetention: resolvedRetention,
|
||||
model: params.model,
|
||||
provider: params.provider,
|
||||
sessionManager: params.sessionManager,
|
||||
signal: params.signal,
|
||||
systemPrompt,
|
||||
},
|
||||
deps,
|
||||
);
|
||||
if (!cachedContent) {
|
||||
log.debug(
|
||||
`google prompt cache unavailable for ${params.provider}/${params.modelId}; continuing without cachedContent`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const inner = params.streamFn;
|
||||
return (model, context, options) =>
|
||||
streamWithPayloadPatch(
|
||||
return async (model, context, options) => {
|
||||
const cachedContent = await ensureGooglePromptCache(
|
||||
{
|
||||
apiKey,
|
||||
cacheRetention: resolvedRetention,
|
||||
model: params.model,
|
||||
provider: params.provider,
|
||||
sessionManager: params.sessionManager,
|
||||
signal: params.signal,
|
||||
systemPrompt,
|
||||
},
|
||||
deps,
|
||||
);
|
||||
if (!cachedContent) {
|
||||
log.debug(
|
||||
`google prompt cache unavailable for ${params.provider}/${params.modelId}; continuing without cachedContent`,
|
||||
);
|
||||
return inner(model, context, options);
|
||||
}
|
||||
|
||||
return streamWithPayloadPatch(
|
||||
inner,
|
||||
model,
|
||||
buildManagedContextWithoutSystemPrompt(context),
|
||||
@@ -409,4 +414,5 @@ export async function prepareGooglePromptCacheStreamFn(
|
||||
payload.cachedContent = cachedContent;
|
||||
},
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
462
src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts
Normal file
462
src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts
Normal file
@@ -0,0 +1,462 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
|
||||
import {
|
||||
createEmbeddedAttemptSessionLockController,
|
||||
EmbeddedAttemptSessionTakeoverError,
|
||||
installPromptSubmissionLockRelease,
|
||||
installSessionEventWriteLock,
|
||||
installSessionExternalHookWriteLock,
|
||||
} from "./attempt.session-lock.js";
|
||||
|
||||
const lockOptions = {
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
timeoutMs: 60_000,
|
||||
staleMs: 1_800_000,
|
||||
maxHoldMs: 300_000,
|
||||
};
|
||||
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
afterEach(async () => {
|
||||
for (const dir of tempDirs.splice(0)) {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
async function createTempSessionFile(): Promise<string> {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-attempt-session-lock-"));
|
||||
tempDirs.push(dir);
|
||||
const sessionFile = path.join(dir, "session.jsonl");
|
||||
await fs.writeFile(sessionFile, '{"type":"session"}\n', "utf8");
|
||||
return sessionFile;
|
||||
}
|
||||
|
||||
describe("embedded attempt session lock lifecycle", () => {
|
||||
it("releases the coarse attempt lock before prompt submission and reacquires for cleanup", async () => {
|
||||
const releases: string[] = [];
|
||||
const acquireSessionWriteLock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) })
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => releases.push("cleanup")) });
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions,
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
const cleanupLock = await controller.acquireForCleanup();
|
||||
await cleanupLock.release();
|
||||
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2);
|
||||
expect(acquireSessionWriteLock).toHaveBeenNthCalledWith(1, lockOptions);
|
||||
expect(acquireSessionWriteLock).toHaveBeenNthCalledWith(2, lockOptions);
|
||||
expect(releases).toEqual(["prep", "cleanup"]);
|
||||
});
|
||||
|
||||
it("runs post-prompt transcript writes under a short reacquired lock", async () => {
|
||||
const events: string[] = [];
|
||||
const acquireSessionWriteLock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => events.push("prep-release")) })
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => events.push("post-release")) });
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions,
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
await controller.withSessionWriteLock(async () => {
|
||||
events.push("post-write");
|
||||
});
|
||||
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2);
|
||||
expect(events).toEqual(["prep-release", "post-write", "post-release"]);
|
||||
});
|
||||
|
||||
it("reuses its active post-prompt lock for nested session writes", async () => {
|
||||
const events: string[] = [];
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const acquireSessionWriteLock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => events.push("prep-release")) })
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => events.push("post-release")) })
|
||||
.mockRejectedValueOnce(
|
||||
new SessionWriteLockTimeoutError({
|
||||
timeoutMs: lockOptions.timeoutMs,
|
||||
owner: "pid=789",
|
||||
lockPath: `${sessionFile}.lock`,
|
||||
}),
|
||||
);
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions: { ...lockOptions, sessionFile },
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
await controller.withSessionWriteLock(async () => {
|
||||
events.push("outer-start");
|
||||
await fs.appendFile(sessionFile, '{"type":"message","id":"local"}\n', "utf8");
|
||||
await controller.withSessionWriteLock(async () => {
|
||||
events.push("inner-write");
|
||||
});
|
||||
events.push("outer-end");
|
||||
});
|
||||
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2);
|
||||
expect(events).toEqual([
|
||||
"prep-release",
|
||||
"outer-start",
|
||||
"inner-write",
|
||||
"outer-end",
|
||||
"post-release",
|
||||
]);
|
||||
});
|
||||
|
||||
it("drains queued Pi session events before reacquiring for cleanup", async () => {
|
||||
const events: string[] = [];
|
||||
let resolveQueue!: () => void;
|
||||
const session = {
|
||||
_agentEventQueue: new Promise<void>((resolve) => {
|
||||
resolveQueue = resolve;
|
||||
}).then(() => {
|
||||
events.push("events-drained");
|
||||
}),
|
||||
};
|
||||
let acquireCount = 0;
|
||||
const acquireSessionWriteLock = vi.fn(async () => {
|
||||
acquireCount += 1;
|
||||
events.push(`acquire-${acquireCount}`);
|
||||
return {
|
||||
release: vi.fn(async () => {
|
||||
events.push("release");
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions,
|
||||
});
|
||||
await controller.releaseForPrompt();
|
||||
const cleanupLockPromise = controller.acquireForCleanup({ session });
|
||||
|
||||
await Promise.resolve();
|
||||
expect(events).toEqual(["acquire-1", "release"]);
|
||||
|
||||
resolveQueue();
|
||||
const cleanupLock = await cleanupLockPromise;
|
||||
await cleanupLock.release();
|
||||
|
||||
expect(events).toEqual(["acquire-1", "release", "events-drained", "acquire-2", "release"]);
|
||||
});
|
||||
|
||||
it("rejects post-prompt writes when another owner advances the session file", async () => {
|
||||
const sessionFile = await createTempSessionFile();
|
||||
const release = vi.fn(async () => {});
|
||||
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions: { ...lockOptions, sessionFile },
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
await fs.appendFile(sessionFile, '{"type":"message","id":"takeover"}\n', "utf8");
|
||||
|
||||
await expect(controller.withSessionWriteLock(() => "late-write")).rejects.toBeInstanceOf(
|
||||
EmbeddedAttemptSessionTakeoverError,
|
||||
);
|
||||
expect(controller.hasSessionTakeover()).toBe(true);
|
||||
|
||||
const cleanupLock = await controller.acquireForCleanup();
|
||||
await cleanupLock.release();
|
||||
|
||||
expect(release).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => {
|
||||
const releases: string[] = [];
|
||||
const acquireSessionWriteLock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) })
|
||||
.mockRejectedValueOnce(
|
||||
new SessionWriteLockTimeoutError({
|
||||
timeoutMs: lockOptions.timeoutMs,
|
||||
owner: "pid=123",
|
||||
lockPath: `${lockOptions.sessionFile}.lock`,
|
||||
}),
|
||||
);
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions,
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
const cleanupLock = await controller.acquireForCleanup();
|
||||
await cleanupLock.release();
|
||||
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2);
|
||||
expect(controller.hasSessionTakeover()).toBe(true);
|
||||
expect(releases).toEqual(["prep"]);
|
||||
});
|
||||
|
||||
it("skips cleanup lock reacquisition after a post-prompt lock timeout", async () => {
|
||||
const releases: string[] = [];
|
||||
const acquireSessionWriteLock = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ release: vi.fn(async () => releases.push("prep")) })
|
||||
.mockRejectedValueOnce(
|
||||
new SessionWriteLockTimeoutError({
|
||||
timeoutMs: lockOptions.timeoutMs,
|
||||
owner: "pid=456",
|
||||
lockPath: `${lockOptions.sessionFile}.lock`,
|
||||
}),
|
||||
);
|
||||
|
||||
const controller = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions,
|
||||
});
|
||||
|
||||
await controller.releaseForPrompt();
|
||||
await expect(controller.withSessionWriteLock(() => "late-write")).rejects.toBeInstanceOf(
|
||||
SessionWriteLockTimeoutError,
|
||||
);
|
||||
const cleanupLock = await controller.acquireForCleanup();
|
||||
await cleanupLock.release();
|
||||
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(2);
|
||||
expect(controller.hasSessionTakeover()).toBe(true);
|
||||
expect(releases).toEqual(["prep"]);
|
||||
});
|
||||
|
||||
it("wraps provider stream submission with queued transcript drain and lock release", async () => {
|
||||
const events: string[] = [];
|
||||
const streamFn = vi.fn(async (..._args: unknown[]) => {
|
||||
events.push("stream");
|
||||
});
|
||||
const waitForSessionEvents = vi.fn(async () => {
|
||||
events.push("drain");
|
||||
});
|
||||
const releaseForPrompt = vi.fn(async () => {
|
||||
events.push("release");
|
||||
});
|
||||
const session = { agent: { streamFn } };
|
||||
|
||||
installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt });
|
||||
|
||||
await session.agent.streamFn("model", "context");
|
||||
|
||||
expect(waitForSessionEvents).toHaveBeenCalledWith(session);
|
||||
expect(releaseForPrompt).toHaveBeenCalledTimes(1);
|
||||
expect(streamFn).toHaveBeenCalledWith("model", "context");
|
||||
expect(events).toEqual(["drain", "release", "stream"]);
|
||||
});
|
||||
|
||||
it("rewraps provider stream submission after the stream function is rebuilt", async () => {
|
||||
const events: string[] = [];
|
||||
const firstStreamFn = vi.fn(async (..._args: unknown[]) => {
|
||||
events.push("first-stream");
|
||||
});
|
||||
const secondStreamFn = vi.fn(async (..._args: unknown[]) => {
|
||||
events.push("second-stream");
|
||||
});
|
||||
const waitForSessionEvents = vi.fn(async () => {
|
||||
events.push("drain");
|
||||
});
|
||||
const releaseForPrompt = vi.fn(async () => {
|
||||
events.push("release");
|
||||
});
|
||||
const session = { agent: { streamFn: firstStreamFn } };
|
||||
|
||||
installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt });
|
||||
installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt });
|
||||
await session.agent.streamFn("first-model");
|
||||
|
||||
session.agent.streamFn = secondStreamFn;
|
||||
installPromptSubmissionLockRelease({ session, waitForSessionEvents, releaseForPrompt });
|
||||
await session.agent.streamFn("second-model");
|
||||
|
||||
expect(firstStreamFn).toHaveBeenCalledTimes(1);
|
||||
expect(secondStreamFn).toHaveBeenCalledTimes(1);
|
||||
expect(waitForSessionEvents).toHaveBeenCalledTimes(2);
|
||||
expect(releaseForPrompt).toHaveBeenCalledTimes(2);
|
||||
expect(events).toEqual([
|
||||
"drain",
|
||||
"release",
|
||||
"first-stream",
|
||||
"drain",
|
||||
"release",
|
||||
"second-stream",
|
||||
]);
|
||||
});
|
||||
|
||||
it("locks agent events that can reach transcript writers or registered extension hooks", async () => {
|
||||
const releases: string[] = [];
|
||||
const acquireSessionWriteLock = vi.fn(async (_options: typeof lockOptions) => ({
|
||||
release: vi.fn(async () => {
|
||||
releases.push("released");
|
||||
}),
|
||||
}));
|
||||
const processed: Array<string | undefined> = [];
|
||||
const hasHandlers = vi.fn(() => false);
|
||||
const session = {
|
||||
_extensionRunner: { hasHandlers },
|
||||
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
|
||||
processed.push(event.type);
|
||||
}),
|
||||
};
|
||||
|
||||
installSessionEventWriteLock({
|
||||
session,
|
||||
withSessionWriteLock: async (run) => {
|
||||
const lock = await acquireSessionWriteLock(lockOptions);
|
||||
try {
|
||||
return await run();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await session._processAgentEvent({ type: "message_update" });
|
||||
await session._processAgentEvent({ type: "tool_execution_end" });
|
||||
await session._processAgentEvent({ type: "message_end" });
|
||||
await session._processAgentEvent({ type: "agent_end" });
|
||||
await session._processAgentEvent({});
|
||||
|
||||
expect(processed).toEqual([
|
||||
"message_update",
|
||||
"tool_execution_end",
|
||||
"message_end",
|
||||
"agent_end",
|
||||
undefined,
|
||||
]);
|
||||
expect(hasHandlers).toHaveBeenCalledWith("tool_execution_end");
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
|
||||
expect(acquireSessionWriteLock).toHaveBeenCalledWith(lockOptions);
|
||||
expect(releases).toEqual(["released", "released", "released"]);
|
||||
});
|
||||
|
||||
it("locks Pi extension hooks that can mutate the session outside agent events", async () => {
|
||||
const locked: string[] = [];
|
||||
const called: string[] = [];
|
||||
const hasHandlers = vi.fn(
|
||||
(eventType: string) =>
|
||||
eventType === "tool_call" ||
|
||||
eventType === "tool_result" ||
|
||||
eventType === "before_provider_request",
|
||||
);
|
||||
const session = {
|
||||
_extensionRunner: { hasHandlers },
|
||||
compact: vi.fn(async () => called.push("compact")),
|
||||
agent: {
|
||||
beforeToolCall: vi.fn(async () => called.push("tool_call")),
|
||||
afterToolCall: vi.fn(async () => called.push("tool_result")),
|
||||
onPayload: vi.fn(async () => {
|
||||
called.push("before_provider_request");
|
||||
return { ok: true };
|
||||
}),
|
||||
onResponse: vi.fn(async () => called.push("after_provider_response")),
|
||||
},
|
||||
};
|
||||
|
||||
installSessionExternalHookWriteLock({
|
||||
session,
|
||||
withSessionWriteLock: async (run) => {
|
||||
locked.push("lock");
|
||||
return await run();
|
||||
},
|
||||
});
|
||||
|
||||
await session.agent.beforeToolCall();
|
||||
await session.agent.afterToolCall();
|
||||
await expect(session.agent.onPayload()).resolves.toEqual({ ok: true });
|
||||
await session.agent.onResponse();
|
||||
await session.compact();
|
||||
|
||||
expect(called).toEqual([
|
||||
"tool_call",
|
||||
"tool_result",
|
||||
"before_provider_request",
|
||||
"after_provider_response",
|
||||
"compact",
|
||||
]);
|
||||
expect(locked).toEqual(["lock", "lock", "lock", "lock"]);
|
||||
expect(hasHandlers).toHaveBeenCalledWith("tool_result");
|
||||
expect(hasHandlers).toHaveBeenCalledWith("before_provider_request");
|
||||
expect(hasHandlers).toHaveBeenCalledWith("after_provider_response");
|
||||
});
|
||||
|
||||
it("fences tool calls even when no extension hook is registered", async () => {
|
||||
const events: string[] = [];
|
||||
const session = {
|
||||
_extensionRunner: {
|
||||
hasHandlers: vi.fn(() => false),
|
||||
},
|
||||
agent: {
|
||||
beforeToolCall: vi.fn(async () => {
|
||||
events.push("tool_call");
|
||||
}),
|
||||
},
|
||||
};
|
||||
|
||||
installSessionExternalHookWriteLock({
|
||||
session,
|
||||
withSessionWriteLock: async (run) => {
|
||||
events.push("lock");
|
||||
return await run();
|
||||
},
|
||||
});
|
||||
|
||||
await session.agent.beforeToolCall();
|
||||
|
||||
expect(events).toEqual(["lock", "tool_call"]);
|
||||
expect(session._extensionRunner.hasHandlers).not.toHaveBeenCalledWith("tool_call");
|
||||
});
|
||||
|
||||
it("drains queued session events before locking a tool-call extension hook", async () => {
|
||||
const events: string[] = [];
|
||||
let resolveQueue!: () => void;
|
||||
const session = {
|
||||
_agentEventQueue: new Promise<void>((resolve) => {
|
||||
resolveQueue = resolve;
|
||||
}).then(() => {
|
||||
events.push("queue-drained");
|
||||
}),
|
||||
_extensionRunner: {
|
||||
hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"),
|
||||
},
|
||||
agent: {
|
||||
beforeToolCall: vi.fn(async () => {
|
||||
events.push("hook-start");
|
||||
await session._agentEventQueue;
|
||||
events.push("hook-end");
|
||||
}),
|
||||
},
|
||||
};
|
||||
|
||||
installSessionExternalHookWriteLock({
|
||||
session,
|
||||
withSessionWriteLock: async (run) => {
|
||||
events.push("lock");
|
||||
return await run();
|
||||
},
|
||||
});
|
||||
|
||||
const hookPromise = session.agent.beforeToolCall();
|
||||
await Promise.resolve();
|
||||
expect(events).toEqual([]);
|
||||
|
||||
resolveQueue();
|
||||
await hookPromise;
|
||||
|
||||
expect(events).toEqual(["queue-drained", "lock", "hook-start", "hook-end"]);
|
||||
});
|
||||
});
|
||||
392
src/agents/pi-embedded-runner/run/attempt.session-lock.ts
Normal file
392
src/agents/pi-embedded-runner/run/attempt.session-lock.ts
Normal file
@@ -0,0 +1,392 @@
|
||||
import { AsyncLocalStorage } from "node:async_hooks";
|
||||
import fs from "node:fs/promises";
|
||||
import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
|
||||
import type { acquireSessionWriteLock } from "../../session-write-lock.js";
|
||||
|
||||
type SessionLock = Awaited<ReturnType<typeof acquireSessionWriteLock>>;
|
||||
type AcquireSessionWriteLock = typeof acquireSessionWriteLock;
|
||||
|
||||
type LockOptions = {
|
||||
sessionFile: string;
|
||||
timeoutMs: number;
|
||||
staleMs: number;
|
||||
maxHoldMs: number;
|
||||
};
|
||||
|
||||
type SessionEventProcessor = {
|
||||
_processAgentEvent?: (event: unknown) => Promise<void>;
|
||||
_extensionRunner?: {
|
||||
hasHandlers?: (eventType: string) => boolean;
|
||||
};
|
||||
__openclawSessionEventWriteLockInstalled?: boolean;
|
||||
};
|
||||
|
||||
type SessionEventQueueOwner = {
|
||||
_agentEventQueue?: PromiseLike<unknown>;
|
||||
};
|
||||
|
||||
type SessionWithAgentPrompt = {
|
||||
agent?: {
|
||||
streamFn?: PromptReleaseStreamFn;
|
||||
};
|
||||
};
|
||||
|
||||
type SessionWithExternalHooks = SessionEventProcessor & {
|
||||
compact?: LockableFunction;
|
||||
agent?: {
|
||||
beforeToolCall?: LockableFunction;
|
||||
afterToolCall?: LockableFunction;
|
||||
onPayload?: LockableFunction;
|
||||
onResponse?: LockableFunction;
|
||||
};
|
||||
};
|
||||
|
||||
type PromptReleaseStreamFn = ((...args: unknown[]) => unknown) & {
|
||||
__openclawSessionLockPromptReleaseInstalled?: boolean;
|
||||
};
|
||||
|
||||
type LockableFunction = ((...args: unknown[]) => unknown) & {
|
||||
__openclawSessionWriteLockInstalled?: boolean;
|
||||
};
|
||||
|
||||
function sessionHasExtensionHandlers(session: SessionEventProcessor, eventType: string): boolean {
|
||||
const hasHandlers = session._extensionRunner?.hasHandlers;
|
||||
if (typeof hasHandlers !== "function") {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
return hasHandlers.call(session._extensionRunner, eventType);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
function eventMayReachTranscriptWriters(session: SessionEventProcessor, event: unknown): boolean {
|
||||
const type = (event as { type?: unknown } | null)?.type;
|
||||
if (type === "message_update" || type === "message_end" || type === "agent_end") {
|
||||
return true;
|
||||
}
|
||||
if (typeof type !== "string") {
|
||||
return false;
|
||||
}
|
||||
return sessionHasExtensionHandlers(session, type);
|
||||
}
|
||||
|
||||
function installLockableFunction(params: {
|
||||
owner: Record<string, unknown>;
|
||||
key: string;
|
||||
shouldLock: () => boolean;
|
||||
waitBeforeLock?: () => Promise<void>;
|
||||
withSessionWriteLock: <T>(run: () => Promise<T> | T) => Promise<T>;
|
||||
}): void {
|
||||
const current = params.owner[params.key] as LockableFunction | undefined;
|
||||
if (typeof current !== "function" || current.__openclawSessionWriteLockInstalled === true) {
|
||||
return;
|
||||
}
|
||||
const wrapped: LockableFunction = async function lockedExternalHook(
|
||||
this: unknown,
|
||||
...args: unknown[]
|
||||
) {
|
||||
if (!params.shouldLock()) {
|
||||
return await current.apply(this, args);
|
||||
}
|
||||
await params.waitBeforeLock?.();
|
||||
return await params.withSessionWriteLock(async () => await current.apply(this, args));
|
||||
};
|
||||
wrapped.__openclawSessionWriteLockInstalled = true;
|
||||
params.owner[params.key] = wrapped;
|
||||
}
|
||||
|
||||
type SessionFileFingerprint =
|
||||
| { exists: false }
|
||||
| {
|
||||
exists: true;
|
||||
dev: bigint;
|
||||
ino: bigint;
|
||||
size: bigint;
|
||||
mtimeNs: bigint;
|
||||
ctimeNs: bigint;
|
||||
};
|
||||
|
||||
function sameSessionFileFingerprint(
|
||||
left: SessionFileFingerprint | undefined,
|
||||
right: SessionFileFingerprint,
|
||||
): boolean {
|
||||
if (!left || left.exists !== right.exists) {
|
||||
return false;
|
||||
}
|
||||
if (!left.exists || !right.exists) {
|
||||
return true;
|
||||
}
|
||||
return (
|
||||
left.dev === right.dev &&
|
||||
left.ino === right.ino &&
|
||||
left.size === right.size &&
|
||||
left.mtimeNs === right.mtimeNs &&
|
||||
left.ctimeNs === right.ctimeNs
|
||||
);
|
||||
}
|
||||
|
||||
async function readSessionFileFingerprint(sessionFile: string): Promise<SessionFileFingerprint> {
|
||||
try {
|
||||
const stat = await fs.stat(sessionFile, { bigint: true });
|
||||
return {
|
||||
exists: true,
|
||||
dev: stat.dev,
|
||||
ino: stat.ino,
|
||||
size: stat.size,
|
||||
mtimeNs: stat.mtimeNs,
|
||||
ctimeNs: stat.ctimeNs,
|
||||
};
|
||||
} catch (err) {
|
||||
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
|
||||
return { exists: false };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForSessionEventQueue(session: unknown): Promise<void> {
|
||||
const owner = session as SessionEventQueueOwner;
|
||||
for (let attempts = 0; attempts < 5; attempts += 1) {
|
||||
const queue = owner?._agentEventQueue;
|
||||
if (!queue || typeof queue.then !== "function") {
|
||||
return;
|
||||
}
|
||||
await Promise.resolve(queue).catch(() => {});
|
||||
if (owner?._agentEventQueue === queue) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const queue = owner?._agentEventQueue;
|
||||
if (queue && typeof queue.then === "function") {
|
||||
await Promise.resolve(queue).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
export class EmbeddedAttemptSessionTakeoverError extends Error {
|
||||
constructor(sessionFile: string) {
|
||||
super(`session file changed while embedded prompt lock was released: ${sessionFile}`);
|
||||
this.name = "EmbeddedAttemptSessionTakeoverError";
|
||||
}
|
||||
}
|
||||
|
||||
export function installSessionEventWriteLock(params: {
|
||||
session: unknown;
|
||||
withSessionWriteLock: <T>(run: () => Promise<T> | T) => Promise<T>;
|
||||
}): void {
|
||||
const session = params.session as SessionEventProcessor;
|
||||
const original = session._processAgentEvent;
|
||||
if (typeof original !== "function" || session.__openclawSessionEventWriteLockInstalled === true) {
|
||||
return;
|
||||
}
|
||||
session.__openclawSessionEventWriteLockInstalled = true;
|
||||
session._processAgentEvent = async function lockedProcessAgentEvent(
|
||||
this: unknown,
|
||||
event: unknown,
|
||||
) {
|
||||
if (!eventMayReachTranscriptWriters(session, event)) {
|
||||
return await original.call(this, event);
|
||||
}
|
||||
return await params.withSessionWriteLock(async () => await original.call(this, event));
|
||||
};
|
||||
}
|
||||
|
||||
export function installSessionExternalHookWriteLock(params: {
|
||||
session: unknown;
|
||||
withSessionWriteLock: <T>(run: () => Promise<T> | T) => Promise<T>;
|
||||
}): void {
|
||||
const session = params.session as SessionWithExternalHooks;
|
||||
const agent = session.agent;
|
||||
if (agent) {
|
||||
installLockableFunction({
|
||||
owner: agent as Record<string, unknown>,
|
||||
key: "beforeToolCall",
|
||||
shouldLock: () => true,
|
||||
waitBeforeLock: () => waitForSessionEventQueue(session),
|
||||
withSessionWriteLock: params.withSessionWriteLock,
|
||||
});
|
||||
installLockableFunction({
|
||||
owner: agent as Record<string, unknown>,
|
||||
key: "afterToolCall",
|
||||
shouldLock: () => sessionHasExtensionHandlers(session, "tool_result"),
|
||||
waitBeforeLock: () => waitForSessionEventQueue(session),
|
||||
withSessionWriteLock: params.withSessionWriteLock,
|
||||
});
|
||||
installLockableFunction({
|
||||
owner: agent as Record<string, unknown>,
|
||||
key: "onPayload",
|
||||
shouldLock: () => sessionHasExtensionHandlers(session, "before_provider_request"),
|
||||
waitBeforeLock: () => waitForSessionEventQueue(session),
|
||||
withSessionWriteLock: params.withSessionWriteLock,
|
||||
});
|
||||
installLockableFunction({
|
||||
owner: agent as Record<string, unknown>,
|
||||
key: "onResponse",
|
||||
shouldLock: () => sessionHasExtensionHandlers(session, "after_provider_response"),
|
||||
waitBeforeLock: () => waitForSessionEventQueue(session),
|
||||
withSessionWriteLock: params.withSessionWriteLock,
|
||||
});
|
||||
}
|
||||
installLockableFunction({
|
||||
owner: session as Record<string, unknown>,
|
||||
key: "compact",
|
||||
shouldLock: () => true,
|
||||
waitBeforeLock: () => waitForSessionEventQueue(session),
|
||||
withSessionWriteLock: params.withSessionWriteLock,
|
||||
});
|
||||
}
|
||||
|
||||
export type EmbeddedAttemptSessionLockController = {
|
||||
releaseForPrompt(): Promise<void>;
|
||||
waitForSessionEvents(session: unknown): Promise<void>;
|
||||
withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T>;
|
||||
acquireForCleanup(params?: { session?: unknown }): Promise<SessionLock>;
|
||||
hasSessionTakeover(): boolean;
|
||||
};
|
||||
|
||||
export async function createEmbeddedAttemptSessionLockController(params: {
|
||||
acquireSessionWriteLock: AcquireSessionWriteLock;
|
||||
lockOptions: LockOptions;
|
||||
}): Promise<EmbeddedAttemptSessionLockController> {
|
||||
const acquireLock = async (): Promise<SessionLock> =>
|
||||
await params.acquireSessionWriteLock({
|
||||
sessionFile: params.lockOptions.sessionFile,
|
||||
timeoutMs: params.lockOptions.timeoutMs,
|
||||
staleMs: params.lockOptions.staleMs,
|
||||
maxHoldMs: params.lockOptions.maxHoldMs,
|
||||
});
|
||||
|
||||
let heldLock: SessionLock | undefined = await acquireLock();
|
||||
const activeWriteLock = new AsyncLocalStorage<SessionLock>();
|
||||
let fenceFingerprint: SessionFileFingerprint | undefined;
|
||||
let fenceActive = false;
|
||||
let takeoverDetected = false;
|
||||
|
||||
async function acquireWriteLock(): Promise<{ lock: SessionLock; owned: boolean }> {
|
||||
if (heldLock) {
|
||||
return { lock: heldLock, owned: false };
|
||||
}
|
||||
try {
|
||||
return { lock: await acquireLock(), owned: true };
|
||||
} catch (err) {
|
||||
if (isSessionWriteLockTimeoutError(err)) {
|
||||
takeoverDetected = true;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function assertSessionFileFence(): Promise<void> {
|
||||
if (!fenceActive) {
|
||||
return;
|
||||
}
|
||||
const current = await readSessionFileFingerprint(params.lockOptions.sessionFile);
|
||||
if (!sameSessionFileFingerprint(fenceFingerprint, current)) {
|
||||
takeoverDetected = true;
|
||||
throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
async function refreshSessionFileFence(): Promise<void> {
|
||||
if (fenceActive && !takeoverDetected) {
|
||||
fenceFingerprint = await readSessionFileFingerprint(params.lockOptions.sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
const noopLock: SessionLock = { release: async () => {} };
|
||||
|
||||
return {
|
||||
async releaseForPrompt(): Promise<void> {
|
||||
if (!heldLock) {
|
||||
return;
|
||||
}
|
||||
const lock = heldLock;
|
||||
heldLock = undefined;
|
||||
fenceFingerprint = await readSessionFileFingerprint(params.lockOptions.sessionFile);
|
||||
fenceActive = true;
|
||||
await lock.release();
|
||||
},
|
||||
waitForSessionEvents: waitForSessionEventQueue,
|
||||
async withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T> {
|
||||
if (takeoverDetected) {
|
||||
throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile);
|
||||
}
|
||||
if (activeWriteLock.getStore()) {
|
||||
return await run();
|
||||
}
|
||||
const { lock, owned } = await acquireWriteLock();
|
||||
try {
|
||||
await assertSessionFileFence();
|
||||
const runWithLock = async () => {
|
||||
const result = await run();
|
||||
await refreshSessionFileFence();
|
||||
return result;
|
||||
};
|
||||
if (owned) {
|
||||
return await activeWriteLock.run(lock, runWithLock);
|
||||
}
|
||||
return await runWithLock();
|
||||
} finally {
|
||||
if (owned) {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
},
|
||||
async acquireForCleanup(cleanupParams?: { session?: unknown }): Promise<SessionLock> {
|
||||
if (cleanupParams?.session) {
|
||||
await waitForSessionEventQueue(cleanupParams.session);
|
||||
}
|
||||
if (takeoverDetected) {
|
||||
return noopLock;
|
||||
}
|
||||
try {
|
||||
heldLock ??= await acquireLock();
|
||||
} catch (err) {
|
||||
if (isSessionWriteLockTimeoutError(err)) {
|
||||
takeoverDetected = true;
|
||||
return noopLock;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
const cleanupLock = heldLock;
|
||||
heldLock = undefined;
|
||||
try {
|
||||
await assertSessionFileFence();
|
||||
} catch (err) {
|
||||
await cleanupLock.release();
|
||||
if (err instanceof EmbeddedAttemptSessionTakeoverError) {
|
||||
return noopLock;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
return cleanupLock;
|
||||
},
|
||||
hasSessionTakeover(): boolean {
|
||||
return takeoverDetected;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function installPromptSubmissionLockRelease(params: {
|
||||
session: unknown;
|
||||
waitForSessionEvents: (session: unknown) => Promise<void>;
|
||||
releaseForPrompt: () => Promise<void>;
|
||||
}): void {
|
||||
const agent = (params.session as SessionWithAgentPrompt).agent;
|
||||
if (typeof agent?.streamFn !== "function") {
|
||||
return;
|
||||
}
|
||||
const currentStreamFn = agent.streamFn;
|
||||
if (currentStreamFn.__openclawSessionLockPromptReleaseInstalled === true) {
|
||||
return;
|
||||
}
|
||||
const originalStreamFn = currentStreamFn.bind(agent);
|
||||
const wrappedStreamFn: PromptReleaseStreamFn = async (...args: unknown[]) => {
|
||||
await params.waitForSessionEvents(params.session);
|
||||
await params.releaseForPrompt();
|
||||
return await originalStreamFn(...args);
|
||||
};
|
||||
wrappedStreamFn.__openclawSessionLockPromptReleaseInstalled = true;
|
||||
agent.streamFn = wrappedStreamFn;
|
||||
}
|
||||
@@ -96,6 +96,26 @@ function expectFields(actual: Record<string, unknown>, expected: Record<string,
|
||||
}
|
||||
}
|
||||
|
||||
function trackSessionWriteLocks(): string[] {
|
||||
const events: string[] = [];
|
||||
hoisted.acquireSessionWriteLockMock.mockImplementation(async () => {
|
||||
const lockId = hoisted.acquireSessionWriteLockMock.mock.calls.length;
|
||||
events.push(`acquire-${lockId}`);
|
||||
return {
|
||||
release: async () => {
|
||||
events.push(`release-${lockId}`);
|
||||
},
|
||||
};
|
||||
});
|
||||
return events;
|
||||
}
|
||||
|
||||
function expectInitialLockReleasedBeforePostTurnWrite(events: string[]) {
|
||||
expect(events.indexOf("release-1")).toBeGreaterThan(events.indexOf("acquire-1"));
|
||||
expect(events.indexOf("acquire-2")).toBeGreaterThan(events.indexOf("release-1"));
|
||||
expect(events.indexOf("release-2")).toBeGreaterThan(events.indexOf("acquire-2"));
|
||||
}
|
||||
|
||||
function createTestContextEngine(params: Partial<AttemptContextEngine>): AttemptContextEngine {
|
||||
return {
|
||||
info: {
|
||||
@@ -773,6 +793,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
|
||||
});
|
||||
|
||||
it("skips blank visible prompts with replay history before provider submission", async () => {
|
||||
const lockEvents = trackSessionWriteLocks();
|
||||
const sessionPrompt = vi.fn(async () => {
|
||||
throw new Error("blank prompt should not be submitted");
|
||||
});
|
||||
@@ -809,6 +830,35 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
|
||||
"prompt skipped event",
|
||||
);
|
||||
expect(requireRecord(skipped.data, "prompt skipped data").reason).toBe("blank_user_prompt");
|
||||
expectInitialLockReleasedBeforePostTurnWrite(lockEvents);
|
||||
});
|
||||
|
||||
it("releases the initial session lock before before_agent_run block finalizers", async () => {
|
||||
const lockEvents = trackSessionWriteLocks();
|
||||
const sessionPrompt = vi.fn(async () => {
|
||||
throw new Error("blocked prompt should not be submitted");
|
||||
});
|
||||
const runBeforeAgentRun = vi.fn(async () => ({
|
||||
pluginId: "test-policy",
|
||||
decision: { outcome: "block", reason: "Blocked by test policy." },
|
||||
}));
|
||||
hoisted.getGlobalHookRunnerMock.mockReturnValue({
|
||||
hasHooks: vi.fn((name: string) => name === "before_agent_run"),
|
||||
runBeforeAgentRun,
|
||||
});
|
||||
|
||||
const result = await createContextEngineAttemptRunner({
|
||||
contextEngine: createContextEngineBootstrapAndAssemble(),
|
||||
sessionKey,
|
||||
tempPaths,
|
||||
sessionPrompt,
|
||||
});
|
||||
|
||||
expect(runBeforeAgentRun).toHaveBeenCalledTimes(1);
|
||||
expect(sessionPrompt).not.toHaveBeenCalled();
|
||||
expect(result.finalPromptText).toBeUndefined();
|
||||
expect(result.promptErrorSource).toBe("hook:before_agent_run");
|
||||
expectInitialLockReleasedBeforePostTurnWrite(lockEvents);
|
||||
});
|
||||
|
||||
it("uses assembled context as the default precheck authority", async () => {
|
||||
@@ -846,6 +896,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
|
||||
});
|
||||
|
||||
it("honors context engines that opt into preassembly overflow authority", async () => {
|
||||
const lockEvents = trackSessionWriteLocks();
|
||||
let sawPrompt = false;
|
||||
const hugeHistory = "large raw history ".repeat(2_000);
|
||||
|
||||
@@ -878,6 +929,7 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
|
||||
expect(result.promptErrorSource).toBe("precheck");
|
||||
expect(result.preflightRecovery?.route).toBe("compact_only");
|
||||
expect(hoisted.preemptiveCompactionCalls.at(-1)).toHaveProperty("unwindowedMessages");
|
||||
expectInitialLockReleasedBeforePostTurnWrite(lockEvents);
|
||||
});
|
||||
|
||||
it("snapshots pre-assembly messages before assemble even when the engine windows in place", async () => {
|
||||
|
||||
@@ -441,6 +441,7 @@ vi.mock("../../session-write-lock.js", () => ({
|
||||
acquireSessionWriteLock: (params: Parameters<AcquireSessionWriteLockFn>[0]) =>
|
||||
hoisted.acquireSessionWriteLockMock(params),
|
||||
resolveSessionWriteLockAcquireTimeoutMs: () => 60000,
|
||||
resolveSessionWriteLockOptions: () => ({ timeoutMs: 60000, staleMs: 1_800_000, maxHoldMs: 1 }),
|
||||
resolveSessionLockMaxHoldFromTimeout: () => 1,
|
||||
}));
|
||||
|
||||
@@ -811,7 +812,8 @@ export type MutableSession = {
|
||||
isCompacting: boolean;
|
||||
isStreaming: boolean;
|
||||
agent: {
|
||||
streamFn?: unknown;
|
||||
prompt?: (...args: unknown[]) => Promise<unknown>;
|
||||
streamFn?: (...args: unknown[]) => Promise<unknown>;
|
||||
transport?: string;
|
||||
reset: () => void;
|
||||
state: {
|
||||
@@ -841,6 +843,22 @@ type SessionPromptOverride = (
|
||||
options?: { images?: unknown[] },
|
||||
) => Promise<void>;
|
||||
|
||||
type TestAgentStream = {
|
||||
result: () => Promise<unknown>;
|
||||
[Symbol.asyncIterator]: () => AsyncIterator<unknown>;
|
||||
};
|
||||
|
||||
function createCompletedAssistantStream(): TestAgentStream {
|
||||
return {
|
||||
async result() {
|
||||
return { role: "assistant", content: "done" };
|
||||
},
|
||||
[Symbol.asyncIterator]() {
|
||||
return (async function* () {})();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
let runEmbeddedAttemptPromise:
|
||||
| Promise<typeof import("./attempt.js").runEmbeddedAttempt>
|
||||
| undefined;
|
||||
@@ -967,12 +985,30 @@ export function createDefaultEmbeddedSession(params?: {
|
||||
options?: { images?: unknown[] },
|
||||
) => Promise<void>;
|
||||
}): MutableSession {
|
||||
let pendingPrompt:
|
||||
| {
|
||||
prompt: string;
|
||||
options?: { images?: unknown[] };
|
||||
}
|
||||
| undefined;
|
||||
const session: MutableSession = {
|
||||
sessionId: "embedded-session",
|
||||
messages: [...(params?.initialMessages ?? [])],
|
||||
isCompacting: false,
|
||||
isStreaming: false,
|
||||
agent: {
|
||||
prompt: async (prompt, options) => {
|
||||
pendingPrompt = { prompt: String(prompt), options: options as { images?: unknown[] } };
|
||||
await session.agent.streamFn?.();
|
||||
},
|
||||
streamFn: async () => {
|
||||
if (params?.prompt && pendingPrompt) {
|
||||
const currentPrompt = pendingPrompt;
|
||||
pendingPrompt = undefined;
|
||||
await params.prompt(session, currentPrompt.prompt, currentPrompt.options);
|
||||
}
|
||||
return createCompletedAssistantStream();
|
||||
},
|
||||
reset: () => {
|
||||
session.messages = [];
|
||||
},
|
||||
@@ -987,8 +1023,8 @@ export function createDefaultEmbeddedSession(params?: {
|
||||
},
|
||||
setActiveToolsByName: () => {},
|
||||
prompt: async (prompt, options) => {
|
||||
await session.agent.prompt?.(prompt, options);
|
||||
if (params?.prompt) {
|
||||
await params.prompt(session, prompt, options);
|
||||
return;
|
||||
}
|
||||
session.messages = [
|
||||
|
||||
@@ -115,4 +115,25 @@ describe("cleanupEmbeddedAttemptResources", () => {
|
||||
|
||||
expect(release).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("can skip stale session-manager flushing after session takeover", async () => {
|
||||
const flushPendingToolResultsAfterIdle = vi.fn(async () => {});
|
||||
const dispose = vi.fn();
|
||||
const release = vi.fn(async () => {});
|
||||
|
||||
await cleanupEmbeddedAttemptResources({
|
||||
flushPendingToolResultsAfterIdle,
|
||||
session: {
|
||||
agent: {},
|
||||
dispose,
|
||||
},
|
||||
sessionManager: {},
|
||||
sessionLock: { release },
|
||||
skipSessionFlush: true,
|
||||
});
|
||||
|
||||
expect(flushPendingToolResultsAfterIdle).not.toHaveBeenCalled();
|
||||
expect(dispose).toHaveBeenCalledTimes(1);
|
||||
expect(release).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -66,6 +66,7 @@ export async function cleanupEmbeddedAttemptResources(params: {
|
||||
sessionLock: { release(): Promise<void> | void };
|
||||
aborted?: boolean;
|
||||
abortSettlePromise?: Promise<unknown> | null;
|
||||
skipSessionFlush?: boolean;
|
||||
runId?: string;
|
||||
sessionId?: string;
|
||||
}): Promise<void> {
|
||||
@@ -85,14 +86,16 @@ export async function cleanupEmbeddedAttemptResources(params: {
|
||||
// PERF: When the run was aborted (user stop / timeout), skip the expensive
|
||||
// waitForIdle (up to 30 s) and flush pending tool results synchronously so
|
||||
// the session write-lock is released without leaving orphaned tool calls.
|
||||
try {
|
||||
await params.flushPendingToolResultsAfterIdle({
|
||||
agent: params.session?.agent as IdleAwareAgent | null | undefined,
|
||||
sessionManager: params.sessionManager as ToolResultFlushManager | null | undefined,
|
||||
...(params.aborted ? { timeoutMs: 0 } : {}),
|
||||
});
|
||||
} catch {
|
||||
/* best-effort */
|
||||
if (!params.skipSessionFlush) {
|
||||
try {
|
||||
await params.flushPendingToolResultsAfterIdle({
|
||||
agent: params.session?.agent as IdleAwareAgent | null | undefined,
|
||||
sessionManager: params.sessionManager as ToolResultFlushManager | null | undefined,
|
||||
...(params.aborted ? { timeoutMs: 0 } : {}),
|
||||
});
|
||||
} catch {
|
||||
/* best-effort */
|
||||
}
|
||||
}
|
||||
try {
|
||||
params.session?.dispose();
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { AgentMessage } from "@earendil-works/pi-agent-core";
|
||||
import type { AssistantMessage } from "@earendil-works/pi-ai";
|
||||
import { createAgentSession, SessionManager } from "@earendil-works/pi-coding-agent";
|
||||
import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js";
|
||||
import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js";
|
||||
@@ -157,7 +158,7 @@ import {
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} from "../../session-write-lock.js";
|
||||
import { detectRuntimeShell } from "../../shell-utils.js";
|
||||
import {
|
||||
@@ -220,6 +221,7 @@ import {
|
||||
collectPromptCacheToolNames,
|
||||
beginPromptCacheObservation,
|
||||
completePromptCacheObservation,
|
||||
type PromptCacheBreak,
|
||||
type PromptCacheChange,
|
||||
} from "../prompt-cache-observability.js";
|
||||
import { resolveCacheRetention } from "../prompt-cache-retention.js";
|
||||
@@ -319,6 +321,12 @@ import {
|
||||
shouldWarnOnOrphanedUserRepair,
|
||||
shouldInjectHeartbeatPrompt,
|
||||
} from "./attempt.prompt-helpers.js";
|
||||
import {
|
||||
createEmbeddedAttemptSessionLockController,
|
||||
installPromptSubmissionLockRelease,
|
||||
installSessionExternalHookWriteLock,
|
||||
installSessionEventWriteLock,
|
||||
} from "./attempt.session-lock.js";
|
||||
import {
|
||||
createYieldAbortedResponse,
|
||||
persistSessionsYieldContextMessage,
|
||||
@@ -2010,19 +2018,21 @@ export async function runEmbeddedAttempt(
|
||||
let systemPromptText = systemPromptOverride();
|
||||
prepStages.mark("system-prompt");
|
||||
|
||||
// Keep the session lock scoped to transcript/session mutations. Cold plugin
|
||||
// and tool setup can be slow, and holding the lock there blocks CLI fallback
|
||||
// from taking over the same session when a gateway run stalls before model I/O.
|
||||
const sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
maxHoldMs: resolveSessionLockMaxHoldFromTimeout({
|
||||
const sessionWriteLockOptions = resolveSessionWriteLockOptions(params.config, {
|
||||
maxHoldMsFallback: resolveSessionLockMaxHoldFromTimeout({
|
||||
timeoutMs: resolveRunTimeoutWithCompactionGraceMs({
|
||||
runTimeoutMs: params.timeoutMs,
|
||||
compactionTimeoutMs: resolveCompactionTimeoutMs(params.config),
|
||||
}),
|
||||
}),
|
||||
});
|
||||
const sessionLockController = await createEmbeddedAttemptSessionLockController({
|
||||
acquireSessionWriteLock,
|
||||
lockOptions: {
|
||||
sessionFile: params.sessionFile,
|
||||
...sessionWriteLockOptions,
|
||||
},
|
||||
});
|
||||
|
||||
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
|
||||
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
|
||||
@@ -2327,6 +2337,14 @@ export async function runEmbeddedAttempt(
|
||||
}
|
||||
session.setActiveToolsByName(sessionToolAllowlist);
|
||||
const activeSession = session;
|
||||
installSessionEventWriteLock({
|
||||
session: activeSession,
|
||||
withSessionWriteLock: (operation) => sessionLockController.withSessionWriteLock(operation),
|
||||
});
|
||||
installSessionExternalHookWriteLock({
|
||||
session: activeSession,
|
||||
withSessionWriteLock: (operation) => sessionLockController.withSessionWriteLock(operation),
|
||||
});
|
||||
prepStages.mark("agent-session");
|
||||
if (isRawModelRun) {
|
||||
// Raw model probes should measure exactly the requested prompt against
|
||||
@@ -3224,11 +3242,13 @@ export async function runEmbeddedAttempt(
|
||||
},
|
||||
abort: abortRun,
|
||||
};
|
||||
let lastAssistant: AgentMessage | undefined;
|
||||
let lastAssistant: AssistantMessage | undefined;
|
||||
let currentAttemptAssistant: EmbeddedRunAttemptResult["currentAttemptAssistant"];
|
||||
let attemptUsage: NormalizedUsage | undefined;
|
||||
let cacheBreak: ReturnType<typeof completePromptCacheObservation> = null;
|
||||
let cacheBreak: PromptCacheBreak | null = null;
|
||||
let promptCache: EmbeddedRunAttemptResult["promptCache"];
|
||||
let lastCallUsage: NormalizedUsage | undefined;
|
||||
let compactionOccurredThisAttempt = false;
|
||||
let finalPromptText: string | undefined;
|
||||
if (params.replyOperation) {
|
||||
params.replyOperation.attachBackend(queueHandle);
|
||||
@@ -3724,7 +3744,14 @@ export async function runEmbeddedAttempt(
|
||||
model: params.model,
|
||||
modelId: params.modelId,
|
||||
provider: params.provider,
|
||||
sessionManager,
|
||||
sessionManager: {
|
||||
appendCustomEntry: async (customType, data) => {
|
||||
await sessionLockController.withSessionWriteLock(() => {
|
||||
activeSessionManager.appendCustomEntry(customType, data);
|
||||
});
|
||||
},
|
||||
getEntries: () => activeSessionManager.getEntries(),
|
||||
},
|
||||
signal: runAbortController.signal,
|
||||
streamFn: activeSession.agent.streamFn,
|
||||
systemPrompt: systemPromptText,
|
||||
@@ -3732,6 +3759,12 @@ export async function runEmbeddedAttempt(
|
||||
if (googlePromptCacheStreamFn) {
|
||||
activeSession.agent.streamFn = googlePromptCacheStreamFn;
|
||||
}
|
||||
installPromptSubmissionLockRelease({
|
||||
session: activeSession,
|
||||
waitForSessionEvents: (sessionToDrain) =>
|
||||
sessionLockController.waitForSessionEvents(sessionToDrain),
|
||||
releaseForPrompt: () => sessionLockController.releaseForPrompt(),
|
||||
});
|
||||
}
|
||||
|
||||
// Detect and load images referenced in the visible prompt for vision-capable models.
|
||||
@@ -4027,12 +4060,18 @@ export async function runEmbeddedAttempt(
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
});
|
||||
stripSessionsYieldArtifacts(activeSession);
|
||||
if (yieldMessage) {
|
||||
await persistSessionsYieldContextMessage(activeSession, yieldMessage);
|
||||
}
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.withSessionWriteLock(async () => {
|
||||
stripSessionsYieldArtifacts(activeSession);
|
||||
if (yieldMessage) {
|
||||
await persistSessionsYieldContextMessage(activeSession, yieldMessage);
|
||||
}
|
||||
});
|
||||
} else if (isMidTurnPrecheckSignal(err)) {
|
||||
handleMidTurnPrecheckRequest(err.request);
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.withSessionWriteLock(() => {
|
||||
handleMidTurnPrecheckRequest(err.request);
|
||||
});
|
||||
} else {
|
||||
promptError = err;
|
||||
promptErrorSource = "prompt";
|
||||
@@ -4046,17 +4085,23 @@ export async function runEmbeddedAttempt(
|
||||
if (pendingMidTurnPrecheckRequest) {
|
||||
const request = pendingMidTurnPrecheckRequest;
|
||||
pendingMidTurnPrecheckRequest = null;
|
||||
removeTrailingMidTurnPrecheckAssistantError({
|
||||
activeSession,
|
||||
sessionManager,
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.withSessionWriteLock(() => {
|
||||
removeTrailingMidTurnPrecheckAssistantError({
|
||||
activeSession,
|
||||
sessionManager: activeSessionManager,
|
||||
});
|
||||
if (!preflightRecovery && promptErrorSource !== "precheck") {
|
||||
promptError = null;
|
||||
promptErrorSource = null;
|
||||
handleMidTurnPrecheckRequest(request);
|
||||
}
|
||||
});
|
||||
if (!preflightRecovery && promptErrorSource !== "precheck") {
|
||||
promptError = null;
|
||||
promptErrorSource = null;
|
||||
handleMidTurnPrecheckRequest(request);
|
||||
}
|
||||
}
|
||||
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.releaseForPrompt();
|
||||
|
||||
// Capture snapshot before compaction wait so we have complete messages if timeout occurs
|
||||
// Check compaction state before and after to avoid race condition where compaction starts during capture
|
||||
// Use session state (not subscription) for snapshot decisions - need instantaneous compaction status
|
||||
@@ -4112,117 +4157,122 @@ export async function runEmbeddedAttempt(
|
||||
}
|
||||
}
|
||||
|
||||
// Check if ANY compaction occurred during the entire attempt (prompt + retry).
|
||||
// Using a cumulative count (> 0) instead of a delta check avoids missing
|
||||
// compactions that complete during activeSession.prompt() before the delta
|
||||
// baseline is sampled.
|
||||
const compactionOccurredThisAttempt = getCompactionCount() > 0;
|
||||
// Append cache-TTL timestamp AFTER prompt + compaction retry completes.
|
||||
// Previously this was before the prompt, which caused a custom entry to be
|
||||
// inserted between compaction and the next prompt — breaking the
|
||||
// prepareCompaction() guard that checks the last entry type, leading to
|
||||
// double-compaction. See: https://github.com/openclaw/openclaw/issues/9282
|
||||
// Skip when timed out during compaction — session state may be inconsistent.
|
||||
// Also skip when compaction ran this attempt — appending a custom entry
|
||||
// after compaction would break the guard again. See: #28491
|
||||
appendAttemptCacheTtlIfNeeded({
|
||||
sessionManager,
|
||||
timedOutDuringCompaction,
|
||||
compactionOccurredThisAttempt,
|
||||
config: params.config,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.model.api,
|
||||
isCacheTtlEligibleProvider,
|
||||
});
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.withSessionWriteLock(async () => {
|
||||
// Check if ANY compaction occurred during the entire attempt (prompt + retry).
|
||||
// Using a cumulative count (> 0) instead of a delta check avoids missing
|
||||
// compactions that complete during activeSession.prompt() before the delta
|
||||
// baseline is sampled.
|
||||
compactionOccurredThisAttempt = getCompactionCount() > 0;
|
||||
// Append cache-TTL timestamp AFTER prompt + compaction retry completes.
|
||||
// Previously this was before the prompt, which caused a custom entry to be
|
||||
// inserted between compaction and the next prompt — breaking the
|
||||
// prepareCompaction() guard that checks the last entry type, leading to
|
||||
// double-compaction. See: https://github.com/openclaw/openclaw/issues/9282
|
||||
// Skip when timed out during compaction — session state may be inconsistent.
|
||||
// Also skip when compaction ran this attempt — appending a custom entry
|
||||
// after compaction would break the guard again. See: #28491
|
||||
appendAttemptCacheTtlIfNeeded({
|
||||
sessionManager: activeSessionManager,
|
||||
timedOutDuringCompaction,
|
||||
compactionOccurredThisAttempt,
|
||||
config: params.config,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.model.api,
|
||||
isCacheTtlEligibleProvider,
|
||||
});
|
||||
|
||||
// If timeout occurred during compaction, use pre-compaction snapshot when available
|
||||
// (compaction restructures messages but does not add user/assistant turns).
|
||||
const snapshotSelection = selectCompactionTimeoutSnapshot({
|
||||
timedOutDuringCompaction,
|
||||
preCompactionSnapshot,
|
||||
preCompactionSessionId,
|
||||
currentSnapshot: activeSession.messages.slice(),
|
||||
currentSessionId: activeSession.sessionId,
|
||||
});
|
||||
if (timedOutDuringCompaction) {
|
||||
if (!isProbeSession) {
|
||||
log.warn(
|
||||
`using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
// If timeout occurred during compaction, use pre-compaction snapshot when available
|
||||
// (compaction restructures messages but does not add user/assistant turns).
|
||||
const snapshotSelection = selectCompactionTimeoutSnapshot({
|
||||
timedOutDuringCompaction,
|
||||
preCompactionSnapshot,
|
||||
preCompactionSessionId,
|
||||
currentSnapshot: activeSession.messages.slice(),
|
||||
currentSessionId: activeSession.sessionId,
|
||||
});
|
||||
if (timedOutDuringCompaction) {
|
||||
if (!isProbeSession) {
|
||||
log.warn(
|
||||
`using ${snapshotSelection.source} snapshot: timed out during compaction runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
messagesSnapshot = projectToolSearchTargetTranscriptMessages(
|
||||
snapshotSelection.messagesSnapshot,
|
||||
toolSearchTargetTranscriptProjections,
|
||||
);
|
||||
sessionIdUsed = snapshotSelection.sessionIdUsed;
|
||||
messagesSnapshot = projectToolSearchTargetTranscriptMessages(
|
||||
snapshotSelection.messagesSnapshot,
|
||||
toolSearchTargetTranscriptProjections,
|
||||
);
|
||||
sessionIdUsed = snapshotSelection.sessionIdUsed;
|
||||
|
||||
lastAssistant = messagesSnapshot
|
||||
.slice()
|
||||
.toReversed()
|
||||
.find((m) => m.role === "assistant");
|
||||
currentAttemptAssistant = findCurrentAttemptAssistantMessage({
|
||||
messagesSnapshot,
|
||||
prePromptMessageCount,
|
||||
});
|
||||
attemptUsage = getUsageTotals();
|
||||
cacheBreak = cacheObservabilityEnabled
|
||||
? completePromptCacheObservation({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
usage: attemptUsage,
|
||||
})
|
||||
: null;
|
||||
const lastCallUsage = normalizeUsage(currentAttemptAssistant?.usage);
|
||||
const promptCacheObservation =
|
||||
cacheObservabilityEnabled &&
|
||||
(cacheBreak || promptCacheChangesForTurn || typeof attemptUsage?.cacheRead === "number")
|
||||
? {
|
||||
broke: Boolean(cacheBreak),
|
||||
...(typeof cacheBreak?.previousCacheRead === "number"
|
||||
? { previousCacheRead: cacheBreak.previousCacheRead }
|
||||
: {}),
|
||||
...(typeof cacheBreak?.cacheRead === "number"
|
||||
? { cacheRead: cacheBreak.cacheRead }
|
||||
: typeof attemptUsage?.cacheRead === "number"
|
||||
? { cacheRead: attemptUsage.cacheRead }
|
||||
lastAssistant = messagesSnapshot
|
||||
.slice()
|
||||
.toReversed()
|
||||
.find((message): message is AssistantMessage => message.role === "assistant");
|
||||
currentAttemptAssistant = findCurrentAttemptAssistantMessage({
|
||||
messagesSnapshot,
|
||||
prePromptMessageCount,
|
||||
});
|
||||
attemptUsage = getUsageTotals();
|
||||
cacheBreak = cacheObservabilityEnabled
|
||||
? completePromptCacheObservation({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
usage: attemptUsage,
|
||||
})
|
||||
: null;
|
||||
lastCallUsage = normalizeUsage(currentAttemptAssistant?.usage);
|
||||
const promptCacheObservation =
|
||||
cacheObservabilityEnabled &&
|
||||
(cacheBreak || promptCacheChangesForTurn || typeof attemptUsage?.cacheRead === "number")
|
||||
? {
|
||||
broke: Boolean(cacheBreak),
|
||||
...(typeof cacheBreak?.previousCacheRead === "number"
|
||||
? { previousCacheRead: cacheBreak.previousCacheRead }
|
||||
: {}),
|
||||
changes: cacheBreak?.changes ?? promptCacheChangesForTurn,
|
||||
}
|
||||
: undefined;
|
||||
const fallbackLastCacheTouchAt = readLastCacheTtlTimestamp(sessionManager, {
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
});
|
||||
promptCache = buildContextEnginePromptCacheInfo({
|
||||
retention: effectivePromptCacheRetention,
|
||||
lastCallUsage,
|
||||
observation: promptCacheObservation,
|
||||
lastCacheTouchAt: resolvePromptCacheTouchTimestamp({
|
||||
...(typeof cacheBreak?.cacheRead === "number"
|
||||
? { cacheRead: cacheBreak.cacheRead }
|
||||
: typeof attemptUsage?.cacheRead === "number"
|
||||
? { cacheRead: attemptUsage.cacheRead }
|
||||
: {}),
|
||||
changes: cacheBreak?.changes ?? promptCacheChangesForTurn,
|
||||
}
|
||||
: undefined;
|
||||
const fallbackLastCacheTouchAt = readLastCacheTtlTimestamp(activeSessionManager, {
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
});
|
||||
promptCache = buildContextEnginePromptCacheInfo({
|
||||
retention: effectivePromptCacheRetention,
|
||||
lastCallUsage,
|
||||
assistantTimestamp: currentAttemptAssistant?.timestamp,
|
||||
fallbackLastCacheTouchAt,
|
||||
}),
|
||||
observation: promptCacheObservation,
|
||||
lastCacheTouchAt: resolvePromptCacheTouchTimestamp({
|
||||
lastCallUsage,
|
||||
assistantTimestamp: currentAttemptAssistant?.timestamp,
|
||||
fallbackLastCacheTouchAt,
|
||||
}),
|
||||
});
|
||||
|
||||
if (promptError && promptErrorSource === "prompt" && !compactionOccurredThisAttempt) {
|
||||
try {
|
||||
activeSessionManager.appendCustomEntry("openclaw:prompt-error", {
|
||||
timestamp: Date.now(),
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
api: params.model.api,
|
||||
error: formatErrorMessage(promptError),
|
||||
});
|
||||
} catch (entryErr) {
|
||||
log.warn(`failed to persist prompt error entry: ${String(entryErr)}`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (promptError && promptErrorSource === "prompt" && !compactionOccurredThisAttempt) {
|
||||
try {
|
||||
sessionManager.appendCustomEntry("openclaw:prompt-error", {
|
||||
timestamp: Date.now(),
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
provider: params.provider,
|
||||
model: params.modelId,
|
||||
api: params.model.api,
|
||||
error: formatErrorMessage(promptError),
|
||||
});
|
||||
} catch (entryErr) {
|
||||
log.warn(`failed to persist prompt error entry: ${String(entryErr)}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Let the active context engine run its post-turn lifecycle.
|
||||
// Let the active context engine run its post-turn lifecycle. These hooks
|
||||
// may call runtime LLM capabilities, so only their transcript rewrite
|
||||
// helper reacquires the session write lock.
|
||||
if (activeContextEngine) {
|
||||
const afterTurnRuntimeContext = buildAfterTurnRuntimeContextFromUsage({
|
||||
attempt: params,
|
||||
@@ -4254,64 +4304,69 @@ export async function runEmbeddedAttempt(
|
||||
sessionFile: contextParams.sessionFile,
|
||||
reason: contextParams.reason,
|
||||
sessionManager: contextParams.sessionManager as never,
|
||||
withSessionManagerRewriteLock: async (operation) =>
|
||||
await sessionLockController.withSessionWriteLock(operation),
|
||||
runtimeContext: contextParams.runtimeContext,
|
||||
config: params.config,
|
||||
agentId: sessionAgentId,
|
||||
}),
|
||||
sessionManager,
|
||||
sessionManager: activeSessionManager,
|
||||
config: params.config,
|
||||
warn: (message) => log.warn(message),
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
shouldPersistCompletedBootstrapTurn({
|
||||
shouldRecordCompletedBootstrapTurn,
|
||||
promptError,
|
||||
aborted,
|
||||
timedOutDuringCompaction,
|
||||
compactionOccurredThisAttempt,
|
||||
})
|
||||
) {
|
||||
try {
|
||||
sessionManager.appendCustomEntry(FULL_BOOTSTRAP_COMPLETED_CUSTOM_TYPE, {
|
||||
timestamp: Date.now(),
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
});
|
||||
} catch (entryErr) {
|
||||
log.warn(`failed to persist bootstrap completion entry: ${String(entryErr)}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
compactionOccurredThisAttempt &&
|
||||
!promptError &&
|
||||
!aborted &&
|
||||
!timedOut &&
|
||||
!idleTimedOut &&
|
||||
!timedOutDuringCompaction &&
|
||||
shouldRotateCompactionTranscript(params.config)
|
||||
) {
|
||||
try {
|
||||
const rotation = await rotateTranscriptAfterCompaction({
|
||||
sessionManager,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
if (rotation.rotated) {
|
||||
sessionIdUsed = rotation.sessionId ?? sessionIdUsed;
|
||||
sessionFileUsed = rotation.sessionFile ?? sessionFileUsed;
|
||||
log.info(
|
||||
`[compaction] rotated active transcript after automatic compaction ` +
|
||||
`(sessionKey=${params.sessionKey ?? params.sessionId})`,
|
||||
);
|
||||
await sessionLockController.waitForSessionEvents(activeSession);
|
||||
await sessionLockController.withSessionWriteLock(async () => {
|
||||
if (
|
||||
shouldPersistCompletedBootstrapTurn({
|
||||
shouldRecordCompletedBootstrapTurn,
|
||||
promptError,
|
||||
aborted,
|
||||
timedOutDuringCompaction,
|
||||
compactionOccurredThisAttempt,
|
||||
})
|
||||
) {
|
||||
try {
|
||||
activeSessionManager.appendCustomEntry(FULL_BOOTSTRAP_COMPLETED_CUSTOM_TYPE, {
|
||||
timestamp: Date.now(),
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
});
|
||||
} catch (entryErr) {
|
||||
log.warn(`failed to persist bootstrap completion entry: ${String(entryErr)}`);
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("[compaction] automatic transcript rotation failed", {
|
||||
errorMessage: formatErrorMessage(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
compactionOccurredThisAttempt &&
|
||||
!promptError &&
|
||||
!aborted &&
|
||||
!timedOut &&
|
||||
!idleTimedOut &&
|
||||
!timedOutDuringCompaction &&
|
||||
shouldRotateCompactionTranscript(params.config)
|
||||
) {
|
||||
try {
|
||||
const rotation = await rotateTranscriptAfterCompaction({
|
||||
sessionManager: activeSessionManager,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
if (rotation.rotated) {
|
||||
sessionIdUsed = rotation.sessionId ?? sessionIdUsed;
|
||||
sessionFileUsed = rotation.sessionFile ?? sessionFileUsed;
|
||||
log.info(
|
||||
`[compaction] rotated active transcript after automatic compaction ` +
|
||||
`(sessionKey=${params.sessionKey ?? params.sessionId})`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
log.warn("[compaction] automatic transcript rotation failed", {
|
||||
errorMessage: formatErrorMessage(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
cacheTrace?.recordStage("session:after", {
|
||||
messages: messagesSnapshot,
|
||||
@@ -4384,20 +4439,22 @@ export async function runEmbeddedAttempt(
|
||||
)
|
||||
.map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));
|
||||
if (cacheObservabilityEnabled) {
|
||||
if (cacheBreak) {
|
||||
const cacheBreakForLog = cacheBreak as PromptCacheBreak | null;
|
||||
if (cacheBreakForLog) {
|
||||
const changeSummary =
|
||||
cacheBreak.changes?.map((change) => `${change.code}(${change.detail})`).join(", ") ??
|
||||
"no tracked cache input change";
|
||||
cacheBreakForLog.changes
|
||||
?.map((change) => `${change.code}(${change.detail})`)
|
||||
.join(", ") ?? "no tracked cache input change";
|
||||
log.warn(
|
||||
`[prompt-cache] cache read dropped ${cacheBreak.previousCacheRead} -> ${cacheBreak.cacheRead} ` +
|
||||
`[prompt-cache] cache read dropped ${cacheBreakForLog.previousCacheRead} -> ${cacheBreakForLog.cacheRead} ` +
|
||||
`for ${params.provider}/${params.modelId} via ${streamStrategy}; ${changeSummary}`,
|
||||
);
|
||||
cacheTrace?.recordStage("cache:result", {
|
||||
options: {
|
||||
previousCacheRead: cacheBreak.previousCacheRead,
|
||||
cacheRead: cacheBreak.cacheRead,
|
||||
previousCacheRead: cacheBreakForLog.previousCacheRead,
|
||||
cacheRead: cacheBreakForLog.cacheRead,
|
||||
changes:
|
||||
cacheBreak.changes?.map((change) => ({
|
||||
cacheBreakForLog.changes?.map((change) => ({
|
||||
code: change.code,
|
||||
detail: change.detail,
|
||||
})) ?? undefined,
|
||||
@@ -4727,6 +4784,7 @@ export async function runEmbeddedAttempt(
|
||||
timedOut ||
|
||||
idleTimedOut ||
|
||||
timedOutDuringCompaction;
|
||||
const cleanupSessionLock = await sessionLockController.acquireForCleanup({ session });
|
||||
await cleanupEmbeddedAttemptResources({
|
||||
removeToolResultContextGuard,
|
||||
flushPendingToolResultsAfterIdle,
|
||||
@@ -4734,11 +4792,12 @@ export async function runEmbeddedAttempt(
|
||||
sessionManager,
|
||||
bundleMcpRuntime,
|
||||
bundleLspRuntime,
|
||||
sessionLock,
|
||||
sessionLock: cleanupSessionLock,
|
||||
// PERF: If the run was aborted (user stop, timeout, etc.), skip the idle wait
|
||||
// and flush pending results synchronously so we can release the session lock ASAP.
|
||||
aborted: cleanupAborted,
|
||||
abortSettlePromise: cleanupAborted ? buildAbortSettlePromise() : null,
|
||||
skipSessionFlush: sessionLockController.hasSessionTakeover(),
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
});
|
||||
|
||||
@@ -9,7 +9,7 @@ import { resolveAgentContextLimits } from "../agent-scope.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} from "../session-write-lock.js";
|
||||
import { formatContextLimitTruncationNotice } from "./context-truncation-notice.js";
|
||||
import { log } from "./logger.js";
|
||||
@@ -777,7 +777,7 @@ export async function truncateOversizedToolResultsInSession(params: {
|
||||
try {
|
||||
sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
});
|
||||
const state = await readTranscriptFileState(sessionFile);
|
||||
return await truncateOversizedToolResultsInTranscriptState({
|
||||
|
||||
@@ -343,7 +343,9 @@ describe("rewriteTranscriptEntriesInSessionFile", () => {
|
||||
expect(result.changed).toBe(true);
|
||||
expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({
|
||||
sessionFile,
|
||||
staleMs: 1_800_000,
|
||||
timeoutMs: 60_000,
|
||||
maxHoldMs: 300_000,
|
||||
});
|
||||
expect(acquireSessionWriteLockReleaseMock).toHaveBeenCalledTimes(1);
|
||||
expect(listener).toHaveBeenCalledWith({ sessionFile, sessionKey: "agent:main:test" });
|
||||
|
||||
@@ -11,7 +11,7 @@ import { getRawSessionAppendMessage } from "../session-raw-append-message.js";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} from "../session-write-lock.js";
|
||||
import { log } from "./logger.js";
|
||||
import {
|
||||
@@ -366,7 +366,7 @@ export async function rewriteTranscriptEntriesInSessionFile(params: {
|
||||
try {
|
||||
sessionLock = await acquireSessionWriteLock({
|
||||
sessionFile: params.sessionFile,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
});
|
||||
const state = await readTranscriptFileState(params.sessionFile);
|
||||
const result = rewriteTranscriptEntriesInState({
|
||||
|
||||
@@ -10,6 +10,7 @@ let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLock
|
||||
let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest;
|
||||
let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout;
|
||||
let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs;
|
||||
let resolveSessionWriteLockOptions: typeof import("./session-write-lock.js").resolveSessionWriteLockOptions;
|
||||
|
||||
async function expectLockRemovedOnlyAfterFinalRelease(params: {
|
||||
lockPath: string;
|
||||
@@ -146,6 +147,7 @@ describe("acquireSessionWriteLock", () => {
|
||||
resetSessionWriteLockStateForTest,
|
||||
resolveSessionLockMaxHoldFromTimeout,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} = await import("./session-write-lock.js"));
|
||||
});
|
||||
|
||||
@@ -369,6 +371,91 @@ describe("acquireSessionWriteLock", () => {
|
||||
).toBe(60_000);
|
||||
});
|
||||
|
||||
it("resolves session write-lock stale and max-hold policy", () => {
|
||||
expect(
|
||||
resolveSessionWriteLockOptions({
|
||||
session: {
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 90_000,
|
||||
staleMs: 45_000,
|
||||
maxHoldMs: 30_000,
|
||||
},
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
timeoutMs: 90_000,
|
||||
staleMs: 45_000,
|
||||
maxHoldMs: 30_000,
|
||||
});
|
||||
});
|
||||
|
||||
it("lets session write-lock env override config for emergency tuning", () => {
|
||||
expect(
|
||||
resolveSessionWriteLockOptions(
|
||||
{
|
||||
session: {
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 90_000,
|
||||
staleMs: 45_000,
|
||||
maxHoldMs: 30_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
env: {
|
||||
OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS: "120000",
|
||||
OPENCLAW_SESSION_WRITE_LOCK_STALE_MS: "60000",
|
||||
OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS: "50000",
|
||||
},
|
||||
},
|
||||
),
|
||||
).toEqual({
|
||||
timeoutMs: 120_000,
|
||||
staleMs: 60_000,
|
||||
maxHoldMs: 50_000,
|
||||
});
|
||||
});
|
||||
|
||||
it("uses resolved stale policy when cleaning stale lock files", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-policy-"));
|
||||
const sessionsDir = path.join(root, "sessions");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const nowMs = Date.now();
|
||||
const lockPath = path.join(sessionsDir, "configured-live.jsonl.lock");
|
||||
|
||||
try {
|
||||
await fs.writeFile(
|
||||
lockPath,
|
||||
JSON.stringify({
|
||||
pid: process.pid,
|
||||
createdAt: new Date(nowMs - 45_000).toISOString(),
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const configOnly = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
config: { session: { writeLock: { staleMs: 30_000 } } },
|
||||
nowMs,
|
||||
removeStale: false,
|
||||
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"],
|
||||
});
|
||||
expect(configOnly.locks[0]?.stale).toBe(true);
|
||||
|
||||
const envOverride = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
config: { session: { writeLock: { staleMs: 30_000 } } },
|
||||
env: { OPENCLAW_SESSION_WRITE_LOCK_STALE_MS: "60000" },
|
||||
nowMs,
|
||||
removeStale: false,
|
||||
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"],
|
||||
});
|
||||
expect(envOverride.locks[0]?.stale).toBe(false);
|
||||
} finally {
|
||||
await fs.rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("clamps max hold for effectively no-timeout runs", () => {
|
||||
expect(
|
||||
resolveSessionLockMaxHoldFromTimeout({
|
||||
|
||||
@@ -36,8 +36,8 @@ type CleanupSignal = (typeof CLEANUP_SIGNALS)[number];
|
||||
const CLEANUP_STATE_KEY = Symbol.for("openclaw.sessionWriteLockCleanupState");
|
||||
const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState");
|
||||
|
||||
const DEFAULT_STALE_MS = 30 * 60 * 1000;
|
||||
const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000;
|
||||
export const DEFAULT_SESSION_WRITE_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
export const DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS = 5 * 60 * 1000;
|
||||
export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000;
|
||||
const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000;
|
||||
const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000;
|
||||
@@ -74,18 +74,113 @@ export type SessionWriteLockAcquireTimeoutConfig = {
|
||||
session?: {
|
||||
writeLock?: {
|
||||
acquireTimeoutMs?: number;
|
||||
staleMs?: number;
|
||||
maxHoldMs?: number;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
type SessionWriteLockMsKey = "acquireTimeoutMs" | "staleMs" | "maxHoldMs";
|
||||
|
||||
const SESSION_WRITE_LOCK_ENV: Record<SessionWriteLockMsKey, string> = {
|
||||
acquireTimeoutMs: "OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS",
|
||||
staleMs: "OPENCLAW_SESSION_WRITE_LOCK_STALE_MS",
|
||||
maxHoldMs: "OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS",
|
||||
};
|
||||
|
||||
function readPositiveMsEnv(
|
||||
env: NodeJS.ProcessEnv,
|
||||
key: string,
|
||||
opts: { allowInfinity?: boolean } = {},
|
||||
): number | undefined {
|
||||
const raw = env[key]?.trim();
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
const value = Number(raw);
|
||||
return parsePositiveMs(value, opts);
|
||||
}
|
||||
|
||||
function parsePositiveMs(
|
||||
value: number | undefined,
|
||||
opts: { allowInfinity?: boolean } = {},
|
||||
): number | undefined {
|
||||
if (typeof value !== "number" || Number.isNaN(value) || value <= 0) {
|
||||
return undefined;
|
||||
}
|
||||
if (value === Number.POSITIVE_INFINITY) {
|
||||
return opts.allowInfinity ? value : undefined;
|
||||
}
|
||||
if (!Number.isFinite(value)) {
|
||||
return undefined;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function resolveSessionWriteLockMs(params: {
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
key: SessionWriteLockMsKey;
|
||||
fallback: number;
|
||||
allowInfinity?: boolean;
|
||||
}): number {
|
||||
const opts = { allowInfinity: params.allowInfinity };
|
||||
return (
|
||||
readPositiveMsEnv(params.env ?? process.env, SESSION_WRITE_LOCK_ENV[params.key], opts) ??
|
||||
parsePositiveMs(params.config?.session?.writeLock?.[params.key], opts) ??
|
||||
params.fallback
|
||||
);
|
||||
}
|
||||
|
||||
export function resolveSessionWriteLockAcquireTimeoutMs(
|
||||
config?: SessionWriteLockAcquireTimeoutConfig,
|
||||
env?: NodeJS.ProcessEnv,
|
||||
): number {
|
||||
return resolvePositiveMs(
|
||||
config?.session?.writeLock?.acquireTimeoutMs,
|
||||
DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS,
|
||||
{ allowInfinity: true },
|
||||
);
|
||||
return resolveSessionWriteLockMs({
|
||||
config,
|
||||
env,
|
||||
key: "acquireTimeoutMs",
|
||||
fallback: DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS,
|
||||
allowInfinity: true,
|
||||
});
|
||||
}
|
||||
|
||||
export function resolveSessionWriteLockStaleMs(
|
||||
config?: SessionWriteLockAcquireTimeoutConfig,
|
||||
env?: NodeJS.ProcessEnv,
|
||||
): number {
|
||||
return resolveSessionWriteLockMs({
|
||||
config,
|
||||
env,
|
||||
key: "staleMs",
|
||||
fallback: DEFAULT_SESSION_WRITE_LOCK_STALE_MS,
|
||||
});
|
||||
}
|
||||
|
||||
export function resolveSessionWriteLockMaxHoldMs(
|
||||
config?: SessionWriteLockAcquireTimeoutConfig,
|
||||
params: { env?: NodeJS.ProcessEnv; fallback?: number } = {},
|
||||
): number {
|
||||
return resolveSessionWriteLockMs({
|
||||
config,
|
||||
env: params.env,
|
||||
key: "maxHoldMs",
|
||||
fallback: params.fallback ?? DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS,
|
||||
});
|
||||
}
|
||||
|
||||
export function resolveSessionWriteLockOptions(
|
||||
config?: SessionWriteLockAcquireTimeoutConfig,
|
||||
params: { env?: NodeJS.ProcessEnv; maxHoldMsFallback?: number } = {},
|
||||
): { timeoutMs: number; staleMs: number; maxHoldMs: number } {
|
||||
return {
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(config, params.env),
|
||||
staleMs: resolveSessionWriteLockStaleMs(config, params.env),
|
||||
maxHoldMs: resolveSessionWriteLockMaxHoldMs(config, {
|
||||
env: params.env,
|
||||
fallback: params.maxHoldMsFallback,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveCleanupState(): CleanupState {
|
||||
@@ -137,7 +232,7 @@ export function resolveSessionLockMaxHoldFromTimeout(params: {
|
||||
graceMs?: number;
|
||||
minMs?: number;
|
||||
}): number {
|
||||
const minMs = resolvePositiveMs(params.minMs, DEFAULT_MAX_HOLD_MS);
|
||||
const minMs = resolvePositiveMs(params.minMs, DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS);
|
||||
const timeoutMs = resolvePositiveMs(params.timeoutMs, minMs, { allowInfinity: true });
|
||||
if (timeoutMs === Number.POSITIVE_INFINITY) {
|
||||
return MAX_LOCK_HOLD_MS;
|
||||
@@ -159,7 +254,9 @@ async function runLockWatchdogCheck(nowMs = Date.now()): Promise<number> {
|
||||
let released = 0;
|
||||
for (const held of SESSION_LOCKS.heldEntries()) {
|
||||
const maxHoldMs =
|
||||
typeof held.metadata.maxHoldMs === "number" ? held.metadata.maxHoldMs : DEFAULT_MAX_HOLD_MS;
|
||||
typeof held.metadata.maxHoldMs === "number"
|
||||
? held.metadata.maxHoldMs
|
||||
: DEFAULT_SESSION_WRITE_LOCK_MAX_HOLD_MS;
|
||||
const heldForMs = nowMs - held.acquiredAt;
|
||||
if (heldForMs <= maxHoldMs) {
|
||||
continue;
|
||||
@@ -547,6 +644,8 @@ function inspectLockPayloadForSession(params: {
|
||||
|
||||
export async function cleanStaleLockFiles(params: {
|
||||
sessionsDir: string;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
staleMs?: number;
|
||||
removeStale?: boolean;
|
||||
nowMs?: number;
|
||||
@@ -557,7 +656,10 @@ export async function cleanStaleLockFiles(params: {
|
||||
};
|
||||
}): Promise<{ locks: SessionLockInspection[]; cleaned: SessionLockInspection[] }> {
|
||||
const sessionsDir = path.resolve(params.sessionsDir);
|
||||
const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS);
|
||||
const staleMs = resolvePositiveMs(
|
||||
params.staleMs,
|
||||
resolveSessionWriteLockStaleMs(params.config, params.env),
|
||||
);
|
||||
const removeStale = params.removeStale !== false;
|
||||
const nowMs = params.nowMs ?? Date.now();
|
||||
const ownerProcessArgsReader = params.readOwnerProcessArgs ?? readProcessArgsSync;
|
||||
@@ -622,11 +724,12 @@ export async function acquireSessionWriteLock(params: {
|
||||
}> {
|
||||
registerCleanupHandlers();
|
||||
const allowReentrant = params.allowReentrant ?? false;
|
||||
const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), {
|
||||
const defaultOptions = resolveSessionWriteLockOptions();
|
||||
const timeoutMs = resolvePositiveMs(params.timeoutMs, defaultOptions.timeoutMs, {
|
||||
allowInfinity: true,
|
||||
});
|
||||
const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS);
|
||||
const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS);
|
||||
const staleMs = resolvePositiveMs(params.staleMs, defaultOptions.staleMs);
|
||||
const maxHoldMs = resolvePositiveMs(params.maxHoldMs, defaultOptions.maxHoldMs);
|
||||
const sessionFile = path.resolve(params.sessionFile);
|
||||
const sessionDir = path.dirname(sessionFile);
|
||||
const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile);
|
||||
|
||||
@@ -104,6 +104,30 @@ describe("noteSessionLockHealth", () => {
|
||||
await expect(fs.access(freshLock)).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses configured stale threshold when repairing lock files", async () => {
|
||||
const sessionsDir = state.sessionsDir();
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
|
||||
const configuredStaleLock = path.join(sessionsDir, "configured-stale.jsonl.lock");
|
||||
await fs.writeFile(
|
||||
configuredStaleLock,
|
||||
JSON.stringify({ pid: process.pid, createdAt: new Date(Date.now() - 45_000).toISOString() }),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
await noteSessionLockHealth({
|
||||
shouldRepair: true,
|
||||
config: { session: { writeLock: { staleMs: 30_000 } } },
|
||||
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"],
|
||||
});
|
||||
|
||||
expect(note).toHaveBeenCalledTimes(1);
|
||||
const [message] = firstNoteCall();
|
||||
expect(message).toContain("stale=yes (too-old)");
|
||||
expect(message).toContain("[removed]");
|
||||
await expectPathMissing(configuredStaleLock);
|
||||
});
|
||||
|
||||
it("removes fresh live locks when the owner is not an OpenClaw process", async () => {
|
||||
const sessionsDir = state.sessionsDir();
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import { resolveAgentSessionDirs } from "../agents/session-dirs.js";
|
||||
import {
|
||||
cleanStaleLockFiles,
|
||||
resolveSessionWriteLockStaleMs,
|
||||
type SessionLockInspection,
|
||||
type SessionLockOwnerProcessArgsReader,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "../agents/session-write-lock.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { note } from "../terminal/note.js";
|
||||
import { shortenHomePath } from "../utils.js";
|
||||
|
||||
const DEFAULT_STALE_MS = 30 * 60 * 1000;
|
||||
|
||||
function formatAge(ageMs: number | null): string {
|
||||
if (ageMs === null) {
|
||||
return "unknown";
|
||||
@@ -41,11 +41,13 @@ function formatLockLine(lock: SessionLockInspection): string {
|
||||
|
||||
export async function noteSessionLockHealth(params?: {
|
||||
shouldRepair?: boolean;
|
||||
config?: SessionWriteLockAcquireTimeoutConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
staleMs?: number;
|
||||
readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader;
|
||||
}) {
|
||||
const shouldRepair = params?.shouldRepair === true;
|
||||
const staleMs = params?.staleMs ?? DEFAULT_STALE_MS;
|
||||
const staleMs = params?.staleMs ?? resolveSessionWriteLockStaleMs(params?.config, params?.env);
|
||||
let sessionDirs: string[] = [];
|
||||
try {
|
||||
sessionDirs = await resolveAgentSessionDirs(resolveStateDir(process.env));
|
||||
|
||||
@@ -708,10 +708,18 @@ describe("config help copy quality", () => {
|
||||
expect(/raw|unnormalized/i.test(rawKeyPrefix)).toBe(true);
|
||||
});
|
||||
|
||||
it("documents session write-lock acquire timeout defaults", () => {
|
||||
it("documents session write-lock policy defaults", () => {
|
||||
const acquireTimeout = FIELD_HELP["session.writeLock.acquireTimeoutMs"];
|
||||
expect(acquireTimeout.includes("60000")).toBe(true);
|
||||
expect(/transcript|lock/i.test(acquireTimeout)).toBe(true);
|
||||
|
||||
const stale = FIELD_HELP["session.writeLock.staleMs"];
|
||||
expect(stale.includes("1800000")).toBe(true);
|
||||
expect(stale.includes("OPENCLAW_SESSION_WRITE_LOCK_STALE_MS")).toBe(true);
|
||||
|
||||
const maxHold = FIELD_HELP["session.writeLock.maxHoldMs"];
|
||||
expect(maxHold.includes("300000")).toBe(true);
|
||||
expect(maxHold.includes("OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS")).toBe(true);
|
||||
});
|
||||
|
||||
it("documents session maintenance duration/size examples and deprecations", () => {
|
||||
|
||||
@@ -1610,9 +1610,13 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
"session.sendPolicy.rules[].match.rawKeyPrefix":
|
||||
"Matches the raw, unnormalized session-key prefix for exact full-key policy targeting. Use this when normalized keyPrefix is too broad and you need agent-prefixed or transport-specific precision.",
|
||||
"session.writeLock":
|
||||
"Groups session transcript write-lock acquisition controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default wait.",
|
||||
"Groups session transcript write-lock controls. Tune only when legitimate transcript prep, cleanup, compaction, or mirror work contends longer than the default policies.",
|
||||
"session.writeLock.acquireTimeoutMs":
|
||||
"Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; raise for slow disks or long prep/cleanup, lower only when quick failure is preferred.",
|
||||
"Milliseconds to wait while acquiring a session transcript write lock before reporting the session as busy. Default: 60000; env override: OPENCLAW_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS.",
|
||||
"session.writeLock.staleMs":
|
||||
"Milliseconds before an existing session transcript lock can be treated as stale and reclaimed. Default: 1800000; env override: OPENCLAW_SESSION_WRITE_LOCK_STALE_MS.",
|
||||
"session.writeLock.maxHoldMs":
|
||||
"Milliseconds a held in-process session transcript lock may remain held before the watchdog releases it. Default: 300000; env override: OPENCLAW_SESSION_WRITE_LOCK_MAX_HOLD_MS.",
|
||||
"session.agentToAgent":
|
||||
"Groups controls for inter-agent session exchanges, including loop prevention limits on reply chaining. Keep defaults unless you run advanced agent-to-agent automation with strict turn caps.",
|
||||
"session.agentToAgent.maxPingPongTurns":
|
||||
|
||||
@@ -791,6 +791,8 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"session.sendPolicy.rules[].match.rawKeyPrefix": "Session Send Rule Raw Key Prefix",
|
||||
"session.writeLock": "Session Write Lock",
|
||||
"session.writeLock.acquireTimeoutMs": "Session Write Lock Acquire Timeout",
|
||||
"session.writeLock.staleMs": "Session Write Lock Stale Timeout",
|
||||
"session.writeLock.maxHoldMs": "Session Write Lock Max Hold",
|
||||
"session.agentToAgent": "Session Agent-to-Agent",
|
||||
"session.agentToAgent.maxPingPongTurns": "Agent-to-Agent Ping-Pong Turns",
|
||||
"session.threadBindings": "Session Thread Bindings",
|
||||
|
||||
@@ -5,7 +5,7 @@ import { StringDecoder } from "node:string_decoder";
|
||||
import type { AgentMessage } from "@earendil-works/pi-agent-core";
|
||||
import {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
} from "../../agents/session-write-lock.js";
|
||||
import { redactTranscriptMessage } from "../../agents/transcript-redact.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
@@ -264,7 +264,7 @@ async function appendSessionTranscriptMessageLocked<TMessage>(
|
||||
): Promise<{ messageId: string; message: TMessage }> {
|
||||
const lock = await acquireSessionWriteLock({
|
||||
sessionFile: params.transcriptPath,
|
||||
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
|
||||
...resolveSessionWriteLockOptions(params.config),
|
||||
allowReentrant: true,
|
||||
});
|
||||
try {
|
||||
|
||||
@@ -219,6 +219,10 @@ export type SessionConfig = {
|
||||
export type SessionWriteLockConfig = {
|
||||
/** How long to wait while acquiring a session transcript write lock. Default: 60000. */
|
||||
acquireTimeoutMs?: number;
|
||||
/** When an existing lock can be treated as stale and reclaimed. Default: 1800000. */
|
||||
staleMs?: number;
|
||||
/** Maximum in-process hold time before the watchdog releases the lock. Default: 300000. */
|
||||
maxHoldMs?: number;
|
||||
};
|
||||
|
||||
export type SessionMaintenanceMode = "enforce" | "warn";
|
||||
|
||||
@@ -6,12 +6,14 @@ describe("SessionSchema maintenance extensions", () => {
|
||||
const result = SessionSchema.safeParse({
|
||||
writeLock: {
|
||||
acquireTimeoutMs: 60_000,
|
||||
staleMs: 1_800_000,
|
||||
maxHoldMs: 300_000,
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects invalid session write-lock acquire timeout values", () => {
|
||||
it("rejects invalid session write-lock timeout values", () => {
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
writeLock: {
|
||||
@@ -19,6 +21,22 @@ describe("SessionSchema maintenance extensions", () => {
|
||||
},
|
||||
}),
|
||||
).toThrow(/acquireTimeoutMs|number/i);
|
||||
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
writeLock: {
|
||||
staleMs: 0,
|
||||
},
|
||||
}),
|
||||
).toThrow(/staleMs|number/i);
|
||||
|
||||
expect(() =>
|
||||
SessionSchema.parse({
|
||||
writeLock: {
|
||||
maxHoldMs: 0,
|
||||
},
|
||||
}),
|
||||
).toThrow(/maxHoldMs|number/i);
|
||||
});
|
||||
|
||||
it("accepts valid maintenance extensions", () => {
|
||||
|
||||
@@ -59,6 +59,8 @@ export const SessionSchema = z
|
||||
writeLock: z
|
||||
.object({
|
||||
acquireTimeoutMs: z.number().int().positive().optional(),
|
||||
staleMs: z.number().int().positive().optional(),
|
||||
maxHoldMs: z.number().int().positive().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
|
||||
@@ -344,7 +344,11 @@ async function runCodexSessionRouteHealth(ctx: DoctorHealthFlowContext): Promise
|
||||
|
||||
async function runSessionLocksHealth(ctx: DoctorHealthFlowContext): Promise<void> {
|
||||
const { noteSessionLockHealth } = await import("../commands/doctor-session-locks.js");
|
||||
await noteSessionLockHealth({ shouldRepair: ctx.prompter.shouldRepair });
|
||||
await noteSessionLockHealth({
|
||||
shouldRepair: ctx.prompter.shouldRepair,
|
||||
config: ctx.cfg,
|
||||
env: ctx.env,
|
||||
});
|
||||
}
|
||||
|
||||
async function runSessionTranscriptsHealth(ctx: DoctorHealthFlowContext): Promise<void> {
|
||||
|
||||
@@ -23,7 +23,6 @@ import type { refreshLatestUpdateRestartSentinel } from "./server-restart-sentin
|
||||
import type { logGatewayStartup } from "./server-startup-log.js";
|
||||
import type { startGatewayTailscaleExposure } from "./server-tailscale.js";
|
||||
|
||||
const SESSION_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
const ACP_BACKEND_READY_TIMEOUT_MS = 5_000;
|
||||
const ACP_BACKEND_READY_POLL_MS = 50;
|
||||
const PRIMARY_MODEL_PREWARM_TIMEOUT_MS = 5_000;
|
||||
@@ -563,7 +562,7 @@ export async function startGatewaySidecars(params: {
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
const result = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
config: params.cfg,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
|
||||
@@ -162,6 +162,7 @@ export { isSubagentSessionKey } from "../routing/session-key.js";
|
||||
export {
|
||||
acquireSessionWriteLock,
|
||||
resolveSessionWriteLockAcquireTimeoutMs,
|
||||
resolveSessionWriteLockOptions,
|
||||
type SessionWriteLockAcquireTimeoutConfig,
|
||||
} from "../agents/session-write-lock.js";
|
||||
export { appendSessionTranscriptMessage } from "../config/sessions/transcript-append.js";
|
||||
|
||||
Reference in New Issue
Block a user