Fix context usage display and active-run reload interruptions

Fixes context usage display regressions and prevents active runs from being interrupted by channel reloads. Adds persisted tool-result detail bounds so large tool metadata stays out of model/session payloads.
This commit is contained in:
Poo-Squirry
2026-04-26 03:07:52 +09:00
committed by GitHub
parent c3bfd328ad
commit fd3840cb00
11 changed files with 1014 additions and 43 deletions

View File

@@ -13,6 +13,7 @@ import {
applyEmbeddedAttemptToolsAllow,
isPrimaryBootstrapRun,
mergeOrphanedTrailingUserPrompt,
normalizeMessagesForLlmBoundary,
prependSystemPromptAddition,
remapInjectedContextFilesToWorkspace,
resetEmbeddedAgentBaseStreamFnCacheForTest,
@@ -73,6 +74,30 @@ describe("applyEmbeddedAttemptToolsAllow", () => {
});
});
describe("normalizeMessagesForLlmBoundary", () => {
  it("strips tool result details before provider conversion", () => {
    // A persisted tool result carrying bulky internal diagnostics in `details`.
    const rawMessages = [
      {
        role: "toolResult",
        toolCallId: "call_1",
        toolName: "exec",
        content: [{ type: "text", text: "visible output" }],
        details: { aggregated: "hidden diagnostics" },
        isError: false,
        timestamp: 1,
      },
    ];
    const normalized = normalizeMessagesForLlmBoundary(
      rawMessages as Parameters<typeof normalizeMessagesForLlmBoundary>[0],
    ) as Array<Record<string, unknown>>;
    // The LLM-bound copy must lose `details` but keep visible content...
    expect(normalized[0]).not.toHaveProperty("details");
    expect(normalized[0]?.content).toEqual([{ type: "text", text: "visible output" }]);
    // ...while the original message object is left untouched.
    expect(rawMessages[0]).toHaveProperty("details");
  });
});
describe("shouldCreateBundleMcpRuntimeForAttempt", () => {
it("skips bundle MCP when tools are disabled or unavailable", () => {
expect(shouldCreateBundleMcpRuntimeForAttempt({ toolsEnabled: false })).toBe(false);

View File

@@ -121,7 +121,10 @@ import { resolveSandboxContext } from "../../sandbox.js";
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
import { repairSessionFileIfNeeded } from "../../session-file-repair.js";
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
import { sanitizeToolUseResultPairing } from "../../session-transcript-repair.js";
import {
sanitizeToolUseResultPairing,
stripToolResultDetails,
} from "../../session-transcript-repair.js";
import {
acquireSessionWriteLock,
resolveSessionLockMaxHoldFromTimeout,
@@ -466,6 +469,10 @@ export function applyEmbeddedAttemptToolsAllow<T extends { name: string }>(
return tools.filter((tool) => allowSet.has(tool.name));
}
/**
 * Prepares session messages for the LLM boundary: assistant replay content is
 * normalized first, then persisted tool-result `details` are stripped so large
 * tool metadata never reaches the provider payload.
 */
export function normalizeMessagesForLlmBoundary(messages: AgentMessage[]): AgentMessage[] {
  const replayNormalized = normalizeAssistantReplayContent(messages);
  return stripToolResultDetails(replayNormalized);
}
export function shouldCreateBundleMcpRuntimeForAttempt(params: {
toolsEnabled: boolean;
disableTools?: boolean;
@@ -1391,7 +1398,7 @@ export async function runEmbeddedAttempt(
if (typeof activeSession.agent.convertToLlm === "function") {
const baseConvertToLlm = activeSession.agent.convertToLlm.bind(activeSession.agent);
activeSession.agent.convertToLlm = async (messages) =>
await baseConvertToLlm(normalizeAssistantReplayContent(messages));
await baseConvertToLlm(normalizeMessagesForLlmBoundary(messages));
}
let prePromptMessageCount = activeSession.messages.length;
let unwindowedContextEngineMessagesForPrecheck: AgentMessage[] | undefined;

View File

@@ -3,7 +3,7 @@ import os from "node:os";
import path from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import { describe, expect, it, afterEach } from "vitest";
import { describe, expect, it, afterEach, vi } from "vitest";
import {
initializeGlobalHookRunner,
resetGlobalHookRunner,
@@ -88,6 +88,14 @@ function expectPersistedToolResultTextCapped(sm: ReturnType<typeof SessionManage
expect(text).toContain("truncated");
}
// Shared assertion: the persisted tool result's `details` payload was replaced
// by the truncation summary, lost its oversized fields, and stays under the
// 8 KiB persistence byte cap.
function expectPersistedToolResultDetailsCapped(sm: ReturnType<typeof SessionManager.inMemory>) {
  const persisted = getPersistedToolResult(sm);
  const cappedDetails = persisted.details as Record<string, unknown>;
  expect(cappedDetails.persistedDetailsTruncated).toBe(true);
  // Oversized aggregate output must not survive persistence.
  expect(cappedDetails.aggregated).toBeUndefined();
  expect(Buffer.byteLength(JSON.stringify(cappedDetails), "utf-8")).toBeLessThan(8_192);
}
afterEach(() => {
resetGlobalHookRunner();
if (originalBundledPluginsDir === undefined) {
@@ -109,6 +117,189 @@ describe("tool_result_persist hook", () => {
expect(toolResult.details).toBeTruthy();
});
// Verifies the persistence guard rewrites an oversized `details` payload
// (aggregated output, long commands, per-session tails) into the bounded
// truncation summary while leaving the visible text content untouched.
it("caps oversized toolResult details before persistence", () => {
const sm = guardSessionManager(SessionManager.inMemory(), {
agentId: "main",
sessionKey: "main",
});
// Bind once so both appends go through the guard wrapper.
const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void;
appendMessage({
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }],
} as AgentMessage);
appendMessage({
role: "toolResult",
toolCallId: "call_1",
isError: false,
content: [{ type: "text", text: "visible output stays small" }],
// ~200KB of detail payload — far beyond the 8KB persistence cap.
details: {
status: "completed",
sessionId: "exec-1",
aggregated: "x".repeat(120_000),
tail: "t".repeat(6_000),
sessions: [
{
sessionId: "proc-1",
status: "completed",
command: "node noisy-script.js ".repeat(2_000),
aggregated: "a".repeat(80_000),
tail: "z".repeat(8_000),
},
],
},
} as any);
const toolResult = getPersistedToolResult(sm);
// Visible content must survive; only `details` is capped.
expect(toolResult.content[0]?.text).toBe("visible output stays small");
expectPersistedToolResultDetailsCapped(sm);
});
// Proves the cap path never JSON-serializes the *original* oversized details
// object: a JSON.stringify spy throws if the raw payload is ever passed in,
// forcing the implementation to use bounded measurement instead.
it("caps oversized toolResult details without serializing the original payload", () => {
const sm = guardSessionManager(SessionManager.inMemory(), {
agentId: "main",
sessionKey: "main",
});
const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void;
const oversizedDetails = {
status: "completed",
sessionId: "exec-large",
aggregated: "x".repeat(200_000),
sessions: [
{
sessionId: "proc-large",
command: "node noisy-script.js ".repeat(2_000),
tail: "z".repeat(20_000),
},
],
};
// Keep a reference to the real stringify so the spy can delegate for all
// other values (the session manager itself serializes on persist).
const originalStringify = JSON.stringify;
const stringifySpy = vi.spyOn(JSON, "stringify").mockImplementation((value, ...args) => {
if (value === oversizedDetails) {
throw new Error("unbounded original details stringify");
}
return originalStringify(value, ...args);
});
try {
appendMessage({
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }],
} as AgentMessage);
appendMessage({
role: "toolResult",
toolCallId: "call_1",
isError: false,
content: [{ type: "text", text: "visible output stays small" }],
details: oversizedDetails,
} as any);
} finally {
// Restore before asserting so later JSON.stringify calls are unspied.
stringifySpy.mockRestore();
}
const toolResult = getPersistedToolResult(sm);
expect(toolResult.content[0]?.text).toBe("visible output stays small");
expectPersistedToolResultDetailsCapped(sm);
expect(stringifySpy).not.toHaveBeenCalledWith(oversizedDetails);
});
// Proves that a details object with tens of thousands of keys is handled
// lazily: Object.entries/Object.keys spies throw if the implementation ever
// materializes the full key set of the wide object up front.
it("caps wide toolResult details without materializing every entry up front", () => {
const sm = guardSessionManager(SessionManager.inMemory(), {
agentId: "main",
sessionKey: "main",
});
const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void;
const wideDetails: Record<string, unknown> = {
status: "completed",
sessionId: "exec-wide",
};
// 20k extra keys — enumerating them all at once is the failure mode.
for (let index = 0; index < 20_000; index += 1) {
wideDetails[`debug_${index}`] = `value-${index}`;
}
const originalEntries = Object.entries;
const originalKeys = Object.keys;
const entriesSpy = vi.spyOn(Object, "entries").mockImplementation((value) => {
if (value === wideDetails) {
throw new Error("wide details entries materialized");
}
return originalEntries(value);
});
const keysSpy = vi.spyOn(Object, "keys").mockImplementation((value) => {
if (value === wideDetails) {
throw new Error("wide details keys materialized");
}
return originalKeys(value);
});
try {
appendMessage({
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }],
} as AgentMessage);
appendMessage({
role: "toolResult",
toolCallId: "call_1",
isError: false,
content: [{ type: "text", text: "visible output stays small" }],
details: wideDetails,
} as any);
} finally {
entriesSpy.mockRestore();
keysSpy.mockRestore();
}
const toolResult = getPersistedToolResult(sm);
const details = toolResult.details as Record<string, unknown>;
expect(details.persistedDetailsTruncated).toBe(true);
// The bounded key sample still records the first own keys.
expect(details.originalDetailKeys).toEqual(
expect.arrayContaining(["status", "sessionId", "debug_0"]),
);
});
// When even the sanitized (allow-listed, string-truncated) details exceed the
// cap, the guard must drop to the compact fallback summary and flag it with
// `finalDetailsTruncated`.
it("falls back to a compact summary when sanitized details still exceed the cap", () => {
const sm = guardSessionManager(SessionManager.inMemory(), {
agentId: "main",
sessionKey: "main",
});
const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void;
appendMessage({
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }],
} as AgentMessage);
appendMessage({
role: "toolResult",
toolCallId: "call_1",
isError: false,
content: [{ type: "text", text: "visible output stays small" }],
// Every allow-listed field is itself oversized, so the first-pass
// sanitized object still blows the 8KB budget.
details: {
status: "completed".repeat(250),
sessionId: "exec-oversized",
cwd: "/tmp/very-long-working-directory".repeat(250),
name: "noisy process".repeat(250),
fullOutputPath: "/tmp/output.log".repeat(250),
truncation: "truncated".repeat(250),
tail: "t".repeat(20_000),
aggregated: "a".repeat(120_000),
sessions: Array.from({ length: 10 }, (_, index) => ({
sessionId: `proc-${index}`,
status: "completed".repeat(100),
cwd: "/tmp/session".repeat(100),
name: "child process".repeat(100),
command: "node noisy-script.js ".repeat(200),
aggregated: "x".repeat(50_000),
tail: "z".repeat(10_000),
})),
},
} as any);
const toolResult = getPersistedToolResult(sm);
const details = toolResult.details as Record<string, unknown>;
expect(details.persistedDetailsTruncated).toBe(true);
expect(details.finalDetailsTruncated).toBe(true);
expect(details.aggregated).toBeUndefined();
expect(details.tail).toBeUndefined();
expect(Buffer.byteLength(JSON.stringify(details), "utf-8")).toBeLessThan(8_192);
});
it("loads tool_result_persist hooks without breaking persistence", () => {
const tmp = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-toolpersist-"));
process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = "/nonexistent/bundled/plugins";
@@ -189,6 +380,35 @@ describe("tool_result_persist hook", () => {
appendToolCallAndResult(sm);
expectPersistedToolResultTextCapped(sm);
});
// A tool_result_persist hook may replace the message with new, oversized
// details; the guard must re-run the cap on the hook's output so plugins
// cannot reintroduce unbounded payloads.
it("reapplies the details cap after tool_result_persist expands details", () => {
initializeTempPlugin({
tmpPrefix: "openclaw-toolpersist-details-expand-",
id: "persist-details-expand",
// Plugin source is loaded and executed at runtime — keep byte-exact.
body: `export default { id: "persist-details-expand", register(api) {
api.on("tool_result_persist", (event) => {
return {
message: {
...event.message,
details: {
status: "completed",
aggregated: "x".repeat(150000),
sessions: [{ sessionId: "proc-1", command: "y".repeat(50000), tail: "z".repeat(10000) }],
},
},
};
}, { priority: 10 });
} };`,
});
const sm = guardSessionManager(SessionManager.inMemory(), {
agentId: "main",
sessionKey: "main",
});
appendToolCallAndResult(sm);
expectPersistedToolResultDetailsCapped(sm);
});
});
describe("before_message_write hook", () => {

View File

@@ -1,5 +1,11 @@
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { SessionManager } from "@mariozechner/pi-coding-agent";
import {
boundedJsonUtf8Bytes,
firstEnumerableOwnKeys,
jsonUtf8BytesOrInfinity,
type BoundedJsonUtf8Bytes,
} from "../infra/json-utf8-bytes.js";
import type {
PluginHookBeforeMessageWriteEvent,
PluginHookBeforeMessageWriteResult,
@@ -38,6 +44,188 @@ function resolveMaxToolResultChars(opts?: { maxToolResultChars?: number }): numb
return Math.max(1, opts?.maxToolResultChars ?? DEFAULT_MAX_LIVE_TOOL_RESULT_CHARS);
}
const MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES = 8_192;
const MAX_PERSISTED_DETAIL_STRING_CHARS = 2_000;
const MAX_PERSISTED_DETAIL_SESSION_COUNT = 10;
const MAX_PERSISTED_DETAIL_FALLBACK_STRING_CHARS = 200;
// Reports how large the original details payload was: an exact byte count when
// measurement finished, otherwise a lower bound (measurement stopped at the cap).
function originalDetailsSizeFields(size: BoundedJsonUtf8Bytes): Record<string, number> {
  if (size.complete) {
    return { originalDetailsBytes: size.bytes };
  }
  return { originalDetailsBytesAtLeast: size.bytes };
}
// Bounds a detail string for persistence: short strings pass through; longer
// ones keep a prefix and gain an explicit marker stating how much was dropped.
function truncatePersistedDetailString(
  value: string,
  maxChars = MAX_PERSISTED_DETAIL_STRING_CHARS,
): string {
  const omitted = value.length - maxChars;
  if (omitted <= 0) {
    return value;
  }
  const prefix = value.slice(0, maxChars);
  return `${prefix}\n\n[OpenClaw persisted detail truncated: ${omitted} chars omitted]`;
}
// Slims one entry of a details `sessions` array down to a small allow-list of
// identity/status fields; non-objects pass through untouched.
function sanitizePersistedSessionDetail(value: unknown): unknown {
  if (!value || typeof value !== "object") {
    return value;
  }
  const source = value as Record<string, unknown>;
  const allowedKeys = [
    "sessionId",
    "status",
    "pid",
    "startedAt",
    "endedAt",
    "runtimeMs",
    "cwd",
    "name",
    "truncated",
    "exitCode",
    "exitSignal",
  ];
  const sanitized: Record<string, unknown> = {};
  for (const key of allowedKeys) {
    const field = source[key];
    if (field === undefined) {
      continue;
    }
    // Strings are bounded at 500 chars; everything else is copied verbatim.
    sanitized[key] = typeof field === "string" ? truncatePersistedDetailString(field, 500) : field;
  }
  // `command` gets the same 500-char bound, but only when it is a string.
  if (typeof source.command === "string") {
    sanitized.command = truncatePersistedDetailString(source.command, 500);
  }
  return sanitized;
}
// Builds the compact summary used when sanitized details still exceed the byte
// cap: truncation flags, original/sanitized sizes, a bounded key sample, and a
// handful of small identity fields copied (string-truncated) from the source.
function buildPersistedDetailsFallback(
  src: Record<string, unknown> | undefined,
  originalSize: BoundedJsonUtf8Bytes,
  sanitizedBytes?: number,
): Record<string, unknown> {
  const summary: Record<string, unknown> = {
    persistedDetailsTruncated: true,
    finalDetailsTruncated: true,
    ...originalDetailsSizeFields(originalSize),
  };
  if (sanitizedBytes !== undefined) {
    summary.sanitizedDetailsBytes = sanitizedBytes;
  }
  if (!src) {
    return summary;
  }
  // Record which keys existed (bounded at 40) for debuggability.
  summary.originalDetailKeys = firstEnumerableOwnKeys(src, 40);
  const identityKeys = ["status", "sessionId", "pid", "exitCode", "exitSignal", "truncated"];
  for (const key of identityKeys) {
    const field = src[key];
    if (field === undefined) {
      continue;
    }
    summary[key] =
      typeof field === "string"
        ? truncatePersistedDetailString(field, MAX_PERSISTED_DETAIL_FALLBACK_STRING_CHARS)
        : field;
  }
  return summary;
}
// Final gate before persistence with three escalation levels: keep the
// sanitized details if they fit, else try the compact fallback summary, else
// emit a minimal marker object that is guaranteed tiny.
function enforcePersistedDetailsByteCap(
  value: Record<string, unknown>,
  src: Record<string, unknown> | undefined,
  originalSize: BoundedJsonUtf8Bytes,
): Record<string, unknown> {
  const measuredBytes = jsonUtf8BytesOrInfinity(value);
  if (measuredBytes <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) {
    return value;
  }
  const summary = buildPersistedDetailsFallback(src, originalSize, measuredBytes);
  const summaryBytes = jsonUtf8BytesOrInfinity(summary);
  if (summaryBytes <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) {
    return summary;
  }
  return {
    persistedDetailsTruncated: true,
    finalDetailsTruncated: true,
    ...originalDetailsSizeFields(originalSize),
    sanitizedDetailsBytes: measuredBytes,
  };
}
// Bounds a toolResult `details` payload before persistence. Small payloads
// pass through unchanged; oversized ones are replaced by a truncation summary
// built from an allow-list of fields, with every string bounded and the
// sessions array capped. Measurement uses boundedJsonUtf8Bytes so the original
// payload is never fully serialized or enumerated.
function sanitizeToolResultDetailsForPersistence(details: unknown): unknown {
if (details === undefined || details === null) {
return details;
}
// Bounded measurement: stops as soon as the cap is exceeded.
const originalSize = boundedJsonUtf8Bytes(details, MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES);
if (originalSize.complete && originalSize.bytes <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) {
return details;
}
// Oversized primitive (e.g. one huge string): keep only a type marker.
if (typeof details !== "object") {
return enforcePersistedDetailsByteCap(
{
persistedDetailsTruncated: true,
...originalDetailsSizeFields(originalSize),
valueType: typeof details,
},
undefined,
originalSize,
);
}
const src = details as Record<string, unknown>;
const out: Record<string, unknown> = {
persistedDetailsTruncated: true,
...originalDetailsSizeFields(originalSize),
// Bounded key sample (max 40) — avoids materializing huge key sets.
originalDetailKeys: firstEnumerableOwnKeys(src, 40),
};
// Allow-listed small fields; strings are truncated to the default bound.
for (const key of [
"status",
"sessionId",
"pid",
"startedAt",
"endedAt",
"cwd",
"name",
"exitCode",
"exitSignal",
"retryInMs",
"total",
"totalLines",
"totalChars",
"truncated",
"fullOutputPath",
"truncation",
]) {
const field = src[key];
if (field !== undefined) {
out[key] = typeof field === "string" ? truncatePersistedDetailString(field) : field;
}
}
if (typeof src.tail === "string") {
out.tail = truncatePersistedDetailString(src.tail);
}
// Sessions array: cap the count, slim each entry, record how many were cut.
if (Array.isArray(src.sessions)) {
out.sessions = src.sessions
.slice(0, MAX_PERSISTED_DETAIL_SESSION_COUNT)
.map(sanitizePersistedSessionDetail);
if (src.sessions.length > MAX_PERSISTED_DETAIL_SESSION_COUNT) {
out.sessionsTruncated = src.sessions.length - MAX_PERSISTED_DETAIL_SESSION_COUNT;
}
}
// The sanitized object may still be too large — enforce the final byte cap.
return enforcePersistedDetailsByteCap(out, src, originalSize);
}
// Applies the details byte cap to toolResult messages only; returns the input
// unchanged (same object identity) when no capping was necessary.
function capToolResultDetails(msg: AgentMessage): AgentMessage {
  const role = (msg as { role?: string }).role;
  if (role !== "toolResult") {
    return msg;
  }
  const currentDetails = (msg as { details?: unknown }).details;
  const sanitized = sanitizeToolResultDetailsForPersistence(currentDetails);
  // Identity means nothing changed — skip the message copy.
  if (sanitized === currentDetails) {
    return msg;
  }
  const replacement = { ...msg } as AgentMessage & { details?: unknown };
  replacement.details = sanitized;
  return replacement;
}
// Persistence pipeline for a tool result: first cap the live text size, then
// bound the structured `details` payload.
function capToolResultForPersistence(msg: AgentMessage, maxChars: number): AgentMessage {
  const sizeCapped = capToolResultSize(msg, maxChars);
  return capToolResultDetails(sizeCapped);
}
function normalizePersistedToolResultName(
message: AgentMessage,
fallbackName?: string,
@@ -169,7 +357,7 @@ export function installSessionToolResultGuard(
}),
);
if (flushed) {
originalAppend(capToolResultSize(flushed, maxToolResultChars) as never);
originalAppend(capToolResultForPersistence(flushed, maxToolResultChars) as never);
}
}
}
@@ -206,7 +394,10 @@ export function installSessionToolResultGuard(
const normalizedToolResult = normalizePersistedToolResultName(nextMessage, toolName);
// Apply hard size cap before persistence to prevent oversized tool results
// from consuming the entire context window on subsequent LLM calls.
const capped = capToolResultSize(persistMessage(normalizedToolResult), maxToolResultChars);
const capped = capToolResultForPersistence(
persistMessage(normalizedToolResult),
maxToolResultChars,
);
const persisted = applyBeforeWriteHook(
persistToolResult(capped, {
toolCallId: id ?? undefined,
@@ -217,7 +408,7 @@ export function installSessionToolResultGuard(
if (!persisted) {
return undefined;
}
return originalAppend(capToolResultSize(persisted, maxToolResultChars) as never);
return originalAppend(capToolResultForPersistence(persisted, maxToolResultChars) as never);
}
// Skip tool call extraction for aborted/errored assistant messages.

View File

@@ -59,6 +59,8 @@ type GatewayReloadLog = {
};
const MCP_RUNTIME_RELOAD_DISPOSE_TIMEOUT_MS = 5_000;
const CHANNEL_RELOAD_DEFERRAL_POLL_MS = 500;
const CHANNEL_RELOAD_STILL_PENDING_WARN_MS = 30_000;
async function disposeMcpRuntimesWithTimeout(params: {
dispose: () => Promise<void>;
@@ -125,6 +127,87 @@ type ManagedGatewayConfigReloaderParams = Omit<
};
export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) {
// Snapshot of everything that should block a disruptive channel reload:
// queued operations, pending replies, embedded agent runs, and active tasks.
const getActiveCounts = () => {
  const queueSize = getTotalQueueSize();
  const pendingReplies = getTotalPendingReplies();
  const embeddedRuns = getActiveEmbeddedRunCount();
  const activeTasks = getInspectableTaskRegistrySummary().active;
  const totalActive = queueSize + pendingReplies + embeddedRuns + activeTasks;
  return { queueSize, pendingReplies, embeddedRuns, activeTasks, totalActive };
};
// Human-readable fragments for reload log lines; zero counts are omitted.
const formatActiveDetails = (counts: ReturnType<typeof getActiveCounts>) => {
  const labeled: Array<[number, string]> = [
    [counts.queueSize, "operation(s)"],
    [counts.pendingReplies, "reply(ies)"],
    [counts.embeddedRuns, "embedded run(s)"],
    [counts.activeTasks, "task run(s)"],
  ];
  return labeled.filter(([count]) => count > 0).map(([count, label]) => `${count} ${label}`);
};
// Defers a channel reload until active work (queue, replies, embedded runs,
// tasks) drains. Polls every CHANNEL_RELOAD_DEFERRAL_POLL_MS; warns
// periodically while still pending; an optional configured timeout forces the
// reload through. With no (or non-positive) timeout it waits indefinitely.
const waitForActiveWorkBeforeChannelReload = async (
channels: Iterable<ChannelKind>,
nextConfig: OpenClawConfig,
) => {
// Fast path: nothing active, reload immediately.
const initial = getActiveCounts();
if (initial.totalActive <= 0) {
return;
}
const channelNames = [...channels].join(", ");
const initialDetails = formatActiveDetails(initial);
params.logReload.warn(
`config change requires channel reload (${channelNames}) — deferring until ${initialDetails.join(
", ",
)} complete`,
);
// Only finite positive timeouts count; the floor is one poll interval.
const timeoutMsRaw = nextConfig.gateway?.reload?.deferralTimeoutMs;
const timeoutMs =
typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw) && timeoutMsRaw > 0
? Math.max(CHANNEL_RELOAD_DEFERRAL_POLL_MS, Math.floor(timeoutMsRaw))
: undefined;
const startedAt = Date.now();
let nextStillPendingAt = startedAt + CHANNEL_RELOAD_STILL_PENDING_WARN_MS;
while (true) {
// unref'd timer so this poll never keeps the process alive on its own.
await new Promise<void>((resolve) => {
const timer = setTimeout(resolve, CHANNEL_RELOAD_DEFERRAL_POLL_MS);
timer.unref?.();
});
const current = getActiveCounts();
if (current.totalActive <= 0) {
params.logReload.info("active operations and replies completed; reloading channels now");
return;
}
const elapsedMs = Date.now() - startedAt;
// Configured timeout: reload anyway, logging what was still active.
if (timeoutMs !== undefined && elapsedMs >= timeoutMs) {
const remaining = formatActiveDetails(current);
params.logReload.warn(
`channel reload timeout after ${elapsedMs}ms with ${remaining.join(
", ",
)} still active; reloading channels anyway`,
);
return;
}
// Periodic still-pending warning so a long deferral stays visible in logs.
if (Date.now() >= nextStillPendingAt) {
const remaining = formatActiveDetails(current);
params.logReload.warn(
`channel reload still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`,
);
nextStillPendingAt = Date.now() + CHANNEL_RELOAD_STILL_PENDING_WARN_MS;
}
}
};
const applyHotReload = async (plan: GatewayReloadPlan, nextConfig: OpenClawConfig) => {
setGatewaySigusr1RestartPolicy({ allowExternal: isRestartEnabled(nextConfig) });
const state = params.getState();
@@ -207,6 +290,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
"skipping channel reload (OPENCLAW_SKIP_CHANNELS=1 or OPENCLAW_SKIP_PROVIDERS=1)",
);
} else {
await waitForActiveWorkBeforeChannelReload(plan.restartChannels, nextConfig);
const restartChannel = async (name: ChannelKind) => {
params.logChannels.info(`restarting ${name} channel`);
await params.stopChannel(name);
@@ -244,35 +328,6 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
return false;
}
const getActiveCounts = () => {
const queueSize = getTotalQueueSize();
const pendingReplies = getTotalPendingReplies();
const embeddedRuns = getActiveEmbeddedRunCount();
const activeTasks = getInspectableTaskRegistrySummary().active;
return {
queueSize,
pendingReplies,
embeddedRuns,
activeTasks,
totalActive: queueSize + pendingReplies + embeddedRuns + activeTasks,
};
};
const formatActiveDetails = (counts: ReturnType<typeof getActiveCounts>) => {
const details = [];
if (counts.queueSize > 0) {
details.push(`${counts.queueSize} operation(s)`);
}
if (counts.pendingReplies > 0) {
details.push(`${counts.pendingReplies} reply(ies)`);
}
if (counts.embeddedRuns > 0) {
details.push(`${counts.embeddedRuns} embedded run(s)`);
}
if (counts.activeTasks > 0) {
details.push(`${counts.activeTasks} task run(s)`);
}
return details;
};
const active = getActiveCounts();
if (active.totalActive > 0) {

View File

@@ -20,6 +20,7 @@ import { ConnectErrorDetailCodes } from "./protocol/connect-error-details.js";
import {
connectReq,
connectOk,
embeddedRunMock,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
@@ -282,6 +283,7 @@ describe("gateway hot reload", () => {
let prevOpenAiApiKey: string | undefined;
beforeEach(() => {
vi.clearAllMocks();
prevSkipChannels = process.env.OPENCLAW_SKIP_CHANNELS;
prevSkipGmail = process.env.OPENCLAW_SKIP_GMAIL_WATCHER;
prevSkipProviders = process.env.OPENCLAW_SKIP_PROVIDERS;
@@ -289,10 +291,12 @@ describe("gateway hot reload", () => {
process.env.OPENCLAW_SKIP_CHANNELS = "0";
delete process.env.OPENCLAW_SKIP_GMAIL_WATCHER;
delete process.env.OPENCLAW_SKIP_PROVIDERS;
hoisted.cronInstances.length = 0;
hoisted.activeEmbeddedRunCount.value = 0;
hoisted.totalPendingReplies.value = 0;
hoisted.totalQueueSize.value = 0;
hoisted.activeTaskCount.value = 0;
embeddedRunMock.activeIds.clear();
hoisted.resetModelCatalogCache.mockReset();
hoisted.disposeAllSessionMcpRuntimes.mockReset();
hoisted.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined);
@@ -423,6 +427,196 @@ describe("gateway hot reload", () => {
);
}
// With one embedded run active, a hot reload touching discord must NOT stop or
// start the channel until the run count drops to zero; then it proceeds.
it("defers channel hot reload until active work drains", async () => {
await withNonMinimalGatewayServer(async () => {
const onHotReload = hoisted.getOnHotReload();
expect(onHotReload).toBeTypeOf("function");
hoisted.providerManager.stopChannel.mockClear();
hoisted.providerManager.startChannel.mockClear();
// Simulate one in-flight embedded run.
hoisted.activeEmbeddedRunCount.value = 1;
embeddedRunMock.activeIds.add("reload-active");
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const reloadPromise = onHotReload?.(
{
changedPaths: ["channels.discord.token"],
restartGateway: false,
restartReasons: [],
hotReasons: ["channels.discord.token"],
reloadHooks: false,
restartGmailWatcher: false,
restartCron: false,
restartHeartbeat: false,
restartChannels: new Set(["discord"]),
noopPaths: [],
},
{
gateway: { reload: { deferralTimeoutMs: 60_000 } },
channels: { discord: { token: "token" } },
},
);
try {
// Past one poll interval (500ms): reload must still be deferred.
await delay(550);
expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled();
expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled();
// Drain the active work; the deferred reload should now complete.
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await reloadPromise;
} finally {
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await reloadPromise?.catch(() => {});
}
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord");
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord");
});
});
// When active work never drains, the configured deferralTimeoutMs forces the
// channel reload through after the timeout elapses (fake timers drive both
// the poll interval and the timeout).
it("uses the configured timeout when active work does not drain before channel reload", async () => {
await withNonMinimalGatewayServer(async () => {
const onHotReload = hoisted.getOnHotReload();
expect(onHotReload).toBeTypeOf("function");
hoisted.providerManager.stopChannel.mockClear();
hoisted.providerManager.startChannel.mockClear();
// Work that will never complete within this test.
hoisted.activeEmbeddedRunCount.value = 1;
embeddedRunMock.activeIds.add("reload-stuck");
vi.useFakeTimers();
const reloadPromise = onHotReload?.(
{
changedPaths: ["channels.discord.token"],
restartGateway: false,
restartReasons: [],
hotReasons: ["channels.discord.token"],
reloadHooks: false,
restartGmailWatcher: false,
restartCron: false,
restartHeartbeat: false,
restartChannels: new Set(["discord"]),
noopPaths: [],
},
{
gateway: { reload: { deferralTimeoutMs: 1_000 } },
channels: { discord: { token: "token" } },
},
);
try {
// Let the handler reach its first poll before advancing timers.
await Promise.resolve();
await vi.advanceTimersByTimeAsync(500);
expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled();
expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled();
// Second poll crosses the 1s timeout: reload proceeds despite activity.
await vi.advanceTimersByTimeAsync(500);
await reloadPromise;
} finally {
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await vi.advanceTimersByTimeAsync(500).catch(() => {});
vi.useRealTimers();
await reloadPromise?.catch(() => {});
}
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord");
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord");
});
});
// Two scenarios in one test: deferralTimeoutMs === 0 and the setting omitted
// entirely. In both, the reload waits as long as work is active (10 minutes of
// fake time here) and only proceeds once the counters drop to zero.
it("waits indefinitely for channel hot reload when deferral timeout is 0 or omitted", async () => {
await withNonMinimalGatewayServer(async () => {
const onHotReload = hoisted.getOnHotReload();
expect(onHotReload).toBeTypeOf("function");
hoisted.providerManager.stopChannel.mockClear();
hoisted.providerManager.startChannel.mockClear();
hoisted.activeEmbeddedRunCount.value = 1;
embeddedRunMock.activeIds.add("reload-indefinite");
vi.useFakeTimers();
// Scenario 1: explicit deferralTimeoutMs of 0 (treated as "no timeout").
const reloadPromise = onHotReload?.(
{
changedPaths: ["channels.discord.token"],
restartGateway: false,
restartReasons: [],
hotReasons: ["channels.discord.token"],
reloadHooks: false,
restartGmailWatcher: false,
restartCron: false,
restartHeartbeat: false,
restartChannels: new Set(["discord"]),
noopPaths: [],
},
{
gateway: { reload: { deferralTimeoutMs: 0 } },
channels: { discord: { token: "token" } },
},
);
try {
await Promise.resolve();
// Even after 10 minutes of fake time, no channel restart while active.
await vi.advanceTimersByTimeAsync(10 * 60_000);
expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled();
expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled();
// Drain the work; one more poll tick lets the reload complete.
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await vi.advanceTimersByTimeAsync(500);
await reloadPromise;
} finally {
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await vi.advanceTimersByTimeAsync(500).catch(() => {});
vi.useRealTimers();
await reloadPromise?.catch(() => {});
}
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord");
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord");
// Scenario 2: no gateway.reload config at all — same indefinite behavior.
hoisted.providerManager.stopChannel.mockClear();
hoisted.providerManager.startChannel.mockClear();
hoisted.activeEmbeddedRunCount.value = 1;
embeddedRunMock.activeIds.add("reload-indefinite-omitted");
vi.useFakeTimers();
const omittedPromise = onHotReload?.(
{
changedPaths: ["channels.telegram.botToken"],
restartGateway: false,
restartReasons: [],
hotReasons: ["channels.telegram.botToken"],
reloadHooks: false,
restartGmailWatcher: false,
restartCron: false,
restartHeartbeat: false,
restartChannels: new Set(["telegram"]),
noopPaths: [],
},
{
channels: { telegram: { botToken: "token" } },
},
);
try {
await Promise.resolve();
await vi.advanceTimersByTimeAsync(10 * 60_000);
expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled();
expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled();
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await vi.advanceTimersByTimeAsync(500);
await omittedPromise;
} finally {
hoisted.activeEmbeddedRunCount.value = 0;
embeddedRunMock.activeIds.clear();
await vi.advanceTimersByTimeAsync(500).catch(() => {});
vi.useRealTimers();
await omittedPromise?.catch(() => {});
}
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("telegram");
expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("telegram");
});
});
it("applies hot reload actions and emits restart signal", async () => {
await withNonMinimalGatewayServer(async () => {
const onHotReload = hoisted.getOnHotReload();

View File

@@ -1,5 +1,10 @@
import { describe, expect, it } from "vitest";
import { jsonUtf8Bytes } from "./json-utf8-bytes.js";
import {
boundedJsonUtf8Bytes,
firstEnumerableOwnKeys,
jsonUtf8Bytes,
jsonUtf8BytesOrInfinity,
} from "./json-utf8-bytes.js";
function createCircularValue() {
const circular: { self?: unknown } = {};
@@ -45,3 +50,69 @@ describe("jsonUtf8Bytes", () => {
expect(jsonUtf8Bytes(value)).toBe(Buffer.byteLength(expected, "utf8"));
});
});
describe("jsonUtf8BytesOrInfinity", () => {
  it("returns exact JSON byte length for serializable values", () => {
    const input = { a: "x", b: [1, 2, null] };
    const expectedBytes = Buffer.byteLength(JSON.stringify(input), "utf8");
    expect(jsonUtf8BytesOrInfinity(input)).toBe(expectedBytes);
  });
  // Circular structures, BigInt, and bare `undefined` all fail JSON
  // serialization and must map to positive infinity.
  it.each([createCircularValue(), 12n, undefined])(
    "returns infinity for values that cannot be serialized as JSON",
    (input) => {
      expect(jsonUtf8BytesOrInfinity(input)).toBe(Number.POSITIVE_INFINITY);
    },
  );
});
describe("boundedJsonUtf8Bytes", () => {
// For values JSON.stringify handles, the bounded walker must report the
// exact same byte length (including JSON quirks: holes, undefined, NaN,
// Infinity, Date's toJSON).
it.each([
{ name: "plain object", value: { a: "x", b: [1, 2, null] } },
{ name: "unicode string", value: { value: "🙂" } },
{
name: "array holes and undefined",
value: (() => {
const value = [undefined, () => undefined] as unknown[];
value.length = 3;
return value;
})(),
},
{ name: "non-finite numbers", value: [Number.NaN, Number.POSITIVE_INFINITY] },
{ name: "date", value: { at: new Date("2026-04-25T12:00:00.000Z") } },
])("matches JSON.stringify byte length for $name", ({ value }) => {
expect(boundedJsonUtf8Bytes(value, 100_000)).toEqual({
bytes: Buffer.byteLength(JSON.stringify(value), "utf8"),
complete: true,
});
});
// Once past the cap, measurement stops and reports limit + 1 as a floor.
it("stops once the byte limit is exceeded", () => {
expect(boundedJsonUtf8Bytes({ value: "x".repeat(50_000) }, 8_192)).toEqual({
bytes: 8_193,
complete: false,
});
});
// Values that would require unsafe serialization (circular refs, BigInt,
// user-defined toJSON) are reported incomplete rather than stringified.
it.each([
{ name: "circular objects", value: createCircularValue() },
{ name: "BigInt", value: { value: 12n } },
{ name: "custom toJSON", value: { toJSON: () => ({ ok: true }) } },
])("marks $name incomplete instead of invoking unsafe JSON serialization", ({ value }) => {
const result = boundedJsonUtf8Bytes(value, 8_192);
expect(result.complete).toBe(false);
expect(result.bytes).toBeGreaterThan(8_192);
});
});
describe("firstEnumerableOwnKeys", () => {
  it("returns only own enumerable keys up to the limit", () => {
    // Object with one inherited key, three own keys, and one non-enumerable
    // own key; only the first two own enumerable keys should come back.
    const proto = { inherited: true };
    const target = Object.assign(Object.create(proto), { a: 1, b: 2, c: 3 }) as Record<
      string,
      unknown
    >;
    Object.defineProperty(target, "hidden", { enumerable: false, value: true });
    expect(firstEnumerableOwnKeys(target, 2)).toEqual(["a", "b"]);
  });
});

View File

@@ -5,3 +5,144 @@ export function jsonUtf8Bytes(value: unknown): number {
return Buffer.byteLength(String(value), "utf8");
}
}
/**
 * Result of a bounded JSON byte measurement. `bytes` is exact when `complete`
 * is true; otherwise it is a lower bound (the walk stopped early — limit
 * exceeded or an unserializable value was encountered).
 */
export type BoundedJsonUtf8Bytes = {
bytes: number;
complete: boolean;
};
/**
 * UTF-8 byte length of a value's JSON form, or positive infinity when it has
 * none: JSON.stringify throws (circular refs, BigInt) or yields `undefined`
 * (bare undefined/function/symbol).
 */
export function jsonUtf8BytesOrInfinity(value: unknown): number {
  try {
    const serialized = JSON.stringify(value);
    if (typeof serialized !== "string") {
      return Number.POSITIVE_INFINITY;
    }
    return Buffer.byteLength(serialized, "utf8");
  } catch {
    return Number.POSITIVE_INFINITY;
  }
}
/**
 * JSON-encoded UTF-8 byte length of a string, bounded by `remainingBytes`.
 * Fast reject: the JSON form needs at least `value.length` bytes plus two
 * quotes, so anything longer returns `remainingBytes + 1` (over-limit marker)
 * without serializing. JSON.stringify of a string never throws, so the exact
 * path is safe.
 */
function jsonStringByteLengthUpToLimit(value: string, remainingBytes: number): number {
  if (value.length + 2 > remainingBytes) {
    return remainingBytes + 1;
  }
  return Buffer.byteLength(JSON.stringify(value), "utf8");
}
/**
 * Lazily yields `[key, value]` pairs for own enumerable properties, in
 * for..in order. Deliberately avoids Object.keys/Object.entries so that
 * enormous objects are never fully materialized before the caller stops.
 */
function* enumerableOwnEntries(value: object): Generator<[string, unknown]> {
  const record = value as Record<string, unknown>;
  const isOwnEnumerable = Object.prototype.propertyIsEnumerable;
  for (const key in record) {
    // for..in also walks inherited enumerable keys; filter those out.
    if (!isOwnEnumerable.call(record, key)) {
      continue;
    }
    yield [key, record[key]];
  }
}
/**
 * Collects up to `maxKeys` own enumerable property names, walking lazily via
 * for..in so a huge object's full key set is never materialized.
 */
export function firstEnumerableOwnKeys(value: object, maxKeys: number): string[] {
  const collected: string[] = [];
  const isOwnEnumerable = Object.prototype.propertyIsEnumerable;
  for (const key in value as Record<string, unknown>) {
    // Skip inherited enumerable keys; push returns the new length, which
    // doubles as the stop condition.
    if (isOwnEnumerable.call(value, key) && collected.push(key) >= maxKeys) {
      break;
    }
  }
  return collected;
}
/**
 * Measures the UTF-8 byte length a value WOULD have as JSON, without ever
 * serializing the whole value, stopping as soon as `maxBytes` is exceeded.
 * Returns `{ bytes, complete: true }` with the exact length when it fits, or
 * `{ bytes: >= maxBytes + 1, complete: false }` when the limit was exceeded
 * or an unserializable value (circular ref, BigInt, custom toJSON) was hit.
 * Mirrors JSON.stringify semantics: non-finite numbers → null, undefined/
 * function/symbol → null inside arrays and skipped inside objects, Date via
 * its toJSON string.
 */
export function boundedJsonUtf8Bytes(value: unknown, maxBytes: number): BoundedJsonUtf8Bytes {
let bytes = 0;
// Tracks objects on the current visit path for cycle detection.
const seen = new WeakSet<object>();
// Accumulates and aborts the walk the moment the budget is exceeded.
const add = (amount: number): void => {
bytes += amount;
if (bytes > maxBytes) {
throw new Error("json_byte_limit_exceeded");
}
};
const visit = (entry: unknown, inArray: boolean): void => {
if (entry === null) {
add(4); // "null"
return;
}
switch (typeof entry) {
case "string":
// Bounded string measurement — avoids serializing huge strings.
add(jsonStringByteLengthUpToLimit(entry, maxBytes - bytes));
return;
case "number":
// JSON.stringify renders NaN/±Infinity as null.
add(jsonUtf8BytesOrInfinity(Number.isFinite(entry) ? entry : null));
return;
case "boolean":
add(entry ? 4 : 5); // "true" / "false"
return;
case "undefined":
case "function":
case "symbol":
// In arrays these become "null"; in objects they are skipped entirely
// (object-field skipping is handled in the field loop below).
if (inArray) {
add(4);
}
return;
case "bigint":
// JSON.stringify would throw; report incomplete instead.
throw new Error("json_byte_length_unsupported");
case "object":
break;
}
const objectEntry = entry as object;
if (seen.has(objectEntry)) {
throw new Error("json_byte_length_circular");
}
// A user-defined toJSON would have to be invoked to know the output —
// refuse (Date is the one safe, well-known exception, handled below).
if (
typeof (objectEntry as { toJSON?: unknown }).toJSON === "function" &&
!(objectEntry instanceof Date)
) {
throw new Error("json_byte_length_custom_to_json");
}
seen.add(objectEntry);
try {
if (objectEntry instanceof Date) {
// Dates serialize via their ISO toJSON() string.
visit(objectEntry.toJSON(), inArray);
return;
}
if (Array.isArray(objectEntry)) {
add(1); // "["
for (let index = 0; index < objectEntry.length; index += 1) {
if (index > 0) {
add(1); // ","
}
// Index access turns holes into undefined → counted as "null".
visit(objectEntry[index], true);
}
add(1); // "]"
return;
}
add(1); // "{"
let wroteField = false;
// Lazy own-enumerable walk: huge objects are never fully enumerated.
for (const [key, field] of enumerableOwnEntries(objectEntry)) {
// JSON.stringify omits these field values in objects.
if (field === undefined || typeof field === "function" || typeof field === "symbol") {
continue;
}
if (wroteField) {
add(1); // ","
}
wroteField = true;
add(jsonStringByteLengthUpToLimit(key, maxBytes - bytes));
add(1); // ":"
visit(field, false);
}
add(1); // "}"
} finally {
// Remove from the path set so shared (non-circular) references are legal,
// matching JSON.stringify.
seen.delete(objectEntry);
}
};
try {
visit(value, false);
return { bytes, complete: true };
} catch {
// Any abort (limit, cycle, BigInt, toJSON) reports an over-limit floor.
return { bytes: Math.max(bytes, maxBytes + 1), complete: false };
}
}