diff --git a/CHANGELOG.md b/CHANGELOG.md index c7deea66c3c..6959abd2737 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,8 @@ Docs: https://docs.openclaw.ai block. Fixes #71572. (#71627) Thanks @openperf. - Browser/CDP: make readiness diagnostics use the same discovery-first fallback as reachability for bare `ws://` Browserless and Browserbase CDP URLs. Fixes #69532. - Browser/CDP: explain that loopback Browserless or other externally managed CDP services need `attachOnly: true` and matching Browserless `EXTERNAL` endpoint when reporting local port ownership conflicts, and fall back to the configured bare WebSocket root when a discovered Browserless endpoint rejects CDP. Fixes #49815. +- Gateway/reload: preserve indefinite `gateway.reload.deferralTimeoutMs: 0` semantics for channel hot reload deferrals so active agent runs are not interrupted by a forced channel restart. (#71637) Thanks @Poo-Squirry. +- Agents/tool results: cap persisted Pi tool-result details and strip hidden diagnostics before provider conversion, preventing large debug payloads from bloating session transcripts. (#71637) Thanks @Poo-Squirry. - ACP/OpenCode: update the bundled acpx runtime to 0.6.0 and cover the OpenCode ACP bind path in Docker live tests. - Providers/OpenCode Go: add DeepSeek V4 Pro and DeepSeek V4 Flash to the Go catalog while the bundled Pi registry catches up. Fixes #71587. - Providers/OpenCode Go: route DeepSeek V4 Pro/Flash through the OpenAI-compatible Go endpoint and suppress invalid `reasoning_effort: "off"` payloads, fixing tool-enabled requests for `opencode-go/deepseek-v4-flash`. Fixes #71683. diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index a7ff53b0c6a..61a8b98f1e0 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -13,6 +13,7 @@ import { applyEmbeddedAttemptToolsAllow, isPrimaryBootstrapRun, mergeOrphanedTrailingUserPrompt, + normalizeMessagesForLlmBoundary, prependSystemPromptAddition, remapInjectedContextFilesToWorkspace, resetEmbeddedAgentBaseStreamFnCacheForTest, @@ -73,6 +74,30 @@ describe("applyEmbeddedAttemptToolsAllow", () => { }); }); +describe("normalizeMessagesForLlmBoundary", () => { + it("strips tool result details before provider conversion", () => { + const input = [ + { + role: "toolResult", + toolCallId: "call_1", + toolName: "exec", + content: [{ type: "text", text: "visible output" }], + details: { aggregated: "hidden diagnostics" }, + isError: false, + timestamp: 1, + }, + ]; + + const output = normalizeMessagesForLlmBoundary( + input as Parameters[0], + ) as Array>; + + expect(output[0]).not.toHaveProperty("details"); + expect(output[0]?.content).toEqual([{ type: "text", text: "visible output" }]); + expect(input[0]).toHaveProperty("details"); + }); +}); + describe("shouldCreateBundleMcpRuntimeForAttempt", () => { it("skips bundle MCP when tools are disabled or unavailable", () => { expect(shouldCreateBundleMcpRuntimeForAttempt({ toolsEnabled: false })).toBe(false); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index da4dd241af5..4ed9cd4adbc 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -121,7 +121,10 @@ import { resolveSandboxContext } from "../../sandbox.js"; import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js"; import { repairSessionFileIfNeeded } from "../../session-file-repair.js"; import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js"; -import { sanitizeToolUseResultPairing } from "../../session-transcript-repair.js"; +import { + sanitizeToolUseResultPairing, + stripToolResultDetails, +} from "../../session-transcript-repair.js"; import { acquireSessionWriteLock, resolveSessionLockMaxHoldFromTimeout, @@ -466,6 +469,10 @@ export function applyEmbeddedAttemptToolsAllow( return tools.filter((tool) => allowSet.has(tool.name)); } +export function normalizeMessagesForLlmBoundary(messages: AgentMessage[]): AgentMessage[] { + return stripToolResultDetails(normalizeAssistantReplayContent(messages)); +} + export function shouldCreateBundleMcpRuntimeForAttempt(params: { toolsEnabled: boolean; disableTools?: boolean; @@ -1391,7 +1398,7 @@ export async function runEmbeddedAttempt( if (typeof activeSession.agent.convertToLlm === "function") { const baseConvertToLlm = activeSession.agent.convertToLlm.bind(activeSession.agent); activeSession.agent.convertToLlm = async (messages) => - await baseConvertToLlm(normalizeAssistantReplayContent(messages)); + await baseConvertToLlm(normalizeMessagesForLlmBoundary(messages)); } let prePromptMessageCount = activeSession.messages.length; let unwindowedContextEngineMessagesForPrecheck: AgentMessage[] | undefined; diff --git a/src/agents/session-tool-result-guard.tool-result-persist-hook.test.ts b/src/agents/session-tool-result-guard.tool-result-persist-hook.test.ts index 27770876e3f..525d0cc480f 100644 --- a/src/agents/session-tool-result-guard.tool-result-persist-hook.test.ts +++ b/src/agents/session-tool-result-guard.tool-result-persist-hook.test.ts @@ -3,7 +3,7 @@ import os from "node:os"; import path from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; import { SessionManager } from "@mariozechner/pi-coding-agent"; -import { describe, expect, it, afterEach } from "vitest"; +import { describe, expect, it, afterEach, vi } from "vitest"; import { initializeGlobalHookRunner, resetGlobalHookRunner, @@ -88,6 +88,14 @@ function expectPersistedToolResultTextCapped(sm: ReturnType) { + const toolResult = getPersistedToolResult(sm); + const details = toolResult.details as Record; + expect(details.persistedDetailsTruncated).toBe(true); + expect(details.aggregated).toBeUndefined(); + expect(Buffer.byteLength(JSON.stringify(details), "utf-8")).toBeLessThan(8_192); +} + afterEach(() => { resetGlobalHookRunner(); if (originalBundledPluginsDir === undefined) { @@ -109,6 +117,189 @@ describe("tool_result_persist hook", () => { expect(toolResult.details).toBeTruthy(); }); + it("caps oversized toolResult details before persistence", () => { + const sm = guardSessionManager(SessionManager.inMemory(), { + agentId: "main", + sessionKey: "main", + }); + const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void; + appendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }], + } as AgentMessage); + appendMessage({ + role: "toolResult", + toolCallId: "call_1", + isError: false, + content: [{ type: "text", text: "visible output stays small" }], + details: { + status: "completed", + sessionId: "exec-1", + aggregated: "x".repeat(120_000), + tail: "t".repeat(6_000), + sessions: [ + { + sessionId: "proc-1", + status: "completed", + command: "node noisy-script.js ".repeat(2_000), + aggregated: "a".repeat(80_000), + tail: "z".repeat(8_000), + }, + ], + }, + } as any); + + const toolResult = getPersistedToolResult(sm); + expect(toolResult.content[0]?.text).toBe("visible output stays small"); + expectPersistedToolResultDetailsCapped(sm); + }); + + it("caps oversized toolResult details without serializing the original payload", () => { + const sm = guardSessionManager(SessionManager.inMemory(), { + agentId: "main", + sessionKey: "main", + }); + const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void; + const oversizedDetails = { + status: "completed", + sessionId: "exec-large", + aggregated: "x".repeat(200_000), + sessions: [ + { + sessionId: "proc-large", + command: "node noisy-script.js ".repeat(2_000), + tail: "z".repeat(20_000), + }, + ], + }; + const originalStringify = JSON.stringify; + const stringifySpy = vi.spyOn(JSON, "stringify").mockImplementation((value, ...args) => { + if (value === oversizedDetails) { + throw new Error("unbounded original details stringify"); + } + return originalStringify(value, ...args); + }); + + try { + appendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }], + } as AgentMessage); + appendMessage({ + role: "toolResult", + toolCallId: "call_1", + isError: false, + content: [{ type: "text", text: "visible output stays small" }], + details: oversizedDetails, + } as any); + } finally { + stringifySpy.mockRestore(); + } + + const toolResult = getPersistedToolResult(sm); + expect(toolResult.content[0]?.text).toBe("visible output stays small"); + expectPersistedToolResultDetailsCapped(sm); + expect(stringifySpy).not.toHaveBeenCalledWith(oversizedDetails); + }); + + it("caps wide toolResult details without materializing every entry up front", () => { + const sm = guardSessionManager(SessionManager.inMemory(), { + agentId: "main", + sessionKey: "main", + }); + const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void; + const wideDetails: Record = { + status: "completed", + sessionId: "exec-wide", + }; + for (let index = 0; index < 20_000; index += 1) { + wideDetails[`debug_${index}`] = `value-${index}`; + } + const originalEntries = Object.entries; + const originalKeys = Object.keys; + const entriesSpy = vi.spyOn(Object, "entries").mockImplementation((value) => { + if (value === wideDetails) { + throw new Error("wide details entries materialized"); + } + return originalEntries(value); + }); + const keysSpy = vi.spyOn(Object, "keys").mockImplementation((value) => { + if (value === wideDetails) { + throw new Error("wide details keys materialized"); + } + return originalKeys(value); + }); + + try { + appendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }], + } as AgentMessage); + appendMessage({ + role: "toolResult", + toolCallId: "call_1", + isError: false, + content: [{ type: "text", text: "visible output stays small" }], + details: wideDetails, + } as any); + } finally { + entriesSpy.mockRestore(); + keysSpy.mockRestore(); + } + + const toolResult = getPersistedToolResult(sm); + const details = toolResult.details as Record; + expect(details.persistedDetailsTruncated).toBe(true); + expect(details.originalDetailKeys).toEqual( + expect.arrayContaining(["status", "sessionId", "debug_0"]), + ); + }); + + it("falls back to a compact summary when sanitized details still exceed the cap", () => { + const sm = guardSessionManager(SessionManager.inMemory(), { + agentId: "main", + sessionKey: "main", + }); + const appendMessage = sm.appendMessage.bind(sm) as unknown as (message: AgentMessage) => void; + appendMessage({ + role: "assistant", + content: [{ type: "toolCall", id: "call_1", name: "exec", arguments: {} }], + } as AgentMessage); + appendMessage({ + role: "toolResult", + toolCallId: "call_1", + isError: false, + content: [{ type: "text", text: "visible output stays small" }], + details: { + status: "completed".repeat(250), + sessionId: "exec-oversized", + cwd: "/tmp/very-long-working-directory".repeat(250), + name: "noisy process".repeat(250), + fullOutputPath: "/tmp/output.log".repeat(250), + truncation: "truncated".repeat(250), + tail: "t".repeat(20_000), + aggregated: "a".repeat(120_000), + sessions: Array.from({ length: 10 }, (_, index) => ({ + sessionId: `proc-${index}`, + status: "completed".repeat(100), + cwd: "/tmp/session".repeat(100), + name: "child process".repeat(100), + command: "node noisy-script.js ".repeat(200), + aggregated: "x".repeat(50_000), + tail: "z".repeat(10_000), + })), + }, + } as any); + + const toolResult = getPersistedToolResult(sm); + const details = toolResult.details as Record; + expect(details.persistedDetailsTruncated).toBe(true); + expect(details.finalDetailsTruncated).toBe(true); + expect(details.aggregated).toBeUndefined(); + expect(details.tail).toBeUndefined(); + expect(Buffer.byteLength(JSON.stringify(details), "utf-8")).toBeLessThan(8_192); + }); + it("loads tool_result_persist hooks without breaking persistence", () => { const tmp = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-toolpersist-")); process.env.OPENCLAW_BUNDLED_PLUGINS_DIR = "/nonexistent/bundled/plugins"; @@ -189,6 +380,35 @@ describe("tool_result_persist hook", () => { appendToolCallAndResult(sm); expectPersistedToolResultTextCapped(sm); }); + + it("reapplies the details cap after tool_result_persist expands details", () => { + initializeTempPlugin({ + tmpPrefix: "openclaw-toolpersist-details-expand-", + id: "persist-details-expand", + body: `export default { id: "persist-details-expand", register(api) { + api.on("tool_result_persist", (event) => { + return { + message: { + ...event.message, + details: { + status: "completed", + aggregated: "x".repeat(150000), + sessions: [{ sessionId: "proc-1", command: "y".repeat(50000), tail: "z".repeat(10000) }], + }, + }, + }; + }, { priority: 10 }); +} };`, + }); + + const sm = guardSessionManager(SessionManager.inMemory(), { + agentId: "main", + sessionKey: "main", + }); + + appendToolCallAndResult(sm); + expectPersistedToolResultDetailsCapped(sm); + }); }); describe("before_message_write hook", () => { diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index c3cb82314c5..1f7600a980b 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -1,5 +1,11 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { SessionManager } from "@mariozechner/pi-coding-agent"; +import { + boundedJsonUtf8Bytes, + firstEnumerableOwnKeys, + jsonUtf8BytesOrInfinity, + type BoundedJsonUtf8Bytes, +} from "../infra/json-utf8-bytes.js"; import type { PluginHookBeforeMessageWriteEvent, PluginHookBeforeMessageWriteResult, @@ -38,6 +44,188 @@ function resolveMaxToolResultChars(opts?: { maxToolResultChars?: number }): numb return Math.max(1, opts?.maxToolResultChars ?? DEFAULT_MAX_LIVE_TOOL_RESULT_CHARS); } +const MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES = 8_192; +const MAX_PERSISTED_DETAIL_STRING_CHARS = 2_000; +const MAX_PERSISTED_DETAIL_SESSION_COUNT = 10; +const MAX_PERSISTED_DETAIL_FALLBACK_STRING_CHARS = 200; + +function originalDetailsSizeFields(size: BoundedJsonUtf8Bytes): Record { + return size.complete + ? { originalDetailsBytes: size.bytes } + : { originalDetailsBytesAtLeast: size.bytes }; +} + +function truncatePersistedDetailString( + value: string, + maxChars = MAX_PERSISTED_DETAIL_STRING_CHARS, +): string { + if (value.length <= maxChars) { + return value; + } + return `${value.slice(0, maxChars)}\n\n[OpenClaw persisted detail truncated: ${ + value.length - maxChars + } chars omitted]`; +} + +function sanitizePersistedSessionDetail(value: unknown): unknown { + if (!value || typeof value !== "object") { + return value; + } + const src = value as Record; + const out: Record = {}; + for (const key of [ + "sessionId", + "status", + "pid", + "startedAt", + "endedAt", + "runtimeMs", + "cwd", + "name", + "truncated", + "exitCode", + "exitSignal", + ]) { + const field = src[key]; + if (field !== undefined) { + out[key] = typeof field === "string" ? truncatePersistedDetailString(field, 500) : field; + } + } + if (typeof src.command === "string") { + out.command = truncatePersistedDetailString(src.command, 500); + } + return out; +} + +function buildPersistedDetailsFallback( + src: Record | undefined, + originalSize: BoundedJsonUtf8Bytes, + sanitizedBytes?: number, +): Record { + const fallback: Record = { + persistedDetailsTruncated: true, + finalDetailsTruncated: true, + ...originalDetailsSizeFields(originalSize), + }; + if (sanitizedBytes !== undefined) { + fallback.sanitizedDetailsBytes = sanitizedBytes; + } + if (src) { + fallback.originalDetailKeys = firstEnumerableOwnKeys(src, 40); + for (const key of ["status", "sessionId", "pid", "exitCode", "exitSignal", "truncated"]) { + const field = src[key]; + if (field !== undefined) { + fallback[key] = + typeof field === "string" + ? truncatePersistedDetailString(field, MAX_PERSISTED_DETAIL_FALLBACK_STRING_CHARS) + : field; + } + } + } + return fallback; +} + +function enforcePersistedDetailsByteCap( + value: Record, + src: Record | undefined, + originalSize: BoundedJsonUtf8Bytes, +): Record { + const sanitizedBytes = jsonUtf8BytesOrInfinity(value); + if (sanitizedBytes <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) { + return value; + } + const fallback = buildPersistedDetailsFallback(src, originalSize, sanitizedBytes); + if (jsonUtf8BytesOrInfinity(fallback) <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) { + return fallback; + } + return { + persistedDetailsTruncated: true, + finalDetailsTruncated: true, + ...originalDetailsSizeFields(originalSize), + sanitizedDetailsBytes: sanitizedBytes, + }; +} + +function sanitizeToolResultDetailsForPersistence(details: unknown): unknown { + if (details === undefined || details === null) { + return details; + } + const originalSize = boundedJsonUtf8Bytes(details, MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES); + if (originalSize.complete && originalSize.bytes <= MAX_PERSISTED_TOOL_RESULT_DETAILS_BYTES) { + return details; + } + if (typeof details !== "object") { + return enforcePersistedDetailsByteCap( + { + persistedDetailsTruncated: true, + ...originalDetailsSizeFields(originalSize), + valueType: typeof details, + }, + undefined, + originalSize, + ); + } + const src = details as Record; + const out: Record = { + persistedDetailsTruncated: true, + ...originalDetailsSizeFields(originalSize), + originalDetailKeys: firstEnumerableOwnKeys(src, 40), + }; + for (const key of [ + "status", + "sessionId", + "pid", + "startedAt", + "endedAt", + "cwd", + "name", + "exitCode", + "exitSignal", + "retryInMs", + "total", + "totalLines", + "totalChars", + "truncated", + "fullOutputPath", + "truncation", + ]) { + const field = src[key]; + if (field !== undefined) { + out[key] = typeof field === "string" ? truncatePersistedDetailString(field) : field; + } + } + if (typeof src.tail === "string") { + out.tail = truncatePersistedDetailString(src.tail); + } + if (Array.isArray(src.sessions)) { + out.sessions = src.sessions + .slice(0, MAX_PERSISTED_DETAIL_SESSION_COUNT) + .map(sanitizePersistedSessionDetail); + if (src.sessions.length > MAX_PERSISTED_DETAIL_SESSION_COUNT) { + out.sessionsTruncated = src.sessions.length - MAX_PERSISTED_DETAIL_SESSION_COUNT; + } + } + return enforcePersistedDetailsByteCap(out, src, originalSize); +} + +function capToolResultDetails(msg: AgentMessage): AgentMessage { + if ((msg as { role?: string }).role !== "toolResult") { + return msg; + } + const details = (msg as { details?: unknown }).details; + const sanitizedDetails = sanitizeToolResultDetailsForPersistence(details); + if (sanitizedDetails === details) { + return msg; + } + const next = { ...msg } as AgentMessage & { details?: unknown }; + next.details = sanitizedDetails; + return next; +} + +function capToolResultForPersistence(msg: AgentMessage, maxChars: number): AgentMessage { + return capToolResultDetails(capToolResultSize(msg, maxChars)); +} + function normalizePersistedToolResultName( message: AgentMessage, fallbackName?: string, @@ -169,7 +357,7 @@ export function installSessionToolResultGuard( }), ); if (flushed) { - originalAppend(capToolResultSize(flushed, maxToolResultChars) as never); + originalAppend(capToolResultForPersistence(flushed, maxToolResultChars) as never); } } } @@ -206,7 +394,10 @@ export function installSessionToolResultGuard( const normalizedToolResult = normalizePersistedToolResultName(nextMessage, toolName); // Apply hard size cap before persistence to prevent oversized tool results // from consuming the entire context window on subsequent LLM calls. - const capped = capToolResultSize(persistMessage(normalizedToolResult), maxToolResultChars); + const capped = capToolResultForPersistence( + persistMessage(normalizedToolResult), + maxToolResultChars, + ); const persisted = applyBeforeWriteHook( persistToolResult(capped, { toolCallId: id ?? undefined, @@ -217,7 +408,7 @@ export function installSessionToolResultGuard( if (!persisted) { return undefined; } - return originalAppend(capToolResultSize(persisted, maxToolResultChars) as never); + return originalAppend(capToolResultForPersistence(persisted, maxToolResultChars) as never); } // Skip tool call extraction for aborted/errored assistant messages. diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts index 34cde076626..1126dad61d0 100644 --- a/src/gateway/server-reload-handlers.ts +++ b/src/gateway/server-reload-handlers.ts @@ -59,6 +59,8 @@ type GatewayReloadLog = { }; const MCP_RUNTIME_RELOAD_DISPOSE_TIMEOUT_MS = 5_000; +const CHANNEL_RELOAD_DEFERRAL_POLL_MS = 500; +const CHANNEL_RELOAD_STILL_PENDING_WARN_MS = 30_000; async function disposeMcpRuntimesWithTimeout(params: { dispose: () => Promise; @@ -125,6 +127,87 @@ type ManagedGatewayConfigReloaderParams = Omit< }; export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) { + const getActiveCounts = () => { + const queueSize = getTotalQueueSize(); + const pendingReplies = getTotalPendingReplies(); + const embeddedRuns = getActiveEmbeddedRunCount(); + const activeTasks = getInspectableTaskRegistrySummary().active; + return { + queueSize, + pendingReplies, + embeddedRuns, + activeTasks, + totalActive: queueSize + pendingReplies + embeddedRuns + activeTasks, + }; + }; + const formatActiveDetails = (counts: ReturnType) => { + const details = []; + if (counts.queueSize > 0) { + details.push(`${counts.queueSize} operation(s)`); + } + if (counts.pendingReplies > 0) { + details.push(`${counts.pendingReplies} reply(ies)`); + } + if (counts.embeddedRuns > 0) { + details.push(`${counts.embeddedRuns} embedded run(s)`); + } + if (counts.activeTasks > 0) { + details.push(`${counts.activeTasks} task run(s)`); + } + return details; + }; + const waitForActiveWorkBeforeChannelReload = async ( + channels: Iterable, + nextConfig: OpenClawConfig, + ) => { + const initial = getActiveCounts(); + if (initial.totalActive <= 0) { + return; + } + const channelNames = [...channels].join(", "); + const initialDetails = formatActiveDetails(initial); + params.logReload.warn( + `config change requires channel reload (${channelNames}) — deferring until ${initialDetails.join( + ", ", + )} complete`, + ); + const timeoutMsRaw = nextConfig.gateway?.reload?.deferralTimeoutMs; + const timeoutMs = + typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw) && timeoutMsRaw > 0 + ? Math.max(CHANNEL_RELOAD_DEFERRAL_POLL_MS, Math.floor(timeoutMsRaw)) + : undefined; + const startedAt = Date.now(); + let nextStillPendingAt = startedAt + CHANNEL_RELOAD_STILL_PENDING_WARN_MS; + while (true) { + await new Promise((resolve) => { + const timer = setTimeout(resolve, CHANNEL_RELOAD_DEFERRAL_POLL_MS); + timer.unref?.(); + }); + const current = getActiveCounts(); + if (current.totalActive <= 0) { + params.logReload.info("active operations and replies completed; reloading channels now"); + return; + } + const elapsedMs = Date.now() - startedAt; + if (timeoutMs !== undefined && elapsedMs >= timeoutMs) { + const remaining = formatActiveDetails(current); + params.logReload.warn( + `channel reload timeout after ${elapsedMs}ms with ${remaining.join( + ", ", + )} still active; reloading channels anyway`, + ); + return; + } + if (Date.now() >= nextStillPendingAt) { + const remaining = formatActiveDetails(current); + params.logReload.warn( + `channel reload still deferred after ${elapsedMs}ms with ${remaining.join(", ")} active`, + ); + nextStillPendingAt = Date.now() + CHANNEL_RELOAD_STILL_PENDING_WARN_MS; + } + } + }; + const applyHotReload = async (plan: GatewayReloadPlan, nextConfig: OpenClawConfig) => { setGatewaySigusr1RestartPolicy({ allowExternal: isRestartEnabled(nextConfig) }); const state = params.getState(); @@ -207,6 +290,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) "skipping channel reload (OPENCLAW_SKIP_CHANNELS=1 or OPENCLAW_SKIP_PROVIDERS=1)", ); } else { + await waitForActiveWorkBeforeChannelReload(plan.restartChannels, nextConfig); const restartChannel = async (name: ChannelKind) => { params.logChannels.info(`restarting ${name} channel`); await params.stopChannel(name); @@ -244,35 +328,6 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams) return false; } - const getActiveCounts = () => { - const queueSize = getTotalQueueSize(); - const pendingReplies = getTotalPendingReplies(); - const embeddedRuns = getActiveEmbeddedRunCount(); - const activeTasks = getInspectableTaskRegistrySummary().active; - return { - queueSize, - pendingReplies, - embeddedRuns, - activeTasks, - totalActive: queueSize + pendingReplies + embeddedRuns + activeTasks, - }; - }; - const formatActiveDetails = (counts: ReturnType) => { - const details = []; - if (counts.queueSize > 0) { - details.push(`${counts.queueSize} operation(s)`); - } - if (counts.pendingReplies > 0) { - details.push(`${counts.pendingReplies} reply(ies)`); - } - if (counts.embeddedRuns > 0) { - details.push(`${counts.embeddedRuns} embedded run(s)`); - } - if (counts.activeTasks > 0) { - details.push(`${counts.activeTasks} task run(s)`); - } - return details; - }; const active = getActiveCounts(); if (active.totalActive > 0) { diff --git a/src/gateway/server.reload.test.ts b/src/gateway/server.reload.test.ts index bd3a2ad669a..561a01f8bd6 100644 --- a/src/gateway/server.reload.test.ts +++ b/src/gateway/server.reload.test.ts @@ -20,6 +20,7 @@ import { ConnectErrorDetailCodes } from "./protocol/connect-error-details.js"; import { connectReq, connectOk, + embeddedRunMock, installGatewayTestHooks, rpcReq, startServerWithClient, @@ -282,6 +283,7 @@ describe("gateway hot reload", () => { let prevOpenAiApiKey: string | undefined; beforeEach(() => { + vi.clearAllMocks(); prevSkipChannels = process.env.OPENCLAW_SKIP_CHANNELS; prevSkipGmail = process.env.OPENCLAW_SKIP_GMAIL_WATCHER; prevSkipProviders = process.env.OPENCLAW_SKIP_PROVIDERS; @@ -289,10 +291,12 @@ describe("gateway hot reload", () => { process.env.OPENCLAW_SKIP_CHANNELS = "0"; delete process.env.OPENCLAW_SKIP_GMAIL_WATCHER; delete process.env.OPENCLAW_SKIP_PROVIDERS; + hoisted.cronInstances.length = 0; hoisted.activeEmbeddedRunCount.value = 0; hoisted.totalPendingReplies.value = 0; hoisted.totalQueueSize.value = 0; hoisted.activeTaskCount.value = 0; + embeddedRunMock.activeIds.clear(); hoisted.resetModelCatalogCache.mockReset(); hoisted.disposeAllSessionMcpRuntimes.mockReset(); hoisted.disposeAllSessionMcpRuntimes.mockResolvedValue(undefined); @@ -423,6 +427,196 @@ describe("gateway hot reload", () => { ); } + it("defers channel hot reload until active work drains", async () => { + await withNonMinimalGatewayServer(async () => { + const onHotReload = hoisted.getOnHotReload(); + expect(onHotReload).toBeTypeOf("function"); + + hoisted.providerManager.stopChannel.mockClear(); + hoisted.providerManager.startChannel.mockClear(); + hoisted.activeEmbeddedRunCount.value = 1; + embeddedRunMock.activeIds.add("reload-active"); + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const reloadPromise = onHotReload?.( + { + changedPaths: ["channels.discord.token"], + restartGateway: false, + restartReasons: [], + hotReasons: ["channels.discord.token"], + reloadHooks: false, + restartGmailWatcher: false, + restartCron: false, + restartHeartbeat: false, + restartChannels: new Set(["discord"]), + noopPaths: [], + }, + { + gateway: { reload: { deferralTimeoutMs: 60_000 } }, + channels: { discord: { token: "token" } }, + }, + ); + try { + await delay(550); + expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled(); + expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled(); + + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await reloadPromise; + } finally { + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await reloadPromise?.catch(() => {}); + } + + expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord"); + expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord"); + }); + }); + + it("uses the configured timeout when active work does not drain before channel reload", async () => { + await withNonMinimalGatewayServer(async () => { + const onHotReload = hoisted.getOnHotReload(); + expect(onHotReload).toBeTypeOf("function"); + + hoisted.providerManager.stopChannel.mockClear(); + hoisted.providerManager.startChannel.mockClear(); + hoisted.activeEmbeddedRunCount.value = 1; + embeddedRunMock.activeIds.add("reload-stuck"); + vi.useFakeTimers(); + const reloadPromise = onHotReload?.( + { + changedPaths: ["channels.discord.token"], + restartGateway: false, + restartReasons: [], + hotReasons: ["channels.discord.token"], + reloadHooks: false, + restartGmailWatcher: false, + restartCron: false, + restartHeartbeat: false, + restartChannels: new Set(["discord"]), + noopPaths: [], + }, + { + gateway: { reload: { deferralTimeoutMs: 1_000 } }, + channels: { discord: { token: "token" } }, + }, + ); + try { + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(500); + expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled(); + expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(500); + await reloadPromise; + } finally { + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await vi.advanceTimersByTimeAsync(500).catch(() => {}); + vi.useRealTimers(); + await reloadPromise?.catch(() => {}); + } + + expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord"); + expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord"); + }); + }); + + it("waits indefinitely for channel hot reload when deferral timeout is 0 or omitted", async () => { + await withNonMinimalGatewayServer(async () => { + const onHotReload = hoisted.getOnHotReload(); + expect(onHotReload).toBeTypeOf("function"); + + hoisted.providerManager.stopChannel.mockClear(); + hoisted.providerManager.startChannel.mockClear(); + hoisted.activeEmbeddedRunCount.value = 1; + embeddedRunMock.activeIds.add("reload-indefinite"); + vi.useFakeTimers(); + const reloadPromise = onHotReload?.( + { + changedPaths: ["channels.discord.token"], + restartGateway: false, + restartReasons: [], + hotReasons: ["channels.discord.token"], + reloadHooks: false, + restartGmailWatcher: false, + restartCron: false, + restartHeartbeat: false, + restartChannels: new Set(["discord"]), + noopPaths: [], + }, + { + gateway: { reload: { deferralTimeoutMs: 0 } }, + channels: { discord: { token: "token" } }, + }, + ); + try { + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(10 * 60_000); + expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled(); + expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled(); + + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await vi.advanceTimersByTimeAsync(500); + await reloadPromise; + } finally { + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await vi.advanceTimersByTimeAsync(500).catch(() => {}); + vi.useRealTimers(); + await reloadPromise?.catch(() => {}); + } + + expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("discord"); + expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("discord"); + + hoisted.providerManager.stopChannel.mockClear(); + hoisted.providerManager.startChannel.mockClear(); + hoisted.activeEmbeddedRunCount.value = 1; + embeddedRunMock.activeIds.add("reload-indefinite-omitted"); + vi.useFakeTimers(); + const omittedPromise = onHotReload?.( + { + changedPaths: ["channels.telegram.botToken"], + restartGateway: false, + restartReasons: [], + hotReasons: ["channels.telegram.botToken"], + reloadHooks: false, + restartGmailWatcher: false, + restartCron: false, + restartHeartbeat: false, + restartChannels: new Set(["telegram"]), + noopPaths: [], + }, + { + channels: { telegram: { botToken: "token" } }, + }, + ); + try { + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(10 * 60_000); + expect(hoisted.providerManager.stopChannel).not.toHaveBeenCalled(); + expect(hoisted.providerManager.startChannel).not.toHaveBeenCalled(); + + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await vi.advanceTimersByTimeAsync(500); + await omittedPromise; + } finally { + hoisted.activeEmbeddedRunCount.value = 0; + embeddedRunMock.activeIds.clear(); + await vi.advanceTimersByTimeAsync(500).catch(() => {}); + vi.useRealTimers(); + await omittedPromise?.catch(() => {}); + } + + expect(hoisted.providerManager.stopChannel).toHaveBeenCalledWith("telegram"); + expect(hoisted.providerManager.startChannel).toHaveBeenCalledWith("telegram"); + }); + }); + it("applies hot reload actions and emits restart signal", async () => { await withNonMinimalGatewayServer(async () => { const onHotReload = hoisted.getOnHotReload(); diff --git a/src/infra/json-utf8-bytes.test.ts b/src/infra/json-utf8-bytes.test.ts index 20dde70841f..7f1605c52fa 100644 --- a/src/infra/json-utf8-bytes.test.ts +++ b/src/infra/json-utf8-bytes.test.ts @@ -1,5 +1,10 @@ import { describe, expect, it } from "vitest"; -import { jsonUtf8Bytes } from "./json-utf8-bytes.js"; +import { + boundedJsonUtf8Bytes, + firstEnumerableOwnKeys, + jsonUtf8Bytes, + jsonUtf8BytesOrInfinity, +} from "./json-utf8-bytes.js"; function createCircularValue() { const circular: { self?: unknown } = {}; @@ -45,3 +50,69 @@ describe("jsonUtf8Bytes", () => { expect(jsonUtf8Bytes(value)).toBe(Buffer.byteLength(expected, "utf8")); }); }); + +describe("jsonUtf8BytesOrInfinity", () => { + it("returns exact JSON byte length for serializable values", () => { + const value = { a: "x", b: [1, 2, null] }; + expect(jsonUtf8BytesOrInfinity(value)).toBe(Buffer.byteLength(JSON.stringify(value), "utf8")); + }); + + it.each([createCircularValue(), 12n, undefined])( + "returns infinity for values that cannot be serialized as JSON", + (value) => { + expect(jsonUtf8BytesOrInfinity(value)).toBe(Number.POSITIVE_INFINITY); + }, + ); +}); + +describe("boundedJsonUtf8Bytes", () => { + it.each([ + { name: "plain object", value: { a: "x", b: [1, 2, null] } }, + { name: "unicode string", value: { value: "🙂" } }, + { + name: "array holes and undefined", + value: (() => { + const value = [undefined, () => undefined] as unknown[]; + value.length = 3; + return value; + })(), + }, + { name: "non-finite numbers", value: [Number.NaN, Number.POSITIVE_INFINITY] }, + { name: "date", value: { at: new Date("2026-04-25T12:00:00.000Z") } }, + ])("matches JSON.stringify byte length for $name", ({ value }) => { + expect(boundedJsonUtf8Bytes(value, 100_000)).toEqual({ + bytes: Buffer.byteLength(JSON.stringify(value), "utf8"), + complete: true, + }); + }); + + it("stops once the byte limit is exceeded", () => { + expect(boundedJsonUtf8Bytes({ value: "x".repeat(50_000) }, 8_192)).toEqual({ + bytes: 8_193, + complete: false, + }); + }); + + it.each([ + { name: "circular objects", value: createCircularValue() }, + { name: "BigInt", value: { value: 12n } }, + { name: "custom toJSON", value: { toJSON: () => ({ ok: true }) } }, + ])("marks $name incomplete instead of invoking unsafe JSON serialization", ({ value }) => { + const result = boundedJsonUtf8Bytes(value, 8_192); + expect(result.complete).toBe(false); + expect(result.bytes).toBeGreaterThan(8_192); + }); +}); + +describe("firstEnumerableOwnKeys", () => { + it("returns only own enumerable keys up to the limit", () => { + const inherited = { inherited: true }; + const value = Object.create(inherited) as Record; + value.a = 1; + value.b = 2; + value.c = 3; + Object.defineProperty(value, "hidden", { enumerable: false, value: true }); + + expect(firstEnumerableOwnKeys(value, 2)).toEqual(["a", "b"]); + }); +}); diff --git a/src/infra/json-utf8-bytes.ts b/src/infra/json-utf8-bytes.ts index ec677cffb32..c35406d8643 100644 --- a/src/infra/json-utf8-bytes.ts +++ b/src/infra/json-utf8-bytes.ts @@ -5,3 +5,144 @@ export function jsonUtf8Bytes(value: unknown): number { return Buffer.byteLength(String(value), "utf8"); } } + +export type BoundedJsonUtf8Bytes = { + bytes: number; + complete: boolean; +}; + +export function jsonUtf8BytesOrInfinity(value: unknown): number { + try { + const serialized = JSON.stringify(value); + return typeof serialized === "string" + ? Buffer.byteLength(serialized, "utf8") + : Number.POSITIVE_INFINITY; + } catch { + return Number.POSITIVE_INFINITY; + } +} + +function jsonStringByteLengthUpToLimit(value: string, remainingBytes: number): number { + if (value.length + 2 > remainingBytes) { + return remainingBytes + 1; + } + return jsonUtf8BytesOrInfinity(value); +} + +function* enumerableOwnEntries(value: object): Generator<[string, unknown]> { + const record = value as Record; + for (const key in record) { + if (Object.prototype.propertyIsEnumerable.call(record, key)) { + yield [key, record[key]]; + } + } +} + +export function firstEnumerableOwnKeys(value: object, maxKeys: number): string[] { + const keys: string[] = []; + for (const key in value as Record) { + if (!Object.prototype.propertyIsEnumerable.call(value, key)) { + continue; + } + keys.push(key); + if (keys.length >= maxKeys) { + break; + } + } + return keys; +} + +export function boundedJsonUtf8Bytes(value: unknown, maxBytes: number): BoundedJsonUtf8Bytes { + let bytes = 0; + const seen = new WeakSet(); + + const add = (amount: number): void => { + bytes += amount; + if (bytes > maxBytes) { + throw new Error("json_byte_limit_exceeded"); + } + }; + + const visit = (entry: unknown, inArray: boolean): void => { + if (entry === null) { + add(4); + return; + } + switch (typeof entry) { + case "string": + add(jsonStringByteLengthUpToLimit(entry, maxBytes - bytes)); + return; + case "number": + add(jsonUtf8BytesOrInfinity(Number.isFinite(entry) ? entry : null)); + return; + case "boolean": + add(entry ? 4 : 5); + return; + case "undefined": + case "function": + case "symbol": + if (inArray) { + add(4); + } + return; + case "bigint": + throw new Error("json_byte_length_unsupported"); + case "object": + break; + } + + const objectEntry = entry as object; + if (seen.has(objectEntry)) { + throw new Error("json_byte_length_circular"); + } + if ( + typeof (objectEntry as { toJSON?: unknown }).toJSON === "function" && + !(objectEntry instanceof Date) + ) { + throw new Error("json_byte_length_custom_to_json"); + } + seen.add(objectEntry); + try { + if (objectEntry instanceof Date) { + visit(objectEntry.toJSON(), inArray); + return; + } + if (Array.isArray(objectEntry)) { + add(1); + for (let index = 0; index < objectEntry.length; index += 1) { + if (index > 0) { + add(1); + } + visit(objectEntry[index], true); + } + add(1); + return; + } + + add(1); + let wroteField = false; + for (const [key, field] of enumerableOwnEntries(objectEntry)) { + if (field === undefined || typeof field === "function" || typeof field === "symbol") { + continue; + } + if (wroteField) { + add(1); + } + wroteField = true; + add(jsonStringByteLengthUpToLimit(key, maxBytes - bytes)); + add(1); + visit(field, false); + } + add(1); + } finally { + seen.delete(objectEntry); + } + }; + + try { + visit(value, false); + return { bytes, complete: true }; + } catch { + return { bytes: Math.max(bytes, maxBytes + 1), complete: false }; + } +} diff --git a/ui/src/ui/chat/grouped-render.test.ts b/ui/src/ui/chat/grouped-render.test.ts index 01e22a559f5..334a5e61921 100644 --- a/ui/src/ui/chat/grouped-render.test.ts +++ b/ui/src/ui/chat/grouped-render.test.ts @@ -106,6 +106,40 @@ function renderAssistantMessage( renderGroupedMessage(container, message, "assistant", opts); } +function renderAssistantMessages( + container: HTMLElement, + messages: unknown[], + opts: Partial = {}, +) { + const timestamp = + typeof messages[0] === "object" && + messages[0] !== null && + typeof (messages[0] as { timestamp?: unknown }).timestamp === "number" + ? (messages[0] as { timestamp: number }).timestamp + : Date.now(); + const group: MessageGroup = { + kind: "group", + key: "assistant-group", + role: "assistant", + messages: messages.map((message, index) => ({ + key: `assistant-message-${index}`, + message, + })), + timestamp, + isStreaming: false, + }; + render( + renderMessageGroup(group, { + showReasoning: true, + showToolCalls: true, + assistantName: "OpenClaw", + assistantAvatar: null, + ...opts, + }), + container, + ); +} + function renderGroupedMessage( container: HTMLElement, message: unknown, @@ -318,6 +352,32 @@ describe("grouped chat rendering", () => { expect(outputHeavy.querySelector(".msg-meta__ctx")?.textContent).toBe("10% ctx"); }); + it("uses the largest single assistant call for grouped context usage", () => { + const container = document.createElement("div"); + + renderAssistantMessages( + container, + [ + { + role: "assistant", + content: "Checking", + usage: { input: 105_944, output: 100 }, + timestamp: 1000, + }, + { + role: "assistant", + content: "Done", + usage: { input: 108_577, output: 100 }, + timestamp: 1001, + }, + ], + { contextWindow: 258_400 }, + ); + + expect(container.querySelector(".msg-meta__ctx")?.textContent).toBe("42% ctx"); + expect(container.textContent).toContain("↑214.5k"); + }); + it("renders full dates with message and streaming timestamps", () => { const container = document.createElement("div"); const timestamp = Date.UTC(2026, 3, 24, 18, 30); diff --git a/ui/src/ui/chat/grouped-render.ts b/ui/src/ui/chat/grouped-render.ts index 463069fb944..f298da6b3c2 100644 --- a/ui/src/ui/chat/grouped-render.ts +++ b/ui/src/ui/chat/grouped-render.ts @@ -432,6 +432,7 @@ function extractGroupMeta(group: MessageGroup, contextWindow: number | null): Gr let cost = 0; let model: string | null = null; let hasUsage = false; + let maxPromptTokens = 0; for (const { message } of group.messages) { const m = message as Record; @@ -441,10 +442,15 @@ function extractGroupMeta(group: MessageGroup, contextWindow: number | null): Gr const usage = m.usage as Record | undefined; if (usage) { hasUsage = true; - input += usage.input ?? usage.inputTokens ?? 0; - output += usage.output ?? usage.outputTokens ?? 0; - cacheRead += usage.cacheRead ?? usage.cache_read_input_tokens ?? 0; - cacheWrite += usage.cacheWrite ?? usage.cache_creation_input_tokens ?? 0; + const callInput = usage.input ?? usage.inputTokens ?? 0; + const callOutput = usage.output ?? usage.outputTokens ?? 0; + const callCacheRead = usage.cacheRead ?? usage.cache_read_input_tokens ?? 0; + const callCacheWrite = usage.cacheWrite ?? usage.cache_creation_input_tokens ?? 0; + input += callInput; + output += callOutput; + cacheRead += callCacheRead; + cacheWrite += callCacheWrite; + maxPromptTokens = Math.max(maxPromptTokens, callInput + callCacheRead + callCacheWrite); } const c = m.cost as Record | undefined; if (c?.total) { @@ -459,10 +465,9 @@ function extractGroupMeta(group: MessageGroup, contextWindow: number | null): Gr return null; } - const promptTokens = input + cacheRead + cacheWrite; const contextPercent = - contextWindow && promptTokens > 0 - ? Math.min(Math.round((promptTokens / contextWindow) * 100), 100) + contextWindow && maxPromptTokens > 0 + ? Math.min(Math.round((maxPromptTokens / contextWindow) * 100), 100) : null; return { input, output, cacheRead, cacheWrite, cost, model, contextPercent };