fix(cron): isolate fresh cron session state

* fix(cron): isolate fresh cron session state

* fix(cron): deep-copy isolated session state

* fix(cron): reset isolated session context

* test(providers): avoid shared mock races

* test(providers): type injected stream fakes

* ci: refresh package boundary on reply runtime changes

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Vincent Koc
2026-04-24 22:15:19 -07:00
committed by GitHub
parent 7a9584f0f9
commit f0ceb4b68f
13 changed files with 595 additions and 154 deletions

View File

@@ -1587,7 +1587,7 @@ jobs:
packages/plugin-sdk/dist
extensions/*/dist/.boundary-tsc.tsbuildinfo
extensions/*/dist/.boundary-tsc.stamp
key: ${{ runner.os }}-extension-package-boundary-v1-${{ hashFiles('tsconfig.json', 'tsconfig.plugin-sdk.dts.json', 'packages/plugin-sdk/tsconfig.json', 'scripts/check-extension-package-tsc-boundary.mjs', 'scripts/prepare-extension-package-boundary-artifacts.mjs', 'scripts/write-plugin-sdk-entry-dts.ts', 'scripts/lib/plugin-sdk-entrypoints.json', 'scripts/lib/plugin-sdk-entries.mjs', 'src/plugin-sdk/**', 'src/video-generation/dashscope-compatible.ts', 'src/video-generation/types.ts', 'src/types/**', 'extensions/**', 'extensions/tsconfig.package-boundary*.json', 'package.json', 'pnpm-lock.yaml') }}
key: ${{ runner.os }}-extension-package-boundary-v1-${{ hashFiles('tsconfig.json', 'tsconfig.plugin-sdk.dts.json', 'packages/plugin-sdk/tsconfig.json', 'scripts/check-extension-package-tsc-boundary.mjs', 'scripts/prepare-extension-package-boundary-artifacts.mjs', 'scripts/write-plugin-sdk-entry-dts.ts', 'scripts/lib/plugin-sdk-entrypoints.json', 'scripts/lib/plugin-sdk-entries.mjs', 'src/plugin-sdk/**', 'src/auto-reply/**', 'src/video-generation/dashscope-compatible.ts', 'src/video-generation/types.ts', 'src/types/**', 'extensions/**', 'extensions/tsconfig.package-boundary*.json', 'package.json', 'pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-extension-package-boundary-v1-

View File

@@ -145,6 +145,7 @@ Docs: https://docs.openclaw.ai
- Plugins/Google Meet: tell agents to recover already-open Meet tabs after browser timeouts, and make the dev CLI release its build lock if compiler spawning fails. Thanks @steipete.
- Plugins/Google Meet: return structured manual-action details when browser-based meeting creation needs login or permissions, so agents can guide the operator without opening duplicate Meet tabs. Thanks @steipete.
- Plugins/CLI: provide Gateway-backed node inspection to plugin commands, so `googlemeet recover-tab` can inspect paired browser nodes from the terminal. Thanks @steipete.
- Cron/isolated sessions: clear stale runtime, lifecycle, auth, model, exec, heartbeat, usage, privilege, routing, and delivery artifacts when creating a fresh isolated run, and persist per-run session rows as snapshots so old base-session state no longer leaks into new cron executions. Thanks @vincentkoc.
- Gateway/sessions: recover main-agent turns interrupted by a gateway restart from stale transcript-lock evidence, avoiding stuck `status: "running"` sessions without broad post-boot transcript scans. Fixes #70555. Thanks @bitloi.
- Codex approvals: sanitize MCP elicitation approval titles, descriptions, and display parameters before forwarding them to OpenClaw approval prompts. (#71343) Thanks @Lucenx9.
- Codex approvals: keep command approval responses within Codex app-server `availableDecisions`, including deny/cancel fallbacks for prompts that do not offer `decline`. (#71338) Thanks @Lucenx9.

View File

@@ -1,31 +1,30 @@
import type { Model } from "@mariozechner/pi-ai";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { createAssistantMessageEventStream, type Model } from "@mariozechner/pi-ai";
import { beforeAll, describe, expect, it, vi } from "vitest";
import type { AnthropicVertexStreamDeps } from "./stream-runtime.js";
const hoisted = vi.hoisted(() => {
const streamAnthropicMock = vi.fn(() => Symbol("anthropic-vertex-stream"));
function createStreamDeps(): {
deps: AnthropicVertexStreamDeps;
streamAnthropicMock: ReturnType<typeof vi.fn>;
anthropicVertexCtorMock: ReturnType<typeof vi.fn>;
} {
const streamAnthropicMock = vi.fn(
(..._args: Parameters<AnthropicVertexStreamDeps["streamAnthropic"]>) =>
createAssistantMessageEventStream(),
);
const anthropicVertexCtorMock = vi.fn();
const MockAnthropicVertex = function MockAnthropicVertex(options: unknown) {
anthropicVertexCtorMock(options);
} as unknown as AnthropicVertexStreamDeps["AnthropicVertex"];
return {
deps: {
AnthropicVertex: MockAnthropicVertex,
streamAnthropic: streamAnthropicMock,
},
streamAnthropicMock,
anthropicVertexCtorMock,
};
});
vi.mock("@mariozechner/pi-ai", async () => {
const original =
await vi.importActual<typeof import("@mariozechner/pi-ai")>("@mariozechner/pi-ai");
return {
...original,
streamAnthropic: hoisted.streamAnthropicMock,
};
});
vi.mock("@anthropic-ai/vertex-sdk", () => ({
AnthropicVertex: vi.fn(function MockAnthropicVertex(options: unknown) {
hoisted.anthropicVertexCtorMock(options);
return { options };
}),
}));
}
let createAnthropicVertexStreamFn: typeof import("./api.js").createAnthropicVertexStreamFn;
let createAnthropicVertexStreamFnForModel: typeof import("./api.js").createAnthropicVertexStreamFnForModel;
@@ -45,33 +44,34 @@ describe("Anthropic Vertex API stream factories", () => {
await import("./api.js"));
});
beforeEach(() => {
hoisted.streamAnthropicMock.mockClear();
hoisted.anthropicVertexCtorMock.mockClear();
});
it("reuses the runtime stream factory across direct stream calls", async () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel();
await streamFn(model, { messages: [] }, {});
await streamFn(model, { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledTimes(1);
expect(hoisted.streamAnthropicMock).toHaveBeenCalledTimes(2);
expect(anthropicVertexCtorMock).toHaveBeenCalledTimes(1);
expect(streamAnthropicMock).toHaveBeenCalledTimes(2);
});
it("reuses the runtime stream factory across model-derived stream calls", async () => {
const streamFn = createAnthropicVertexStreamFnForModel(makeModel(), {
ANTHROPIC_VERTEX_PROJECT_ID: "vertex-project",
GOOGLE_CLOUD_LOCATION: "us-east5",
} as NodeJS.ProcessEnv);
const { deps, streamAnthropicMock, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFnForModel(
makeModel(),
{
ANTHROPIC_VERTEX_PROJECT_ID: "vertex-project",
GOOGLE_CLOUD_LOCATION: "us-east5",
} as NodeJS.ProcessEnv,
deps,
);
const model = makeModel();
await streamFn(model, { messages: [] }, {});
await streamFn(model, { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledTimes(1);
expect(hoisted.streamAnthropicMock).toHaveBeenCalledTimes(2);
expect(anthropicVertexCtorMock).toHaveBeenCalledTimes(1);
expect(streamAnthropicMock).toHaveBeenCalledTimes(2);
});
});

View File

@@ -1,4 +1,5 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import type { AnthropicVertexStreamDeps } from "./stream-runtime.js";
export {
ANTHROPIC_VERTEX_DEFAULT_MODEL_ID,
@@ -47,9 +48,10 @@ export function createAnthropicVertexStreamFn(
projectId: string | undefined,
region: string,
baseURL?: string,
deps?: AnthropicVertexStreamDeps,
): StreamFn {
const streamFnPromise = import("./stream-runtime.js").then((runtime) =>
runtime.createAnthropicVertexStreamFn(projectId, region, baseURL),
runtime.createAnthropicVertexStreamFn(projectId, region, baseURL, deps),
);
return async (model, context, options) => {
const streamFn = await streamFnPromise;
@@ -60,9 +62,10 @@ export function createAnthropicVertexStreamFn(
export function createAnthropicVertexStreamFnForModel(
model: { baseUrl?: string },
env: NodeJS.ProcessEnv = process.env,
deps?: AnthropicVertexStreamDeps,
): StreamFn {
const streamFnPromise = import("./stream-runtime.js").then((runtime) =>
runtime.createAnthropicVertexStreamFnForModel(model, env),
runtime.createAnthropicVertexStreamFnForModel(model, env, deps),
);
return async (...args) => {
const streamFn = await streamFnPromise;

View File

@@ -1,36 +1,32 @@
import type { Model } from "@mariozechner/pi-ai";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { createAssistantMessageEventStream, type Model } from "@mariozechner/pi-ai";
import { beforeAll, describe, expect, it, vi } from "vitest";
import type { AnthropicVertexStreamDeps } from "./stream-runtime.js";
const SYSTEM_PROMPT_CACHE_BOUNDARY = "\n<!-- OPENCLAW_CACHE_BOUNDARY -->\n";
const hoisted = vi.hoisted(() => {
const streamAnthropicMock = vi.fn<(model: unknown, context: unknown, options: unknown) => symbol>(
() => Symbol("anthropic-vertex-stream"),
function createStreamDeps(): {
deps: AnthropicVertexStreamDeps;
streamAnthropicMock: ReturnType<typeof vi.fn>;
anthropicVertexCtorMock: ReturnType<typeof vi.fn>;
} {
const streamAnthropicMock = vi.fn(
(..._args: Parameters<AnthropicVertexStreamDeps["streamAnthropic"]>) =>
createAssistantMessageEventStream(),
);
const anthropicVertexCtorMock = vi.fn();
const MockAnthropicVertex = function MockAnthropicVertex(options: unknown) {
anthropicVertexCtorMock(options);
} as unknown as AnthropicVertexStreamDeps["AnthropicVertex"];
return {
deps: {
AnthropicVertex: MockAnthropicVertex,
streamAnthropic: streamAnthropicMock,
},
streamAnthropicMock,
anthropicVertexCtorMock,
};
});
vi.mock("@mariozechner/pi-ai", async () => {
const original =
await vi.importActual<typeof import("@mariozechner/pi-ai")>("@mariozechner/pi-ai");
return {
...original,
streamAnthropic: (model: unknown, context: unknown, options: unknown) =>
hoisted.streamAnthropicMock(model, context, options),
};
});
vi.mock("@anthropic-ai/vertex-sdk", () => ({
AnthropicVertex: vi.fn(function MockAnthropicVertex(options: unknown) {
hoisted.anthropicVertexCtorMock(options);
return { options };
}),
}));
}
let createAnthropicVertexStreamFn: typeof import("./stream-runtime.js").createAnthropicVertexStreamFn;
let createAnthropicVertexStreamFnForModel: typeof import("./stream-runtime.js").createAnthropicVertexStreamFnForModel;
@@ -48,8 +44,12 @@ const CACHE_BOUNDARY_PROMPT = `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynam
type PayloadHook = (payload: unknown, payloadModel: unknown) => Promise<unknown>;
function captureCacheBoundaryPayloadHook(onPayload: PayloadHook) {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
function captureCacheBoundaryPayloadHook(
onPayload: PayloadHook,
deps: AnthropicVertexStreamDeps,
streamAnthropicMock: ReturnType<typeof vi.fn>,
) {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 });
void streamFn(
@@ -64,7 +64,7 @@ function captureCacheBoundaryPayloadHook(onPayload: PayloadHook) {
} as never,
);
const transportOptions = hoisted.streamAnthropicMock.mock.calls[0]?.[2] as {
const transportOptions = streamAnthropicMock.mock.calls[0]?.[2] as {
onPayload?: PayloadHook;
};
@@ -105,31 +105,29 @@ describe("createAnthropicVertexStreamFn", () => {
await import("./stream-runtime.js"));
});
beforeEach(() => {
hoisted.streamAnthropicMock.mockClear();
hoisted.anthropicVertexCtorMock.mockClear();
});
it("omits projectId when ADC credentials are used without an explicit project", () => {
const streamFn = createAnthropicVertexStreamFn(undefined, "global");
const { deps, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn(undefined, "global", undefined, deps);
void streamFn(makeModel({ id: "claude-sonnet-4-6", maxTokens: 128000 }), { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledWith({
expect(anthropicVertexCtorMock).toHaveBeenCalledWith({
region: "global",
});
});
it("passes an explicit baseURL through to the Vertex client", () => {
const { deps, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn(
"vertex-project",
"us-east5",
"https://proxy.example.test/vertex/v1",
deps,
);
void streamFn(makeModel({ id: "claude-sonnet-4-6", maxTokens: 128000 }), { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledWith({
expect(anthropicVertexCtorMock).toHaveBeenCalledWith({
projectId: "vertex-project",
region: "us-east5",
baseURL: "https://proxy.example.test/vertex/v1",
@@ -137,12 +135,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("defaults maxTokens to the model limit instead of the old 32000 cap", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-opus-4-6", maxTokens: 128000 });
void streamFn(model, { messages: [] }, {});
expect(hoisted.streamAnthropicMock).toHaveBeenCalledWith(
expect(streamAnthropicMock).toHaveBeenCalledWith(
model,
{ messages: [] },
expect.objectContaining({
@@ -152,12 +151,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("clamps explicit maxTokens to the selected model limit", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-sonnet-4-6", maxTokens: 128000 });
void streamFn(model, { messages: [] }, { maxTokens: 999999 });
expect(hoisted.streamAnthropicMock).toHaveBeenCalledWith(
expect(streamAnthropicMock).toHaveBeenCalledWith(
model,
{ messages: [] },
expect.objectContaining({
@@ -167,12 +167,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("maps xhigh reasoning to max effort for adaptive Opus models", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-opus-4-6", maxTokens: 64000 });
void streamFn(model, { messages: [] }, { reasoning: "xhigh" });
expect(hoisted.streamAnthropicMock).toHaveBeenCalledWith(
expect(streamAnthropicMock).toHaveBeenCalledWith(
model,
{ messages: [] },
expect.objectContaining({
@@ -183,12 +184,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("maps xhigh reasoning to xhigh effort for Opus 4.7", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-opus-4-7", maxTokens: 64000 });
void streamFn(model, { messages: [] }, { reasoning: "xhigh" });
expect(hoisted.streamAnthropicMock).toHaveBeenCalledWith(
expect(streamAnthropicMock).toHaveBeenCalledWith(
model,
{ messages: [] },
expect.objectContaining({
@@ -199,8 +201,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("applies Anthropic cache-boundary shaping before forwarding payload hooks", async () => {
const { deps, streamAnthropicMock } = createStreamDeps();
const onPayload = vi.fn(async (payload: unknown) => payload);
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(onPayload);
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(
onPayload,
deps,
streamAnthropicMock,
);
const payload = {
system: [
{
@@ -220,6 +227,7 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("reapplies Anthropic cache-boundary shaping when payload hooks return a fresh payload", async () => {
const { deps, streamAnthropicMock } = createStreamDeps();
const onPayload = vi.fn(async () => ({
system: [
{
@@ -229,7 +237,11 @@ describe("createAnthropicVertexStreamFn", () => {
],
messages: [{ role: "user", content: "Hello again" }],
}));
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(onPayload);
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(
onPayload,
deps,
streamAnthropicMock,
);
const nextPayload = await transportPayloadHook?.(
{
@@ -248,12 +260,13 @@ describe("createAnthropicVertexStreamFn", () => {
});
it("omits maxTokens when neither the model nor request provide a finite limit", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const { deps, streamAnthropicMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5", undefined, deps);
const model = makeModel({ id: "claude-sonnet-4-6" });
void streamFn(model, { messages: [] }, { maxTokens: Number.NaN });
expect(hoisted.streamAnthropicMock).toHaveBeenCalledWith(
expect(streamAnthropicMock).toHaveBeenCalledWith(
model,
{ messages: [] },
expect.not.objectContaining({
@@ -264,19 +277,17 @@ describe("createAnthropicVertexStreamFn", () => {
});
describe("createAnthropicVertexStreamFnForModel", () => {
beforeEach(() => {
hoisted.anthropicVertexCtorMock.mockClear();
});
it("derives project and region from the model and env", () => {
const { deps, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFnForModel(
{ baseUrl: "https://europe-west4-aiplatform.googleapis.com" },
{ GOOGLE_CLOUD_PROJECT_ID: "vertex-project" } as NodeJS.ProcessEnv,
deps,
);
void streamFn(makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 }), { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledWith({
expect(anthropicVertexCtorMock).toHaveBeenCalledWith({
projectId: "vertex-project",
region: "europe-west4",
baseURL: "https://europe-west4-aiplatform.googleapis.com/v1",
@@ -284,14 +295,16 @@ describe("createAnthropicVertexStreamFnForModel", () => {
});
it("preserves explicit custom provider base URLs", () => {
const { deps, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFnForModel(
{ baseUrl: "https://proxy.example.test/custom-root/v1" },
{ GOOGLE_CLOUD_PROJECT_ID: "vertex-project" } as NodeJS.ProcessEnv,
deps,
);
void streamFn(makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 }), { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledWith({
expect(anthropicVertexCtorMock).toHaveBeenCalledWith({
projectId: "vertex-project",
region: "global",
baseURL: "https://proxy.example.test/custom-root/v1",
@@ -299,14 +312,16 @@ describe("createAnthropicVertexStreamFnForModel", () => {
});
it("adds /v1 for path-prefixed custom provider base URLs", () => {
const { deps, anthropicVertexCtorMock } = createStreamDeps();
const streamFn = createAnthropicVertexStreamFnForModel(
{ baseUrl: "https://proxy.example.test/custom-root" },
{ GOOGLE_CLOUD_PROJECT_ID: "vertex-project" } as NodeJS.ProcessEnv,
deps,
);
void streamFn(makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 }), { messages: [] }, {});
expect(hoisted.anthropicVertexCtorMock).toHaveBeenCalledWith({
expect(anthropicVertexCtorMock).toHaveBeenCalledWith({
projectId: "vertex-project",
region: "global",
baseURL: "https://proxy.example.test/custom-root/v1",

View File

@@ -1,6 +1,10 @@
import { AnthropicVertex } from "@anthropic-ai/vertex-sdk";
import { AnthropicVertex as AnthropicVertexSdk } from "@anthropic-ai/vertex-sdk";
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { streamAnthropic, type AnthropicOptions, type Model } from "@mariozechner/pi-ai";
import {
streamAnthropic as streamAnthropicDefault,
type AnthropicOptions,
type Model,
} from "@mariozechner/pi-ai";
import {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
@@ -9,6 +13,17 @@ import { resolveAnthropicVertexClientRegion, resolveAnthropicVertexProjectId } f
type AnthropicVertexEffort = NonNullable<AnthropicOptions["effort"]>;
type AnthropicVertexAdaptiveEffort = AnthropicVertexEffort | "xhigh";
type AnthropicVertexClientOptions = ConstructorParameters<typeof AnthropicVertexSdk>[0];
export type AnthropicVertexStreamDeps = {
AnthropicVertex: new (options: AnthropicVertexClientOptions) => unknown;
streamAnthropic: typeof streamAnthropicDefault;
};
const defaultAnthropicVertexStreamDeps: AnthropicVertexStreamDeps = {
AnthropicVertex: AnthropicVertexSdk as AnthropicVertexStreamDeps["AnthropicVertex"],
streamAnthropic: streamAnthropicDefault,
};
function isClaudeOpus47Model(modelId: string): boolean {
return modelId.includes("opus-4-7") || modelId.includes("opus-4.7");
@@ -104,8 +119,9 @@ export function createAnthropicVertexStreamFn(
projectId: string | undefined,
region: string,
baseURL?: string,
deps: AnthropicVertexStreamDeps = defaultAnthropicVertexStreamDeps,
): StreamFn {
const client = new AnthropicVertex({
const client = new deps.AnthropicVertex({
region,
...(baseURL ? { baseURL } : {}),
...(projectId ? { projectId } : {}),
@@ -122,7 +138,7 @@ export function createAnthropicVertexStreamFn(
requestedMaxTokens: options?.maxTokens,
});
const opts: AnthropicOptions = {
client: client as unknown as AnthropicOptions["client"],
client: client as AnthropicOptions["client"],
temperature: options?.temperature,
...(maxTokens !== undefined ? { maxTokens } : {}),
signal: options?.signal,
@@ -157,7 +173,7 @@ export function createAnthropicVertexStreamFn(
opts.thinkingEnabled = false;
}
return streamAnthropic(transportModel, context, opts);
return deps.streamAnthropic(transportModel, context, opts);
};
}
@@ -187,6 +203,7 @@ function resolveAnthropicVertexSdkBaseUrl(baseUrl?: string): string | undefined
export function createAnthropicVertexStreamFnForModel(
model: { baseUrl?: string },
env: NodeJS.ProcessEnv = process.env,
deps?: AnthropicVertexStreamDeps,
): StreamFn {
return createAnthropicVertexStreamFn(
resolveAnthropicVertexProjectId(env),
@@ -195,5 +212,6 @@ export function createAnthropicVertexStreamFnForModel(
env,
}),
resolveAnthropicVertexSdkBaseUrl(model.baseUrl),
deps,
);
}

View File

@@ -1,19 +1,19 @@
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import path from "node:path";
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
import { afterEach, beforeAll, describe, expect, it } from "vitest";
let edgeTTS: typeof import("./tts.js").edgeTTS;
let mockTtsPromise = vi.fn<(text: string, filePath: string) => Promise<void>>();
vi.mock("node-edge-tts", () => ({
EdgeTTS: class {
ttsPromise(text: string, filePath: string) {
return mockTtsPromise(text, filePath);
}
},
}));
function createEdgeTTSDeps(ttsPromise: (text: string, filePath: string) => Promise<void>) {
return {
EdgeTTS: class {
ttsPromise(text: string, filePath: string) {
return ttsPromise(text, filePath);
}
},
};
}
const baseEdgeConfig = {
voice: "en-US-MichelleNeural",
@@ -40,17 +40,20 @@ describe("edgeTTS empty audio validation", () => {
tempDir = mkdtempSync(path.join(tmpdir(), "tts-test-"));
const outputPath = path.join(tempDir, "voice.mp3");
mockTtsPromise = vi.fn(async (_text: string, filePath: string) => {
const deps = createEdgeTTSDeps(async (_text: string, filePath: string) => {
writeFileSync(filePath, "");
});
await expect(
edgeTTS({
text: "Hello",
outputPath,
config: baseEdgeConfig,
timeoutMs: 10000,
}),
edgeTTS(
{
text: "Hello",
outputPath,
config: baseEdgeConfig,
timeoutMs: 10000,
},
deps,
),
).rejects.toThrow("Edge TTS produced empty audio file");
});
@@ -58,17 +61,20 @@ describe("edgeTTS empty audio validation", () => {
tempDir = mkdtempSync(path.join(tmpdir(), "tts-test-"));
const outputPath = path.join(tempDir, "voice.mp3");
mockTtsPromise = vi.fn(async (_text: string, filePath: string) => {
const deps = createEdgeTTSDeps(async (_text: string, filePath: string) => {
writeFileSync(filePath, Buffer.from([0xff, 0xfb, 0x90, 0x00]));
});
await expect(
edgeTTS({
text: "Hello",
outputPath,
config: baseEdgeConfig,
timeoutMs: 10000,
}),
edgeTTS(
{
text: "Hello",
outputPath,
config: baseEdgeConfig,
timeoutMs: 10000,
},
deps,
),
).resolves.toBeUndefined();
});
});

View File

@@ -2,6 +2,16 @@ import { statSync } from "node:fs";
import { EdgeTTS } from "node-edge-tts";
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
type EdgeTTSDeps = {
EdgeTTS: new (config: ConstructorParameters<typeof EdgeTTS>[0]) => {
ttsPromise: (text: string, outputPath: string) => Promise<unknown>;
};
};
const defaultEdgeTTSDeps: EdgeTTSDeps = {
EdgeTTS,
};
export function inferEdgeExtension(outputFormat: string): string {
const normalized = normalizeLowercaseStringOrEmpty(outputFormat);
if (normalized.includes("webm")) {
@@ -19,24 +29,27 @@ export function inferEdgeExtension(outputFormat: string): string {
return ".mp3";
}
export async function edgeTTS(params: {
text: string;
outputPath: string;
config: {
voice: string;
lang: string;
outputFormat: string;
saveSubtitles: boolean;
proxy?: string;
rate?: string;
pitch?: string;
volume?: string;
timeoutMs?: number;
};
timeoutMs: number;
}): Promise<void> {
export async function edgeTTS(
params: {
text: string;
outputPath: string;
config: {
voice: string;
lang: string;
outputFormat: string;
saveSubtitles: boolean;
proxy?: string;
rate?: string;
pitch?: string;
volume?: string;
timeoutMs?: number;
};
timeoutMs: number;
},
deps: EdgeTTSDeps = defaultEdgeTTSDeps,
): Promise<void> {
const { text, outputPath, config, timeoutMs } = params;
const tts = new EdgeTTS({
const tts = new deps.EdgeTTS({
voice: config.voice,
lang: config.lang,
outputFormat: config.outputFormat,

View File

@@ -15,6 +15,7 @@ const VALID_MODES = new Set(["all", "package-boundary"]);
const PLUGIN_SDK_TYPE_INPUTS = [
"tsconfig.json",
"src/plugin-sdk",
"src/auto-reply",
"src/video-generation/dashscope-compatible.ts",
"src/video-generation/types.ts",
"src/types",

View File

@@ -0,0 +1,92 @@
import { describe, expect, it, vi } from "vitest";
import type { SessionEntry } from "../../config/sessions.js";
import { createPersistCronSessionEntry, type MutableCronSession } from "./run-session-state.js";
function makeSessionEntry(overrides?: Partial<SessionEntry>): SessionEntry {
return {
sessionId: "run-session-id",
updatedAt: 1000,
systemSent: true,
...overrides,
};
}
function makeCronSession(entry = makeSessionEntry()): MutableCronSession {
return {
storePath: "/tmp/sessions.json",
store: {},
sessionEntry: entry,
systemSent: true,
isNewSession: true,
previousSessionId: undefined,
} as MutableCronSession;
}
describe("createPersistCronSessionEntry", () => {
it("persists a distinct run-session snapshot for isolated cron runs", async () => {
const cronSession = makeCronSession(
makeSessionEntry({
status: "running",
startedAt: 900,
skillsSnapshot: {
prompt: "old prompt",
skills: [{ name: "memory" }],
},
}),
);
const updateSessionStore = vi.fn(
async (_storePath, update: (store: Record<string, SessionEntry>) => void) => {
const store: Record<string, SessionEntry> = {};
update(store);
expect(store["agent:main:cron:job"]).toBe(cronSession.sessionEntry);
expect(store["agent:main:cron:job:run:run-session-id"]).not.toBe(cronSession.sessionEntry);
expect(store["agent:main:cron:job:run:run-session-id"]).toEqual(cronSession.sessionEntry);
},
);
const persist = createPersistCronSessionEntry({
isFastTestEnv: false,
cronSession,
agentSessionKey: "agent:main:cron:job",
runSessionKey: "agent:main:cron:job:run:run-session-id",
updateSessionStore,
});
await persist();
expect(cronSession.store["agent:main:cron:job"]).toBe(cronSession.sessionEntry);
expect(cronSession.store["agent:main:cron:job:run:run-session-id"]).not.toBe(
cronSession.sessionEntry,
);
cronSession.sessionEntry.status = "done";
cronSession.sessionEntry.skillsSnapshot!.skills[0].name = "changed";
expect(cronSession.store["agent:main:cron:job:run:run-session-id"]?.status).toBe("running");
expect(
cronSession.store["agent:main:cron:job:run:run-session-id"]?.skillsSnapshot?.skills[0]?.name,
).toBe("memory");
});
it("uses the shared session entry when the run key is the agent session key", async () => {
const cronSession = makeCronSession();
const updateSessionStore = vi.fn(
async (_storePath, update: (store: Record<string, SessionEntry>) => void) => {
const store: Record<string, SessionEntry> = {};
update(store);
expect(store["agent:main:session"]).toBe(cronSession.sessionEntry);
},
);
const persist = createPersistCronSessionEntry({
isFastTestEnv: false,
cronSession,
agentSessionKey: "agent:main:session",
runSessionKey: "agent:main:session",
updateSessionStore,
});
await persist();
expect(cronSession.store["agent:main:session"]).toBe(cronSession.sessionEntry);
});
});

View File

@@ -19,6 +19,10 @@ type UpdateSessionStore = (
export type PersistCronSessionEntry = () => Promise<void>;
function cloneSessionEntry(entry: MutableCronSessionEntry): MutableCronSessionEntry {
return globalThis.structuredClone(entry);
}
export function createPersistCronSessionEntry(params: {
isFastTestEnv: boolean;
cronSession: MutableCronSession;
@@ -30,14 +34,15 @@ export function createPersistCronSessionEntry(params: {
if (params.isFastTestEnv) {
return;
}
const runSessionEntry = cloneSessionEntry(params.cronSession.sessionEntry);
params.cronSession.store[params.agentSessionKey] = params.cronSession.sessionEntry;
if (params.runSessionKey !== params.agentSessionKey) {
params.cronSession.store[params.runSessionKey] = params.cronSession.sessionEntry;
params.cronSession.store[params.runSessionKey] = runSessionEntry;
}
await params.updateSessionStore(params.cronSession.storePath, (store) => {
store[params.agentSessionKey] = params.cronSession.sessionEntry;
if (params.runSessionKey !== params.agentSessionKey) {
store[params.runSessionKey] = params.cronSession.sessionEntry;
store[params.runSessionKey] = runSessionEntry;
}
});
};

View File

@@ -222,6 +222,206 @@ describe("resolveCronSession", () => {
expect(result.sessionEntry.modelOverride).toBe("gpt-5.4");
});
it("clears stale run-scoped state when forceNew rolls to a fresh session", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-987",
updatedAt: NOW_MS - 1000,
status: "done",
startedAt: NOW_MS - 10_000,
endedAt: NOW_MS - 1_000,
runtimeMs: 9_000,
lastHeartbeatText: "old heartbeat",
lastHeartbeatSentAt: NOW_MS - 1_000,
heartbeatIsolatedBaseSessionKey: "agent:main:cron:old",
model: "claude-opus-4-6",
modelProvider: "anthropic",
agentHarnessId: "claude-cli",
agentRuntimeOverride: "claude-cli",
cliSessionIds: { anthropic: "old-cli-session" },
cliSessionBindings: {},
claudeCliSessionId: "old-claude-session",
liveModelSwitchPending: true,
fallbackNoticeSelectedModel: "anthropic/claude-opus-4-6",
fallbackNoticeActiveModel: "anthropic/claude-sonnet-4-6",
fallbackNoticeReason: "rate limit",
inputTokens: 1,
outputTokens: 2,
totalTokens: 3,
totalTokensFresh: true,
estimatedCostUsd: 0.01,
execAsk: "always",
execHost: "gateway",
execNode: "node-1",
execSecurity: "allowlist",
cacheRead: 4,
cacheWrite: 5,
contextTokens: 200_000,
compactionCount: 9,
memoryFlushAt: NOW_MS - 500,
abortCutoffMessageSid: "old-message",
spawnedBy: "agent:main:session:parent",
skillsSnapshot: {
prompt: "old skills",
skills: [{ name: "stale-skill" }],
},
systemPromptReport: {
source: "run",
generatedAt: NOW_MS,
systemPrompt: {
chars: 1,
projectContextChars: 0,
nonProjectContextChars: 1,
},
injectedWorkspaceFiles: [],
skills: { promptChars: 0, entries: [] },
tools: { listChars: 0, schemaChars: 0, entries: [] },
},
pluginDebugEntries: [{ pluginId: "test", lines: ["old"] }],
elevatedLevel: "full",
sendPolicy: "deny",
groupActivation: "always",
groupActivationNeedsSystemIntro: true,
queueMode: "interrupt",
queueDebounceMs: 500,
queueCap: 25,
queueDrop: "old",
channel: "telegram" as never,
groupId: "group-1",
subject: "old subject",
groupChannel: "ops",
space: "team",
origin: {
provider: "telegram",
to: "old-chat",
},
acp: {
backend: "acpx",
agent: "codex",
runtimeSessionName: "old-acp",
mode: "persistent",
state: "idle",
lastActivityAt: NOW_MS - 1_000,
},
authProfileOverride: "auto-auth",
authProfileOverrideSource: "auto",
authProfileOverrideCompactionCount: 2,
modelOverride: "auto-model",
providerOverride: "anthropic",
modelOverrideSource: "auto",
},
fresh: true,
forceNew: true,
});
expect(result.isNewSession).toBe(true);
expect(result.sessionEntry.status).toBeUndefined();
expect(result.sessionEntry.startedAt).toBeUndefined();
expect(result.sessionEntry.endedAt).toBeUndefined();
expect(result.sessionEntry.runtimeMs).toBeUndefined();
expect(result.sessionEntry.lastHeartbeatText).toBeUndefined();
expect(result.sessionEntry.lastHeartbeatSentAt).toBeUndefined();
expect(result.sessionEntry.heartbeatIsolatedBaseSessionKey).toBeUndefined();
expect(result.sessionEntry.model).toBeUndefined();
expect(result.sessionEntry.modelProvider).toBeUndefined();
expect(result.sessionEntry.agentHarnessId).toBeUndefined();
expect(result.sessionEntry.agentRuntimeOverride).toBeUndefined();
expect(result.sessionEntry.cliSessionIds).toBeUndefined();
expect(result.sessionEntry.cliSessionBindings).toBeUndefined();
expect(result.sessionEntry.claudeCliSessionId).toBeUndefined();
expect(result.sessionEntry.liveModelSwitchPending).toBeUndefined();
expect(result.sessionEntry.fallbackNoticeSelectedModel).toBeUndefined();
expect(result.sessionEntry.fallbackNoticeActiveModel).toBeUndefined();
expect(result.sessionEntry.fallbackNoticeReason).toBeUndefined();
expect(result.sessionEntry.inputTokens).toBeUndefined();
expect(result.sessionEntry.outputTokens).toBeUndefined();
expect(result.sessionEntry.totalTokens).toBeUndefined();
expect(result.sessionEntry.totalTokensFresh).toBeUndefined();
expect(result.sessionEntry.estimatedCostUsd).toBeUndefined();
expect(result.sessionEntry.execAsk).toBeUndefined();
expect(result.sessionEntry.execHost).toBeUndefined();
expect(result.sessionEntry.execNode).toBeUndefined();
expect(result.sessionEntry.execSecurity).toBeUndefined();
expect(result.sessionEntry.cacheRead).toBeUndefined();
expect(result.sessionEntry.cacheWrite).toBeUndefined();
expect(result.sessionEntry.contextTokens).toBeUndefined();
expect(result.sessionEntry.compactionCount).toBeUndefined();
expect(result.sessionEntry.memoryFlushAt).toBeUndefined();
expect(result.sessionEntry.abortCutoffMessageSid).toBeUndefined();
expect(result.sessionEntry.spawnedBy).toBeUndefined();
expect(result.sessionEntry.skillsSnapshot).toBeUndefined();
expect(result.sessionEntry.systemPromptReport).toBeUndefined();
expect(result.sessionEntry.pluginDebugEntries).toBeUndefined();
expect(result.sessionEntry.elevatedLevel).toBeUndefined();
expect(result.sessionEntry.sendPolicy).toBeUndefined();
expect(result.sessionEntry.groupActivation).toBeUndefined();
expect(result.sessionEntry.groupActivationNeedsSystemIntro).toBeUndefined();
expect(result.sessionEntry.queueMode).toBeUndefined();
expect(result.sessionEntry.queueDebounceMs).toBeUndefined();
expect(result.sessionEntry.queueCap).toBeUndefined();
expect(result.sessionEntry.queueDrop).toBeUndefined();
expect(result.sessionEntry.channel).toBeUndefined();
expect(result.sessionEntry.groupId).toBeUndefined();
expect(result.sessionEntry.subject).toBeUndefined();
expect(result.sessionEntry.groupChannel).toBeUndefined();
expect(result.sessionEntry.space).toBeUndefined();
expect(result.sessionEntry.origin).toBeUndefined();
expect(result.sessionEntry.acp).toBeUndefined();
expect(result.sessionEntry.authProfileOverride).toBeUndefined();
expect(result.sessionEntry.authProfileOverrideSource).toBeUndefined();
expect(result.sessionEntry.authProfileOverrideCompactionCount).toBeUndefined();
expect(result.sessionEntry.modelOverride).toBeUndefined();
expect(result.sessionEntry.providerOverride).toBeUndefined();
expect(result.sessionEntry.modelOverrideSource).toBeUndefined();
});
it("preserves user-selected model and auth overrides for fresh cron sessions", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-654",
updatedAt: NOW_MS - 1000,
modelOverride: "claude-sonnet-4-6",
providerOverride: "anthropic",
modelOverrideSource: "user",
authProfileOverride: "work-profile",
authProfileOverrideSource: "user",
authProfileOverrideCompactionCount: 3,
},
fresh: true,
forceNew: true,
});
expect(result.isNewSession).toBe(true);
expect(result.sessionEntry.modelOverride).toBe("claude-sonnet-4-6");
expect(result.sessionEntry.providerOverride).toBe("anthropic");
expect(result.sessionEntry.modelOverrideSource).toBe("user");
expect(result.sessionEntry.authProfileOverride).toBe("work-profile");
expect(result.sessionEntry.authProfileOverrideSource).toBe("user");
expect(result.sessionEntry.authProfileOverrideCompactionCount).toBe(3);
});
it("preserves session context for stale non-isolated rollovers", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-321",
updatedAt: NOW_MS - 1000,
elevatedLevel: "full",
sendPolicy: "deny",
queueMode: "collect",
channel: "discord" as never,
origin: { provider: "discord", to: "old-channel" },
},
fresh: false,
});
expect(result.isNewSession).toBe(true);
expect(result.sessionEntry.elevatedLevel).toBe("full");
expect(result.sessionEntry.sendPolicy).toBe("deny");
expect(result.sessionEntry.queueMode).toBe("collect");
expect(result.sessionEntry.channel).toBe("discord");
expect(result.sessionEntry.origin).toEqual({ provider: "discord", to: "old-channel" });
});
it("clears delivery routing metadata when session is stale", () => {
const result = resolveWithStoredEntry({
entry: {

View File

@@ -9,6 +9,98 @@ import { loadSessionStore } from "../../config/sessions/store-load.js";
import type { SessionEntry } from "../../config/sessions/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
type FreshCronSessionSanitizeMode = "isolated-force-new" | "stale-rollover";
const FRESH_CRON_SAFE_PREFERENCE_FIELDS = [
"heartbeatTaskState",
"chatType",
"thinkingLevel",
"fastMode",
"verboseLevel",
"traceLevel",
"reasoningLevel",
"ttsAuto",
"responseUsage",
"label",
"displayName",
] as const satisfies readonly (keyof SessionEntry)[];
const STALE_SESSION_CONTEXT_PRESERVED_FIELDS = [
"elevatedLevel",
"groupActivation",
"groupActivationNeedsSystemIntro",
"sendPolicy",
"queueMode",
"queueDebounceMs",
"queueCap",
"queueDrop",
"channel",
"groupId",
"subject",
"groupChannel",
"space",
"origin",
"acp",
] as const satisfies readonly (keyof SessionEntry)[];
function cloneSessionField<T>(value: T): T {
return globalThis.structuredClone(value);
}
function copySessionFields(
target: SessionEntry,
entry: SessionEntry,
fields: readonly (keyof SessionEntry)[],
): void {
for (const field of fields) {
if (entry[field] !== undefined) {
target[field] = cloneSessionField(entry[field]) as never;
}
}
}
function preserveNonAutoModelOverride(target: SessionEntry, entry: SessionEntry): void {
if (entry.modelOverrideSource !== "auto") {
if (entry.modelOverride !== undefined) {
target.modelOverride = entry.modelOverride;
}
if (entry.providerOverride !== undefined) {
target.providerOverride = entry.providerOverride;
}
if (entry.modelOverrideSource !== undefined) {
target.modelOverrideSource = entry.modelOverrideSource;
}
}
}
function preserveUserAuthOverride(target: SessionEntry, entry: SessionEntry): void {
if (entry.authProfileOverrideSource === "user") {
if (entry.authProfileOverride !== undefined) {
target.authProfileOverride = entry.authProfileOverride;
}
target.authProfileOverrideSource = entry.authProfileOverrideSource;
if (entry.authProfileOverrideCompactionCount !== undefined) {
target.authProfileOverrideCompactionCount = entry.authProfileOverrideCompactionCount;
}
}
}
function sanitizeFreshCronSessionEntry(
entry: SessionEntry,
mode: FreshCronSessionSanitizeMode,
): SessionEntry {
const next = {} as SessionEntry;
copySessionFields(next, entry, FRESH_CRON_SAFE_PREFERENCE_FIELDS);
if (mode === "stale-rollover") {
copySessionFields(next, entry, STALE_SESSION_CONTEXT_PRESERVED_FIELDS);
}
preserveNonAutoModelOverride(next, entry);
preserveUserAuthOverride(next, entry);
return next;
}
export function resolveCronSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
@@ -65,27 +157,22 @@ export function resolveCronSession(params: {
previousSessionId,
});
const baseEntry = entry
? isNewSession
? sanitizeFreshCronSessionEntry(
entry,
params.forceNew ? "isolated-force-new" : "stale-rollover",
)
: entry
: undefined;
const sessionEntry: SessionEntry = {
// Preserve existing per-session overrides even when rolling to a new sessionId.
...entry,
...baseEntry,
// Always update these core fields
sessionId,
updatedAt: params.nowMs,
systemSent,
// When starting a fresh session (forceNew / isolated), clear delivery routing
// state inherited from prior sessions. Without this, lastThreadId leaks into
// the new session and causes announce-mode cron deliveries to post as thread
// replies instead of channel top-level messages.
// deliveryContext must also be cleared because normalizeSessionEntryDelivery
// repopulates lastThreadId from deliveryContext.threadId on store writes.
...(isNewSession && {
lastChannel: undefined,
lastTo: undefined,
lastAccountId: undefined,
lastThreadId: undefined,
deliveryContext: undefined,
sessionFile: undefined,
}),
};
return { storePath, store, sessionEntry, systemSent, isNewSession, previousSessionId };
}