diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad03596f00..db6e5f310e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - Outbound/send config threading: pass resolved SecretRef config through outbound adapters and helper send paths so send flows do not reload unresolved runtime config. (#33987) Thanks @joshavant. - Sessions/subagent attachments: remove `attachments[].content.maxLength` from `sessions_spawn` schema to avoid llama.cpp GBNF repetition overflow, and preflight UTF-8 byte size before buffer allocation while keeping runtime file-size enforcement unchanged. (#33648) Thanks @anisoptera. - Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr. +- ACP/Discord startup hardening: clean up stuck ACP worker children on gateway restart, unbind stale ACP thread bindings during Discord startup reconciliation, and add per-thread listener watchdog timeouts so wedged turns cannot block later messages. (#33699) Thanks @dutifulbob. - Extensions/media local-root propagation: consistently forward `mediaLocalRoots` through extension `sendMedia` adapters (Google Chat, Slack, iMessage, Signal, WhatsApp), preserving non-local media behavior while restoring local attachment resolution from configured roots. Synthesis of #33581, #33545, #33540, #33536, #33528. Thanks @bmendonca3. - Gateway/security default response headers: add `Permissions-Policy: camera=(), microphone=(), geolocation=()` to baseline gateway HTTP security headers for all responses. (#30186) thanks @habakan. - Plugins/startup loading: lazily initialize plugin runtime, split startup-critical plugin SDK imports into `openclaw/plugin-sdk/core` and `openclaw/plugin-sdk/telegram`, and preserve `api.runtime` reflection semantics for plugin compatibility. (#28620) thanks @hmemcpy. diff --git a/extensions/acpx/src/runtime-internals/process.test.ts b/extensions/acpx/src/runtime-internals/process.test.ts index 85a72a13398..0eee162eddf 100644 --- a/extensions/acpx/src/runtime-internals/process.test.ts +++ b/extensions/acpx/src/runtime-internals/process.test.ts @@ -1,9 +1,15 @@ +import { spawn } from "node:child_process"; import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; import { createWindowsCmdShimFixture } from "../../../shared/windows-cmd-shim-test-fixtures.js"; -import { resolveSpawnCommand, type SpawnCommandCache } from "./process.js"; +import { + resolveSpawnCommand, + spawnAndCollect, + type SpawnCommandCache, + waitForExit, +} from "./process.js"; const tempDirs: string[] = []; @@ -225,3 +231,62 @@ describe("resolveSpawnCommand", () => { expect(second.args[0]).toBe(scriptPath); }); }); + +describe("waitForExit", () => { + it("resolves when the child already exited before waiting starts", async () => { + const child = spawn(process.execPath, ["-e", "process.exit(0)"], { + stdio: ["pipe", "pipe", "pipe"], + }); + + await new Promise((resolve, reject) => { + child.once("close", () => { + resolve(); + }); + child.once("error", reject); + }); + + const exit = await waitForExit(child); + expect(exit.code).toBe(0); + expect(exit.signal).toBeNull(); + expect(exit.error).toBeNull(); + }); +}); + +describe("spawnAndCollect", () => { + it("returns abort error immediately when signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + const result = await spawnAndCollect( + { + command: process.execPath, + args: ["-e", "process.exit(0)"], + cwd: process.cwd(), + }, + undefined, + { signal: controller.signal }, + ); + + expect(result.code).toBeNull(); + expect(result.error?.name).toBe("AbortError"); + }); + + it("terminates a running process when signal aborts", async () => { + const controller = new AbortController(); + const resultPromise = spawnAndCollect( + { + command: process.execPath, + args: ["-e", "setTimeout(() => process.stdout.write('done'), 10_000)"], + cwd: process.cwd(), + }, + undefined, + { signal: controller.signal }, + ); + + setTimeout(() => { + controller.abort(); + }, 10); + + const result = await resultPromise; + expect(result.error?.name).toBe("AbortError"); + }); +}); diff --git a/extensions/acpx/src/runtime-internals/process.ts b/extensions/acpx/src/runtime-internals/process.ts index 953f088586e..4df84aece2f 100644 --- a/extensions/acpx/src/runtime-internals/process.ts +++ b/extensions/acpx/src/runtime-internals/process.ts @@ -114,6 +114,12 @@ export function resolveSpawnCommand( }; } +function createAbortError(): Error { + const error = new Error("Operation aborted."); + error.name = "AbortError"; + return error; +} + export function spawnWithResolvedCommand( params: { command: string; @@ -140,6 +146,15 @@ export function spawnWithResolvedCommand( } export async function waitForExit(child: ChildProcessWithoutNullStreams): Promise { + // Handle callers that start waiting after the child has already exited. + if (child.exitCode !== null || child.signalCode !== null) { + return { + code: child.exitCode, + signal: child.signalCode, + error: null, + }; + } + return await new Promise((resolve) => { let settled = false; const finish = (result: SpawnExit) => { @@ -167,12 +182,23 @@ export async function spawnAndCollect( cwd: string; }, options?: SpawnCommandOptions, + runtime?: { + signal?: AbortSignal; + }, ): Promise<{ stdout: string; stderr: string; code: number | null; error: Error | null; }> { + if (runtime?.signal?.aborted) { + return { + stdout: "", + stderr: "", + code: null, + error: createAbortError(), + }; + } const child = spawnWithResolvedCommand(params, options); child.stdin.end(); @@ -185,13 +211,43 @@ export async function spawnAndCollect( stderr += String(chunk); }); - const exit = await waitForExit(child); - return { - stdout, - stderr, - code: exit.code, - error: exit.error, + let abortKillTimer: NodeJS.Timeout | undefined; + let aborted = false; + const onAbort = () => { + aborted = true; + try { + child.kill("SIGTERM"); + } catch { + // Ignore kill races when child already exited. + } + abortKillTimer = setTimeout(() => { + if (child.exitCode !== null || child.signalCode !== null) { + return; + } + try { + child.kill("SIGKILL"); + } catch { + // Ignore kill races when child already exited. + } + }, 250); + abortKillTimer.unref?.(); }; + runtime?.signal?.addEventListener("abort", onAbort, { once: true }); + + try { + const exit = await waitForExit(child); + return { + stdout, + stderr, + code: exit.code, + error: aborted ? createAbortError() : exit.error, + }; + } finally { + runtime?.signal?.removeEventListener("abort", onAbort); + if (abortKillTimer) { + clearTimeout(abortKillTimer); + } + } } export function resolveSpawnFailure( diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index fc66b394b3c..8a7783a704c 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -353,7 +353,10 @@ export class AcpxRuntime implements AcpRuntime { return ACPX_CAPABILITIES; } - async getStatus(input: { handle: AcpRuntimeHandle }): Promise { + async getStatus(input: { + handle: AcpRuntimeHandle; + signal?: AbortSignal; + }): Promise { const state = this.resolveHandleState(input.handle); const events = await this.runControlCommand({ args: this.buildControlArgs({ @@ -363,6 +366,7 @@ export class AcpxRuntime implements AcpRuntime { cwd: state.cwd, fallbackCode: "ACP_TURN_FAILED", ignoreNoSession: true, + signal: input.signal, }); const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0]; if (!detail) { @@ -586,6 +590,7 @@ export class AcpxRuntime implements AcpRuntime { cwd: string; fallbackCode: AcpRuntimeErrorCode; ignoreNoSession?: boolean; + signal?: AbortSignal; }): Promise { const result = await spawnAndCollect( { @@ -594,6 +599,9 @@ export class AcpxRuntime implements AcpRuntime { cwd: params.cwd, }, this.spawnCommandOptions, + { + signal: params.signal, + }, ); if (result.error) { diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index 99ec096bb7f..4d45a7693a9 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -316,70 +316,85 @@ export class AcpSessionManager { async getSessionStatus(params: { cfg: OpenClawConfig; sessionKey: string; + signal?: AbortSignal; }): Promise { const sessionKey = normalizeSessionKey(params.sessionKey); if (!sessionKey) { throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required."); } + this.throwIfAborted(params.signal); await this.evictIdleRuntimeHandles({ cfg: params.cfg }); - return await this.withSessionActor(sessionKey, async () => { - const resolution = this.resolveSession({ - cfg: params.cfg, - sessionKey, - }); - if (resolution.kind === "none") { - throw new AcpRuntimeError( - "ACP_SESSION_INIT_FAILED", - `Session is not ACP-enabled: ${sessionKey}`, - ); - } - if (resolution.kind === "stale") { - throw resolution.error; - } - const { - runtime, - handle: ensuredHandle, - meta: ensuredMeta, - } = await this.ensureRuntimeHandle({ - cfg: params.cfg, - sessionKey, - meta: resolution.meta, - }); - let handle = ensuredHandle; - let meta = ensuredMeta; - const capabilities = await this.resolveRuntimeCapabilities({ runtime, handle }); - let runtimeStatus: AcpRuntimeStatus | undefined; - if (runtime.getStatus) { - runtimeStatus = await withAcpRuntimeErrorBoundary({ - run: async () => await runtime.getStatus!({ handle }), - fallbackCode: "ACP_TURN_FAILED", - fallbackMessage: "Could not read ACP runtime status.", + return await this.withSessionActor( + sessionKey, + async () => { + this.throwIfAborted(params.signal); + const resolution = this.resolveSession({ + cfg: params.cfg, + sessionKey, }); - } - ({ handle, meta, runtimeStatus } = await this.reconcileRuntimeSessionIdentifiers({ - cfg: params.cfg, - sessionKey, - runtime, - handle, - meta, - runtimeStatus, - failOnStatusError: true, - })); - const identity = resolveSessionIdentityFromMeta(meta); - return { - sessionKey, - backend: handle.backend || meta.backend, - agent: meta.agent, - ...(identity ? { identity } : {}), - state: meta.state, - mode: meta.mode, - runtimeOptions: resolveRuntimeOptionsFromMeta(meta), - capabilities, - runtimeStatus, - lastActivityAt: meta.lastActivityAt, - lastError: meta.lastError, - }; - }); + if (resolution.kind === "none") { + throw new AcpRuntimeError( + "ACP_SESSION_INIT_FAILED", + `Session is not ACP-enabled: ${sessionKey}`, + ); + } + if (resolution.kind === "stale") { + throw resolution.error; + } + const { + runtime, + handle: ensuredHandle, + meta: ensuredMeta, + } = await this.ensureRuntimeHandle({ + cfg: params.cfg, + sessionKey, + meta: resolution.meta, + }); + let handle = ensuredHandle; + let meta = ensuredMeta; + const capabilities = await this.resolveRuntimeCapabilities({ runtime, handle }); + let runtimeStatus: AcpRuntimeStatus | undefined; + if (runtime.getStatus) { + runtimeStatus = await withAcpRuntimeErrorBoundary({ + run: async () => { + this.throwIfAborted(params.signal); + const status = await runtime.getStatus!({ + handle, + ...(params.signal ? { signal: params.signal } : {}), + }); + this.throwIfAborted(params.signal); + return status; + }, + fallbackCode: "ACP_TURN_FAILED", + fallbackMessage: "Could not read ACP runtime status.", + }); + } + ({ handle, meta, runtimeStatus } = await this.reconcileRuntimeSessionIdentifiers({ + cfg: params.cfg, + sessionKey, + runtime, + handle, + meta, + runtimeStatus, + failOnStatusError: true, + })); + const identity = resolveSessionIdentityFromMeta(meta); + return { + sessionKey, + backend: handle.backend || meta.backend, + agent: meta.agent, + ...(identity ? { identity } : {}), + state: meta.state, + mode: meta.mode, + runtimeOptions: resolveRuntimeOptionsFromMeta(meta), + capabilities, + runtimeStatus, + lastActivityAt: meta.lastActivityAt, + lastError: meta.lastError, + }; + }, + params.signal, + ); } async setSessionRuntimeMode(params: { @@ -1295,9 +1310,23 @@ export class AcpSessionManager { } } - private async withSessionActor(sessionKey: string, op: () => Promise): Promise { + private async withSessionActor( + sessionKey: string, + op: () => Promise, + signal?: AbortSignal, + ): Promise { const actorKey = normalizeActorKey(sessionKey); - return await this.actorQueue.run(actorKey, op); + return await this.actorQueue.run(actorKey, async () => { + this.throwIfAborted(signal); + return await op(); + }); + } + + private throwIfAborted(signal?: AbortSignal): void { + if (!signal?.aborted) { + return; + } + throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP operation aborted."); } private getCachedRuntimeState(sessionKey: string): CachedRuntimeState | null { diff --git a/src/acp/runtime/types.ts b/src/acp/runtime/types.ts index ff4f39a70ee..6a3d3bb3f8e 100644 --- a/src/acp/runtime/types.ts +++ b/src/acp/runtime/types.ts @@ -117,7 +117,7 @@ export interface AcpRuntime { handle?: AcpRuntimeHandle; }): Promise | AcpRuntimeCapabilities; - getStatus?(input: { handle: AcpRuntimeHandle }): Promise; + getStatus?(input: { handle: AcpRuntimeHandle; signal?: AbortSignal }): Promise; setMode?(input: { handle: AcpRuntimeHandle; mode: string }): Promise; diff --git a/src/daemon/systemd-unit.test.ts b/src/daemon/systemd-unit.test.ts index bd65e34bba4..5c5562b25e6 100644 --- a/src/daemon/systemd-unit.test.ts +++ b/src/daemon/systemd-unit.test.ts @@ -12,6 +12,15 @@ describe("buildSystemdUnit", () => { expect(execStart).toBe('ExecStart=/usr/bin/openclaw gateway --name "My Bot"'); }); + it("renders control-group kill mode for child-process cleanup", () => { + const unit = buildSystemdUnit({ + description: "OpenClaw Gateway", + programArguments: ["/usr/bin/openclaw", "gateway", "run"], + environment: {}, + }); + expect(unit).toContain("KillMode=control-group"); + }); + it("rejects environment values with line breaks", () => { expect(() => buildSystemdUnit({ diff --git a/src/daemon/systemd-unit.ts b/src/daemon/systemd-unit.ts index 000f4b64a92..9cddbee24d1 100644 --- a/src/daemon/systemd-unit.ts +++ b/src/daemon/systemd-unit.ts @@ -59,10 +59,9 @@ export function buildSystemdUnit({ `ExecStart=${execStart}`, "Restart=always", "RestartSec=5", - // KillMode=process ensures systemd only waits for the main process to exit. - // Without this, podman's conmon (container monitor) processes block shutdown - // since they run as children of the gateway and stay in the same cgroup. - "KillMode=process", + // Keep service children in the same lifecycle so restarts do not leave + // orphan ACP/runtime workers behind. + "KillMode=control-group", workingDirLine, ...envLines, "", diff --git a/src/discord/monitor.test.ts b/src/discord/monitor.test.ts index 6f555ede67d..50bb52af18d 100644 --- a/src/discord/monitor.test.ts +++ b/src/discord/monitor.test.ts @@ -197,9 +197,9 @@ describe("DiscordMessageListener", () => { // Release the background handler and allow slow-log finalizer to run. deferred.resolve(); - await Promise.resolve(); - - expect(logger.warn).toHaveBeenCalled(); + await vi.waitFor(() => { + expect(logger.warn).toHaveBeenCalled(); + }); const warnMock = logger.warn as unknown as { mock: { calls: unknown[][] } }; const [, meta] = warnMock.mock.calls[0] ?? []; const durationMs = (meta as { durationMs?: number } | undefined)?.durationMs; diff --git a/src/discord/monitor/listeners.test.ts b/src/discord/monitor/listeners.test.ts index 6264ab218db..d1342b3ddb2 100644 --- a/src/discord/monitor/listeners.test.ts +++ b/src/discord/monitor/listeners.test.ts @@ -121,4 +121,110 @@ describe("DiscordMessageListener", () => { ); }); }); + + it("continues same-channel processing after handler timeout", async () => { + vi.useFakeTimers(); + try { + const never = new Promise(() => {}); + const handler = vi.fn(async () => { + if (handler.mock.calls.length === 1) { + await never; + return; + } + }); + const logger = createLogger(); + const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { + timeoutMs: 50, + }); + + await listener.handle(fakeEvent("ch-1"), {} as never); + await listener.handle(fakeEvent("ch-1"), {} as never); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(60); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); + } finally { + vi.useRealTimers(); + } + }); + + it("aborts timed-out handlers and prevents late side effects", async () => { + vi.useFakeTimers(); + try { + let abortReceived = false; + let lateSideEffect = false; + const handler = vi.fn( + async ( + _data: unknown, + _client: unknown, + options?: { + abortSignal?: AbortSignal; + }, + ) => { + await new Promise((resolve) => { + if (options?.abortSignal?.aborted) { + abortReceived = true; + resolve(); + return; + } + options?.abortSignal?.addEventListener( + "abort", + () => { + abortReceived = true; + resolve(); + }, + { once: true }, + ); + }); + if (options?.abortSignal?.aborted) { + return; + } + lateSideEffect = true; + }, + ); + const logger = createLogger(); + const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { + timeoutMs: 50, + }); + + await listener.handle(fakeEvent("ch-1"), {} as never); + await listener.handle(fakeEvent("ch-1"), {} as never); + + await vi.advanceTimersByTimeAsync(60); + await vi.waitFor(() => { + expect(handler).toHaveBeenCalledTimes(2); + }); + expect(abortReceived).toBe(true); + expect(lateSideEffect).toBe(false); + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); + } finally { + vi.useRealTimers(); + } + }); + + it("does not emit slow-listener warnings when timeout already fired", async () => { + vi.useFakeTimers(); + try { + const never = new Promise(() => {}); + const handler = vi.fn(async () => { + await never; + }); + const logger = createLogger(); + const listener = new DiscordMessageListener(handler as never, logger as never, undefined, { + timeoutMs: 31_000, + }); + + await listener.handle(fakeEvent("ch-1"), {} as never); + await vi.advanceTimersByTimeAsync(31_100); + await vi.waitFor(() => { + expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("timed out after")); + }); + expect(logger.warn).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 71d7cfbddf9..5297460e228 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -41,7 +41,11 @@ type Logger = ReturnType[0]; -export type DiscordMessageHandler = (data: DiscordMessageEvent, client: Client) => Promise; +export type DiscordMessageHandler = ( + data: DiscordMessageEvent, + client: Client, + options?: { abortSignal?: AbortSignal }, +) => Promise; type DiscordReactionEvent = Parameters[0]; @@ -66,13 +70,50 @@ type DiscordReactionRoutingParams = { }; const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000; +const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000; const discordEventQueueLog = createSubsystemLogger("discord/event-queue"); +function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number { + if (!Number.isFinite(raw) || (raw ?? 0) <= 0) { + return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS; + } + return Math.max(1_000, Math.floor(raw!)); +} + +function formatListenerContextValue(value: unknown): string | null { + if (value === undefined || value === null) { + return null; + } + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; + } + if (typeof value === "number" || typeof value === "boolean" || typeof value === "bigint") { + return String(value); + } + return null; +} + +function formatListenerContextSuffix(context?: Record): string { + if (!context) { + return ""; + } + const entries = Object.entries(context).flatMap(([key, value]) => { + const formatted = formatListenerContextValue(value); + return formatted ? [`${key}=${formatted}`] : []; + }); + if (entries.length === 0) { + return ""; + } + return ` (${entries.join(" ")})`; +} + function logSlowDiscordListener(params: { logger: Logger | undefined; listener: string; event: string; durationMs: number; + context?: Record; }) { if (params.durationMs < DISCORD_SLOW_LISTENER_THRESHOLD_MS) { return; @@ -88,7 +129,8 @@ function logSlowDiscordListener(params: { event: params.event, durationMs: params.durationMs, duration, - consoleMessage: message, + ...params.context, + consoleMessage: `${message}${formatListenerContextSuffix(params.context)}`, }); } @@ -96,12 +138,59 @@ async function runDiscordListenerWithSlowLog(params: { logger: Logger | undefined; listener: string; event: string; - run: () => Promise; + run: (abortSignal: AbortSignal) => Promise; + timeoutMs?: number; + context?: Record; onError?: (err: unknown) => void; }) { const startedAt = Date.now(); + const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs); + let timedOut = false; + let timeoutHandle: ReturnType | null = null; + const logger = params.logger ?? discordEventQueueLog; + const abortController = new AbortController(); + const runPromise = params.run(abortController.signal).catch((err) => { + if (timedOut) { + const errorName = + err && typeof err === "object" && "name" in err ? String(err.name) : undefined; + if (abortController.signal.aborted && errorName === "AbortError") { + logger.warn( + `discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`, + ); + return; + } + logger.error( + danger( + `discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`, + ), + ); + return; + } + throw err; + }); + try { - await params.run(); + const timeoutPromise = new Promise<"timeout">((resolve) => { + timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs); + timeoutHandle.unref?.(); + }); + const result = await Promise.race([ + runPromise.then(() => "completed" as const), + timeoutPromise, + ]); + if (result === "timeout") { + timedOut = true; + abortController.abort(); + logger.error( + danger( + `discord handler timed out after ${formatDurationSeconds(timeoutMs, { + decimals: 1, + unit: "seconds", + })}${formatListenerContextSuffix(params.context)}`, + ), + ); + return; + } } catch (err) { if (params.onError) { params.onError(err); @@ -109,12 +198,18 @@ async function runDiscordListenerWithSlowLog(params: { } throw err; } finally { - logSlowDiscordListener({ - logger: params.logger, - listener: params.listener, - event: params.event, - durationMs: Date.now() - startedAt, - }); + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + if (!timedOut) { + logSlowDiscordListener({ + logger: params.logger, + listener: params.listener, + event: params.event, + durationMs: Date.now() - startedAt, + context: params.context, + }); + } } } @@ -128,18 +223,26 @@ export function registerDiscordListener(listeners: Array, listener: obje export class DiscordMessageListener extends MessageCreateListener { private readonly channelQueue = new KeyedAsyncQueue(); + private readonly listenerTimeoutMs: number; constructor( private handler: DiscordMessageHandler, private logger?: Logger, private onEvent?: () => void, + options?: { timeoutMs?: number }, ) { super(); + this.listenerTimeoutMs = normalizeDiscordListenerTimeoutMs(options?.timeoutMs); } async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); const channelId = data.channel_id; + const context = { + channelId, + messageId: (data as { message?: { id?: string } }).message?.id, + guildId: (data as { guild_id?: string }).guild_id, + } satisfies Record; // Serialize messages within the same channel to preserve ordering, // but allow different channels to proceed in parallel so that // channel-bound agents are not blocked by each other. @@ -148,7 +251,9 @@ export class DiscordMessageListener extends MessageCreateListener { logger: this.logger, listener: this.constructor.name, event: this.type, - run: () => this.handler(data, client), + timeoutMs: this.listenerTimeoutMs, + context, + run: (abortSignal) => this.handler(data, client, { abortSignal }), onError: (err) => { const logger = this.logger ?? discordEventQueueLog; logger.error(danger(`discord handler failed: ${String(err)}`)); @@ -206,7 +311,7 @@ async function runDiscordReactionHandler(params: { logger: params.handlerParams.logger, listener: params.listener, event: params.event, - run: () => + run: async () => handleDiscordReactionEvent({ data: params.data, client: params.client, diff --git a/src/discord/monitor/message-handler.preflight.ts b/src/discord/monitor/message-handler.preflight.ts index 7339caf0604..2aea357d236 100644 --- a/src/discord/monitor/message-handler.preflight.ts +++ b/src/discord/monitor/message-handler.preflight.ts @@ -68,6 +68,10 @@ export type { const DISCORD_BOUND_THREAD_SYSTEM_PREFIXES = ["⚙️", "🤖", "🧰"]; +function isPreflightAborted(abortSignal?: AbortSignal): boolean { + return Boolean(abortSignal?.aborted); +} + function isBoundThreadBotSystemMessage(params: { isBoundThreadSession: boolean; isBotAuthor: boolean; @@ -124,6 +128,9 @@ export function shouldIgnoreBoundThreadWebhookMessage(params: { export async function preflightDiscordMessage( params: DiscordMessagePreflightParams, ): Promise { + if (isPreflightAborted(params.abortSignal)) { + return null; + } const logger = getChildLogger({ module: "discord-auto-reply" }); const message = params.data.message; const author = params.data.author; @@ -157,6 +164,9 @@ export async function preflightDiscordMessage( messageId: message.id, config: pluralkitConfig, }); + if (isPreflightAborted(params.abortSignal)) { + return null; + } } catch (err) { logVerbose(`discord: pluralkit lookup failed for ${message.id}: ${String(err)}`); } @@ -176,6 +186,9 @@ export async function preflightDiscordMessage( const isGuildMessage = Boolean(params.data.guild_id); const channelInfo = await resolveDiscordChannelInfo(params.client, messageChannelId); + if (isPreflightAborted(params.abortSignal)) { + return null; + } const isDirectMessage = channelInfo?.type === ChannelType.DM; const isGroupDm = channelInfo?.type === ChannelType.GroupDM; logDebug( @@ -213,6 +226,9 @@ export async function preflightDiscordMessage( allowNameMatching, useAccessGroups, }); + if (isPreflightAborted(params.abortSignal)) { + return null; + } commandAuthorized = dmAccess.commandAuthorized; if (dmAccess.decision !== "allow") { const allowMatchMeta = formatAllowlistMatchMeta( @@ -300,6 +316,9 @@ export async function preflightDiscordMessage( threadChannel: earlyThreadChannel, channelInfo, }); + if (isPreflightAborted(params.abortSignal)) { + return null; + } earlyThreadParentId = parentInfo.id; earlyThreadParentName = parentInfo.name; earlyThreadParentType = parentInfo.type; @@ -548,7 +567,11 @@ export async function preflightDiscordMessage( shouldRequireMention, mentionRegexes, cfg: params.cfg, + abortSignal: params.abortSignal, }); + if (isPreflightAborted(params.abortSignal)) { + return null; + } const mentionText = hasTypedText ? baseText : ""; const wasMentioned = @@ -727,6 +750,7 @@ export async function preflightDiscordMessage( token: params.token, runtime: params.runtime, botUserId: params.botUserId, + abortSignal: params.abortSignal, guildHistories: params.guildHistories, historyLimit: params.historyLimit, mediaMaxBytes: params.mediaMaxBytes, diff --git a/src/discord/monitor/message-handler.preflight.types.ts b/src/discord/monitor/message-handler.preflight.types.ts index 0cca0cb4085..a2b3c210a1c 100644 --- a/src/discord/monitor/message-handler.preflight.types.ts +++ b/src/discord/monitor/message-handler.preflight.types.ts @@ -25,6 +25,7 @@ export type DiscordMessagePreflightContext = { token: string; runtime: RuntimeEnv; botUserId?: string; + abortSignal?: AbortSignal; guildHistories: Map; historyLimit: number; mediaMaxBytes: number; @@ -95,6 +96,7 @@ export type DiscordMessagePreflightParams = { token: string; runtime: RuntimeEnv; botUserId?: string; + abortSignal?: AbortSignal; guildHistories: Map; historyLimit: number; mediaMaxBytes: number; diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 748ee921c72..9bc9cf77498 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -345,6 +345,32 @@ describe("processDiscordMessage ack reactions", () => { expect(emojis).toContain("🟦"); expect(emojis).toContain("🏁"); }); + + it("clears status reactions when dispatch aborts and removeAckAfterReply is enabled", async () => { + const abortController = new AbortController(); + dispatchInboundMessage.mockImplementationOnce(async () => { + abortController.abort(); + throw new Error("aborted"); + }); + + const ctx = await createBaseContext({ + abortSignal: abortController.signal, + cfg: { + messages: { + ackReaction: "👀", + removeAckAfterReply: true, + }, + session: { store: "/tmp/openclaw-discord-process-test-sessions.json" }, + }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + await vi.waitFor(() => { + expect(sendMocks.removeReactionDiscord).toHaveBeenCalledWith("c1", "m1", "👀", { rest: {} }); + }); + }); }); describe("processDiscordMessage session routing", () => { diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index cf942046ce1..3b7082dc218 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -60,6 +60,10 @@ function sleep(ms: number): Promise { const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60_000; +function isProcessAborted(abortSignal?: AbortSignal): boolean { + return Boolean(abortSignal?.aborted); +} + export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) { const { cfg, @@ -105,16 +109,26 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) route, commandAuthorized, discordRestFetch, + abortSignal, } = ctx; + if (isProcessAborted(abortSignal)) { + return; + } const ssrfPolicy = cfg.browser?.ssrfPolicy; const mediaList = await resolveMediaList(message, mediaMaxBytes, discordRestFetch, ssrfPolicy); + if (isProcessAborted(abortSignal)) { + return; + } const forwardedMediaList = await resolveForwardedMediaList( message, mediaMaxBytes, discordRestFetch, ssrfPolicy, ); + if (isProcessAborted(abortSignal)) { + return; + } mediaList.push(...forwardedMediaList); const text = messageText; if (!text) { @@ -585,6 +599,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) humanDelay: resolveHumanDelayConfig(cfg, route.agentId), typingCallbacks, deliver: async (payload: ReplyPayload, info) => { + if (isProcessAborted(abortSignal)) { + return; + } const isFinal = info.kind === "final"; if (payload.isReasoning) { // Reasoning/thinking payloads should not be delivered to Discord. @@ -607,6 +624,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) if (canFinalizeViaPreviewEdit) { await draftStream.stop(); + if (isProcessAborted(abortSignal)) { + return; + } try { await editMessageDiscord( deliverChannelId, @@ -627,6 +647,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) // Check if stop() flushed a message we can edit if (!finalizedViaPreviewMessage) { await draftStream.stop(); + if (isProcessAborted(abortSignal)) { + return; + } const messageIdAfterStop = draftStream.messageId(); if ( typeof messageIdAfterStop === "string" && @@ -657,6 +680,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) await draftStream.clear(); } } + if (isProcessAborted(abortSignal)) { + return; + } const replyToId = replyReference.use(); await deliverDiscordReply({ @@ -682,6 +708,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: async () => { + if (isProcessAborted(abortSignal)) { + return; + } await typingCallbacks.onReplyStart(); await statusReactions.setThinking(); }, @@ -689,13 +718,19 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) let dispatchResult: Awaited> | null = null; let dispatchError = false; + let dispatchAborted = false; try { + if (isProcessAborted(abortSignal)) { + dispatchAborted = true; + return; + } dispatchResult = await dispatchInboundMessage({ ctx: ctxPayload, cfg, dispatcher, replyOptions: { ...replyOptions, + abortSignal, skillFilter: channelConfig?.skills, disableBlockStreaming: disableBlockStreamingForDraft ?? @@ -730,11 +765,22 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) await statusReactions.setThinking(); }, onToolStart: async (payload) => { + if (isProcessAborted(abortSignal)) { + return; + } await statusReactions.setTool(payload.name); }, }, }); + if (isProcessAborted(abortSignal)) { + dispatchAborted = true; + return; + } } catch (err) { + if (isProcessAborted(abortSignal)) { + dispatchAborted = true; + return; + } dispatchError = true; throw err; } finally { @@ -752,21 +798,32 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) markDispatchIdle(); } if (statusReactionsEnabled) { - if (dispatchError) { - await statusReactions.setError(); + if (dispatchAborted) { + if (removeAckAfterReply) { + void statusReactions.clear(); + } else { + void statusReactions.restoreInitial(); + } } else { - await statusReactions.setDone(); - } - if (removeAckAfterReply) { - void (async () => { - await sleep(dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs); - await statusReactions.clear(); - })(); - } else { - void statusReactions.restoreInitial(); + if (dispatchError) { + await statusReactions.setError(); + } else { + await statusReactions.setDone(); + } + if (removeAckAfterReply) { + void (async () => { + await sleep(dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs); + await statusReactions.clear(); + })(); + } else { + void statusReactions.restoreInitial(); + } } } } + if (dispatchAborted) { + return; + } if (!dispatchResult?.queuedFinal) { if (isGuildMessage) { diff --git a/src/discord/monitor/message-handler.queue.test.ts b/src/discord/monitor/message-handler.queue.test.ts index 1424b29d46d..9ab7914adcc 100644 --- a/src/discord/monitor/message-handler.queue.test.ts +++ b/src/discord/monitor/message-handler.queue.test.ts @@ -26,6 +26,7 @@ function createDeferred() { function createHandlerParams(overrides?: { setStatus?: (patch: Record) => void; abortSignal?: AbortSignal; + listenerTimeoutMs?: number; }) { const cfg: OpenClawConfig = { channels: { @@ -64,6 +65,7 @@ function createHandlerParams(overrides?: { threadBindings: createNoopThreadBindingManager("default"), setStatus: overrides?.setStatus, abortSignal: overrides?.abortSignal, + listenerTimeoutMs: overrides?.listenerTimeoutMs, }; } @@ -167,6 +169,55 @@ describe("createDiscordMessageHandler queue behavior", () => { }); }); + it("applies listener timeout to queued runs so stalled runs do not block the queue", async () => { + vi.useFakeTimers(); + try { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + + processDiscordMessageMock + .mockImplementationOnce(async (ctx: { abortSignal?: AbortSignal }) => { + await new Promise((resolve) => { + if (ctx.abortSignal?.aborted) { + resolve(); + return; + } + ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); + }); + }) + .mockImplementationOnce(async () => undefined); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const params = createHandlerParams({ listenerTimeoutMs: 50 }); + const handler = createDiscordMessageHandler(params); + + await expect( + handler(createMessageData("m-1") as never, {} as never), + ).resolves.toBeUndefined(); + await expect( + handler(createMessageData("m-2") as never, {} as never), + ).resolves.toBeUndefined(); + + await vi.advanceTimersByTimeAsync(60); + await vi.waitFor(() => { + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + }); + + const firstCtx = processDiscordMessageMock.mock.calls[0]?.[0] as + | { abortSignal?: AbortSignal } + | undefined; + expect(firstCtx?.abortSignal?.aborted).toBe(true); + expect(params.runtime.error).toHaveBeenCalledWith( + expect.stringContaining("discord queued run timed out after"), + ); + } finally { + vi.useRealTimers(); + } + }); + it("refreshes run activity while active runs are in progress", async () => { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index a069a5a52ec..2d8a245c328 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -6,6 +6,7 @@ import { import { createRunStateMachine } from "../../channels/run-state-machine.js"; import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js"; import { danger } from "../../globals.js"; +import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts"; import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { preflightDiscordMessage } from "./message-handler.preflight.js"; @@ -27,12 +28,142 @@ type DiscordMessageHandlerParams = Omit< > & { setStatus?: DiscordMonitorStatusSink; abortSignal?: AbortSignal; + listenerTimeoutMs?: number; }; export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { deactivate: () => void; }; +const DEFAULT_DISCORD_RUN_TIMEOUT_MS = 120_000; +const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647; + +function normalizeDiscordRunTimeoutMs(timeoutMs?: number): number { + if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return DEFAULT_DISCORD_RUN_TIMEOUT_MS; + } + return Math.max(1, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS)); +} + +function isAbortError(error: unknown): boolean { + if (typeof error !== "object" || error === null) { + return false; + } + return "name" in error && String((error as { name?: unknown }).name) === "AbortError"; +} + +function formatDiscordRunContextSuffix(ctx: DiscordMessagePreflightContext): string { + const eventData = ctx as { + data?: { + channel_id?: string; + message?: { + id?: string; + }; + }; + }; + const channelId = ctx.messageChannelId?.trim() || eventData.data?.channel_id?.trim(); + const messageId = eventData.data?.message?.id?.trim(); + const details = [ + channelId ? `channelId=${channelId}` : null, + messageId ? `messageId=${messageId}` : null, + ].filter((entry): entry is string => Boolean(entry)); + if (details.length === 0) { + return ""; + } + return ` (${details.join(", ")})`; +} + +function mergeAbortSignals(signals: Array): AbortSignal | undefined { + const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal)); + if (activeSignals.length === 0) { + return undefined; + } + if (activeSignals.length === 1) { + return activeSignals[0]; + } + if (typeof AbortSignal.any === "function") { + return AbortSignal.any(activeSignals); + } + const fallbackController = new AbortController(); + for (const signal of activeSignals) { + if (signal.aborted) { + fallbackController.abort(); + return fallbackController.signal; + } + } + const abortFallback = () => { + fallbackController.abort(); + for (const signal of activeSignals) { + signal.removeEventListener("abort", abortFallback); + } + }; + for (const signal of activeSignals) { + signal.addEventListener("abort", abortFallback, { once: true }); + } + return fallbackController.signal; +} + +async function processDiscordRunWithTimeout(params: { + ctx: DiscordMessagePreflightContext; + runtime: DiscordMessagePreflightParams["runtime"]; + lifecycleSignal?: AbortSignal; + timeoutMs?: number; +}) { + const timeoutMs = normalizeDiscordRunTimeoutMs(params.timeoutMs); + const timeoutAbortController = new AbortController(); + const combinedSignal = mergeAbortSignals([ + params.ctx.abortSignal, + params.lifecycleSignal, + timeoutAbortController.signal, + ]); + const processCtx = + combinedSignal && combinedSignal !== params.ctx.abortSignal + ? { ...params.ctx, abortSignal: combinedSignal } + : params.ctx; + const contextSuffix = formatDiscordRunContextSuffix(params.ctx); + let timedOut = false; + let timeoutHandle: ReturnType | null = null; + const processPromise = processDiscordMessage(processCtx).catch((error) => { + if (timedOut) { + if (timeoutAbortController.signal.aborted && isAbortError(error)) { + return; + } + params.runtime.error?.( + danger(`discord queued run failed after timeout: ${String(error)}${contextSuffix}`), + ); + return; + } + throw error; + }); + + try { + const timeoutPromise = new Promise<"timeout">((resolve) => { + timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs); + timeoutHandle.unref?.(); + }); + const result = await Promise.race([ + processPromise.then(() => "completed" as const), + timeoutPromise, + ]); + if (result === "timeout") { + timedOut = true; + timeoutAbortController.abort(); + params.runtime.error?.( + danger( + `discord queued run timed out after ${formatDurationSeconds(timeoutMs, { + decimals: 1, + unit: "seconds", + })}${contextSuffix}`, + ), + ); + } + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +} + function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string { const sessionKey = ctx.route.sessionKey?.trim(); if (sessionKey) { @@ -75,7 +206,12 @@ export function createDiscordMessageHandler( if (!runState.isActive()) { return; } - await processDiscordMessage(ctx); + await processDiscordRunWithTimeout({ + ctx, + runtime: params.runtime, + lifecycleSignal: params.abortSignal, + timeoutMs: params.listenerTimeoutMs, + }); } finally { runState.onRunEnd(); } @@ -88,6 +224,7 @@ export function createDiscordMessageHandler( const { debouncer } = createChannelInboundDebouncer<{ data: DiscordMessageEvent; client: Client; + abortSignal?: AbortSignal; }>({ cfg: params.cfg, channel: "discord", @@ -126,11 +263,16 @@ export function createDiscordMessageHandler( if (!last) { return; } + const abortSignal = last.abortSignal; + if (abortSignal?.aborted) { + return; + } if (entries.length === 1) { const ctx = await preflightDiscordMessage({ ...params, ackReactionScope, groupPolicy, + abortSignal, data: last.data, client: last.client, }); @@ -162,6 +304,7 @@ export function createDiscordMessageHandler( ...params, ackReactionScope, groupPolicy, + abortSignal, data: syntheticData, client: last.client, }); @@ -188,19 +331,22 @@ export function createDiscordMessageHandler( }, }); - const handler: DiscordMessageHandlerWithLifecycle = async (data, client) => { - // Filter bot-own messages before they enter the debounce queue. - // The same check exists in preflightDiscordMessage(), but by that point - // the message has already consumed debounce capacity and blocked - // legitimate user messages. On active servers this causes cumulative - // slowdown (see #15874). - const msgAuthorId = data.message?.author?.id ?? data.author?.id; - if (params.botUserId && msgAuthorId === params.botUserId) { - return; - } - + const handler: DiscordMessageHandlerWithLifecycle = async (data, client, options) => { try { - await debouncer.enqueue({ data, client }); + if (options?.abortSignal?.aborted) { + return; + } + // Filter bot-own messages before they enter the debounce queue. + // The same check exists in preflightDiscordMessage(), but by that point + // the message has already consumed debounce capacity and blocked + // legitimate user messages. On active servers this causes cumulative + // slowdown (see #15874). + const msgAuthorId = data.message?.author?.id ?? data.author?.id; + if (params.botUserId && msgAuthorId === params.botUserId) { + return; + } + + await debouncer.enqueue({ data, client, abortSignal: options?.abortSignal }); } catch (err) { params.runtime.error?.(danger(`handler failed: ${String(err)}`)); } diff --git a/src/discord/monitor/preflight-audio.ts b/src/discord/monitor/preflight-audio.ts index 89e4ae8c3e1..307abcc6b43 100644 --- a/src/discord/monitor/preflight-audio.ts +++ b/src/discord/monitor/preflight-audio.ts @@ -24,6 +24,7 @@ export async function resolveDiscordPreflightAudioMentionContext(params: { shouldRequireMention: boolean; mentionRegexes: RegExp[]; cfg: OpenClawConfig; + abortSignal?: AbortSignal; }): Promise<{ hasAudioAttachment: boolean; hasTypedText: boolean; @@ -42,8 +43,20 @@ export async function resolveDiscordPreflightAudioMentionContext(params: { let transcript: string | undefined; if (needsPreflightTranscription) { + if (params.abortSignal?.aborted) { + return { + hasAudioAttachment, + hasTypedText, + }; + } try { const { transcribeFirstAudio } = await import("../../media-understanding/audio-preflight.js"); + if (params.abortSignal?.aborted) { + return { + hasAudioAttachment, + hasTypedText, + }; + } const audioUrls = audioAttachments .map((att) => att.url) .filter((url): url is string => typeof url === "string" && url.length > 0); @@ -58,6 +71,9 @@ export async function resolveDiscordPreflightAudioMentionContext(params: { cfg: params.cfg, agentDir: undefined, }); + if (params.abortSignal?.aborted) { + transcript = undefined; + } } } catch (err) { logVerbose(`discord: audio preflight transcription failed: ${String(err)}`); diff --git a/src/discord/monitor/provider.test.ts b/src/discord/monitor/provider.test.ts index 8481b5356f6..e3bc0ca36c1 100644 --- a/src/discord/monitor/provider.test.ts +++ b/src/discord/monitor/provider.test.ts @@ -1,5 +1,6 @@ import { EventEmitter } from "node:events"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { AcpRuntimeError } from "../../acp/runtime/errors.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { RuntimeEnv } from "../../runtime.js"; @@ -25,6 +26,7 @@ const { createThreadBindingManagerMock, reconcileAcpThreadBindingsOnStartupMock, createdBindingManagers, + getAcpSessionStatusMock, getPluginCommandSpecsMock, listNativeCommandSpecsForConfigMock, listSkillCommandsForAgentsMock, @@ -63,6 +65,11 @@ const { staleSessionKeys: [], })), createdBindingManagers, + getAcpSessionStatusMock: vi.fn( + async (_params: { cfg: OpenClawConfig; sessionKey: string; signal?: AbortSignal }) => ({ + state: "idle", + }), + ), getPluginCommandSpecsMock: vi.fn<() => PluginCommandSpecMock[]>(() => []), listNativeCommandSpecsForConfigMock: vi.fn<() => NativeCommandSpecMock[]>(() => [ { name: "cmd", description: "built-in", acceptsArgs: false }, @@ -127,6 +134,12 @@ vi.mock("../../auto-reply/chunk.js", () => ({ resolveTextChunkLimit: () => 2000, })); +vi.mock("../../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + getSessionStatus: getAcpSessionStatusMock, + }), +})); + vi.mock("../../auto-reply/commands-registry.js", () => ({ listNativeCommandSpecsForConfig: listNativeCommandSpecsForConfigMock, })); @@ -272,6 +285,21 @@ vi.mock("./thread-bindings.js", () => ({ })); describe("monitorDiscordProvider", () => { + type ReconcileHealthProbeParams = { + cfg: OpenClawConfig; + accountId: string; + sessionKey: string; + binding: unknown; + session: unknown; + }; + + type ReconcileStartupParams = { + cfg: OpenClawConfig; + healthProbe?: ( + params: ReconcileHealthProbeParams, + ) => Promise<{ status: string; reason?: string }>; + }; + const baseRuntime = (): RuntimeEnv => { return { log: vi.fn(), @@ -299,6 +327,16 @@ describe("monitorDiscordProvider", () => { return opts.eventQueue; }; + const getHealthProbe = () => { + expect(reconcileAcpThreadBindingsOnStartupMock).toHaveBeenCalledTimes(1); + const firstCall = reconcileAcpThreadBindingsOnStartupMock.mock.calls.at(0) as + | [ReconcileStartupParams] + | undefined; + const reconcileParams = firstCall?.[0]; + expect(typeof reconcileParams?.healthProbe).toBe("function"); + return reconcileParams?.healthProbe as NonNullable; + }; + beforeEach(() => { clientConstructorOptionsMock.mockClear(); createDiscordAutoPresenceControllerMock.mockClear().mockImplementation(() => ({ @@ -318,6 +356,7 @@ describe("monitorDiscordProvider", () => { removed: 0, staleSessionKeys: [], }); + getAcpSessionStatusMock.mockClear().mockResolvedValue({ state: "idle" }); createdBindingManagers.length = 0; getPluginCommandSpecsMock.mockClear().mockReturnValue([]); listNativeCommandSpecsForConfigMock @@ -368,6 +407,167 @@ describe("monitorDiscordProvider", () => { expect(reconcileAcpThreadBindingsOnStartupMock).toHaveBeenCalledTimes(1); }); + it("treats ACP error status as uncertain during startup thread-binding probes", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + getAcpSessionStatusMock.mockResolvedValue({ state: "error" }); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + const probeResult = await getHealthProbe()({ + cfg: baseConfig(), + accountId: "default", + sessionKey: "agent:codex:acp:error", + binding: {} as never, + session: { + acp: { + state: "error", + lastActivityAt: Date.now(), + }, + } as never, + }); + + expect(probeResult).toEqual({ + status: "uncertain", + reason: "status-error-state", + }); + }); + + it("classifies typed ACP session init failures as stale", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + getAcpSessionStatusMock.mockRejectedValue( + new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "missing ACP metadata"), + ); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + const probeResult = await getHealthProbe()({ + cfg: baseConfig(), + accountId: "default", + sessionKey: "agent:codex:acp:stale", + binding: {} as never, + session: { + acp: { + state: "idle", + lastActivityAt: Date.now(), + }, + } as never, + }); + + expect(probeResult).toEqual({ + status: "stale", + reason: "session-init-failed", + }); + }); + + it("classifies typed non-init ACP errors as uncertain when not stale-running", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + getAcpSessionStatusMock.mockRejectedValue( + new AcpRuntimeError("ACP_BACKEND_UNAVAILABLE", "runtime unavailable"), + ); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + const probeResult = await getHealthProbe()({ + cfg: baseConfig(), + accountId: "default", + sessionKey: "agent:codex:acp:uncertain", + binding: {} as never, + session: { + acp: { + state: "idle", + lastActivityAt: Date.now(), + }, + } as never, + }); + + expect(probeResult).toEqual({ + status: "uncertain", + reason: "status-error", + }); + }); + + it("aborts timed-out ACP status probes during startup thread-binding health checks", async () => { + vi.useFakeTimers(); + try { + const { monitorDiscordProvider } = await import("./provider.js"); + getAcpSessionStatusMock.mockImplementation( + ({ signal }: { signal?: AbortSignal }) => + new Promise((_resolve, reject) => { + signal?.addEventListener("abort", () => reject(new Error("aborted")), { once: true }); + }), + ); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + const probePromise = getHealthProbe()({ + cfg: baseConfig(), + accountId: "default", + sessionKey: "agent:codex:acp:timeout", + binding: {} as never, + session: { + acp: { + state: "idle", + lastActivityAt: Date.now(), + }, + } as never, + }); + + await vi.advanceTimersByTimeAsync(8_100); + await expect(probePromise).resolves.toEqual({ + status: "uncertain", + reason: "status-timeout", + }); + + const firstCall = getAcpSessionStatusMock.mock.calls[0]?.[0] as + | { signal?: AbortSignal } + | undefined; + expect(firstCall?.signal).toBeDefined(); + expect(firstCall?.signal?.aborted).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + + it("falls back to legacy missing-session message classification", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + getAcpSessionStatusMock.mockRejectedValue(new Error("ACP session metadata missing")); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + const probeResult = await getHealthProbe()({ + cfg: baseConfig(), + accountId: "default", + sessionKey: "agent:codex:acp:legacy", + binding: {} as never, + session: { + acp: { + state: "idle", + lastActivityAt: Date.now(), + }, + } as never, + }); + + expect(probeResult).toEqual({ + status: "stale", + reason: "session-missing", + }); + }); + it("captures gateway errors emitted before lifecycle wait starts", async () => { const { monitorDiscordProvider } = await import("./provider.js"); const emitter = new EventEmitter(); diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index d69cc6d163e..a4f5b13f4e5 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -10,6 +10,8 @@ import { import { GatewayCloseCodes, type GatewayPlugin } from "@buape/carbon/gateway"; import { VoicePlugin } from "@buape/carbon/voice"; import { Routes } from "discord-api-types/v10"; +import { getAcpSessionManager } from "../../acp/control-plane/manager.js"; +import { isAcpRuntimeError } from "../../acp/runtime/errors.js"; import { resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import type { NativeCommandSpec } from "../../auto-reply/commands-registry.js"; import { listNativeCommandSpecsForConfig } from "../../auto-reply/commands-registry.js"; @@ -175,6 +177,92 @@ function appendPluginCommandSpecs(params: { return merged; } +const DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS = 8_000; +const DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS = 2 * 60 * 1000; + +function isLegacyMissingSessionError(message: string): boolean { + return ( + message.includes("Session is not ACP-enabled") || + message.includes("ACP session metadata missing") + ); +} + +function classifyAcpStatusProbeError(params: { error: unknown; isStaleRunning: boolean }): { + status: "stale" | "uncertain"; + reason: string; +} { + if (isAcpRuntimeError(params.error) && params.error.code === "ACP_SESSION_INIT_FAILED") { + return { status: "stale", reason: "session-init-failed" }; + } + + const message = params.error instanceof Error ? params.error.message : String(params.error); + if (isLegacyMissingSessionError(message)) { + return { status: "stale", reason: "session-missing" }; + } + + return params.isStaleRunning + ? { status: "stale", reason: "status-error-running-stale" } + : { status: "uncertain", reason: "status-error" }; +} + +async function probeDiscordAcpBindingHealth(params: { + cfg: OpenClawConfig; + sessionKey: string; + storedState?: "idle" | "running" | "error"; + lastActivityAt?: number; +}): Promise<{ status: "healthy" | "stale" | "uncertain"; reason?: string }> { + const manager = getAcpSessionManager(); + const statusProbeAbortController = new AbortController(); + const statusPromise = manager + .getSessionStatus({ + cfg: params.cfg, + sessionKey: params.sessionKey, + signal: statusProbeAbortController.signal, + }) + .then((status) => ({ kind: "status" as const, status })) + .catch((error: unknown) => ({ kind: "error" as const, error })); + + let timeoutTimer: ReturnType | null = null; + const timeoutPromise = new Promise<{ kind: "timeout" }>((resolve) => { + timeoutTimer = setTimeout( + () => resolve({ kind: "timeout" }), + DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS, + ); + timeoutTimer.unref?.(); + }); + const result = await Promise.race([statusPromise, timeoutPromise]); + if (timeoutTimer) { + clearTimeout(timeoutTimer); + } + if (result.kind === "timeout") { + statusProbeAbortController.abort(); + } + const runningForMs = + params.storedState === "running" && Number.isFinite(params.lastActivityAt) + ? Date.now() - Math.max(0, Math.floor(params.lastActivityAt ?? 0)) + : 0; + const isStaleRunning = + params.storedState === "running" && runningForMs >= DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS; + + if (result.kind === "timeout") { + return isStaleRunning + ? { status: "stale", reason: "status-timeout-running-stale" } + : { status: "uncertain", reason: "status-timeout" }; + } + if (result.kind === "error") { + return classifyAcpStatusProbeError({ + error: result.error, + isStaleRunning, + }); + } + if (result.status.state === "error") { + // ACP error state is recoverable (next turn can clear it), so keep the + // binding unless stronger stale signals exist. + return { status: "uncertain", reason: "status-error-state" }; + } + return { status: "healthy" }; +} + async function deployDiscordCommands(params: { client: Client; runtime: RuntimeEnv; @@ -382,14 +470,32 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { }) : createNoopThreadBindingManager(account.accountId); if (threadBindingsEnabled) { - const reconciliation = reconcileAcpThreadBindingsOnStartup({ + const uncertainProbeKeys = new Set(); + const reconciliation = await reconcileAcpThreadBindingsOnStartup({ cfg, accountId: account.accountId, sendFarewell: false, + healthProbe: async ({ sessionKey, session }) => { + const probe = await probeDiscordAcpBindingHealth({ + cfg, + sessionKey, + storedState: session.acp?.state, + lastActivityAt: session.acp?.lastActivityAt, + }); + if (probe.status === "uncertain") { + uncertainProbeKeys.add(`${sessionKey}${probe.reason ? ` (${probe.reason})` : ""}`); + } + return probe; + }, }); if (reconciliation.removed > 0) { logVerbose( - `discord: removed ${reconciliation.removed}/${reconciliation.checked} stale ACP thread bindings on startup for account ${account.accountId}`, + `discord: removed ${reconciliation.removed}/${reconciliation.checked} stale ACP thread bindings on startup for account ${account.accountId}: ${reconciliation.staleSessionKeys.join(", ")}`, + ); + } + if (uncertainProbeKeys.size > 0) { + logVerbose( + `discord: ACP thread-binding health probe uncertain for account ${account.accountId}: ${[...uncertainProbeKeys].join(", ")}`, ); } } @@ -599,6 +705,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime, setStatus: opts.setStatus, abortSignal: opts.abortSignal, + listenerTimeoutMs: eventQueueOpts.listenerTimeout, botUserId, guildHistories, historyLimit, @@ -623,7 +730,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { registerDiscordListener( client.listeners, - new DiscordMessageListener(messageHandler, logger, trackInboundEvent), + new DiscordMessageListener(messageHandler, logger, trackInboundEvent, { + timeoutMs: eventQueueOpts.listenerTimeout, + }), ); const reactionListenerOptions = { cfg, diff --git a/src/discord/monitor/thread-bindings.lifecycle.test.ts b/src/discord/monitor/thread-bindings.lifecycle.test.ts index 0e5518d928a..b4eeb229f6f 100644 --- a/src/discord/monitor/thread-bindings.lifecycle.test.ts +++ b/src/discord/monitor/thread-bindings.lifecycle.test.ts @@ -811,7 +811,7 @@ describe("thread binding lifecycle", () => { }; }); - const result = reconcileAcpThreadBindingsOnStartup({ + const result = await reconcileAcpThreadBindingsOnStartup({ cfg: {} as OpenClawConfig, accountId: "default", }); @@ -855,7 +855,7 @@ describe("thread binding lifecycle", () => { acp: undefined, }); - const result = reconcileAcpThreadBindingsOnStartup({ + const result = await reconcileAcpThreadBindingsOnStartup({ cfg: {} as OpenClawConfig, accountId: "default", }); @@ -866,6 +866,287 @@ describe("thread binding lifecycle", () => { expect(manager.getByThreadId("thread-acp-uncertain")).toBeDefined(); }); + it("removes ACP bindings when health probe marks running session as stale", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + await manager.bindTarget({ + threadId: "thread-acp-running", + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: "agent:codex:acp:running", + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + + hoisted.readAcpSessionEntry.mockReturnValue({ + sessionKey: "agent:codex:acp:running", + storeSessionKey: "agent:codex:acp:running", + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:running", + mode: "persistent", + state: "running", + lastActivityAt: Date.now() - 5 * 60 * 1000, + }, + }); + + const result = await reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + healthProbe: async () => ({ status: "stale", reason: "status-timeout-running-stale" }), + }); + + expect(result.checked).toBe(1); + expect(result.removed).toBe(1); + expect(result.staleSessionKeys).toContain("agent:codex:acp:running"); + expect(manager.getByThreadId("thread-acp-running")).toBeUndefined(); + }); + + it("keeps running ACP bindings when health probe is uncertain", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + await manager.bindTarget({ + threadId: "thread-acp-running-uncertain", + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: "agent:codex:acp:running-uncertain", + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + + hoisted.readAcpSessionEntry.mockReturnValue({ + sessionKey: "agent:codex:acp:running-uncertain", + storeSessionKey: "agent:codex:acp:running-uncertain", + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:running-uncertain", + mode: "persistent", + state: "running", + lastActivityAt: Date.now(), + }, + }); + + const result = await reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + healthProbe: async () => ({ status: "uncertain", reason: "status-timeout" }), + }); + + expect(result.checked).toBe(1); + expect(result.removed).toBe(0); + expect(result.staleSessionKeys).toEqual([]); + expect(manager.getByThreadId("thread-acp-running-uncertain")).toBeDefined(); + }); + + it("keeps ACP bindings in stored error state when no explicit stale probe verdict exists", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + await manager.bindTarget({ + threadId: "thread-acp-error", + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: "agent:codex:acp:error", + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + + hoisted.readAcpSessionEntry.mockReturnValue({ + sessionKey: "agent:codex:acp:error", + storeSessionKey: "agent:codex:acp:error", + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "runtime:error", + mode: "persistent", + state: "error", + lastActivityAt: Date.now(), + }, + }); + + const result = await reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + }); + + expect(result.checked).toBe(1); + expect(result.removed).toBe(0); + expect(result.staleSessionKeys).toEqual([]); + expect(manager.getByThreadId("thread-acp-error")).toBeDefined(); + }); + + it("starts ACP health probes in parallel during startup reconciliation", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + await manager.bindTarget({ + threadId: "thread-acp-probe-1", + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: "agent:codex:acp:probe-1", + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + await manager.bindTarget({ + threadId: "thread-acp-probe-2", + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: "agent:codex:acp:probe-2", + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + + hoisted.readAcpSessionEntry.mockImplementation((paramsUnknown: unknown) => { + const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? ""; + return { + sessionKey, + storeSessionKey: sessionKey, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: `runtime:${sessionKey}`, + mode: "persistent", + state: "running", + lastActivityAt: Date.now(), + }, + }; + }); + + let resolveFirstProbe: ((value: { status: "healthy" }) => void) | undefined; + const firstProbe = new Promise<{ status: "healthy" }>((resolve) => { + resolveFirstProbe = resolve; + }); + let probeCallCount = 0; + let secondProbeStartedBeforeFirstResolved = false; + + const reconcilePromise = reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + healthProbe: async () => { + probeCallCount += 1; + if (probeCallCount === 1) { + return await firstProbe; + } + secondProbeStartedBeforeFirstResolved = true; + return { status: "healthy" as const }; + }, + }); + + await Promise.resolve(); + await Promise.resolve(); + const observedParallelStart = secondProbeStartedBeforeFirstResolved; + + resolveFirstProbe?.({ status: "healthy" }); + const result = await reconcilePromise; + + expect(observedParallelStart).toBe(true); + expect(result.checked).toBe(2); + expect(result.removed).toBe(0); + }); + + it("caps ACP startup health probe concurrency", async () => { + const manager = createThreadBindingManager({ + accountId: "default", + persist: false, + enableSweeper: false, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + }); + + for (let index = 0; index < 12; index += 1) { + const key = `agent:codex:acp:cap-${index}`; + await manager.bindTarget({ + threadId: `thread-acp-cap-${index}`, + channelId: "parent-1", + targetKind: "acp", + targetSessionKey: key, + agentId: "codex", + webhookId: "wh-1", + webhookToken: "tok-1", + }); + } + + hoisted.readAcpSessionEntry.mockImplementation((paramsUnknown: unknown) => { + const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? ""; + return { + sessionKey, + storeSessionKey: sessionKey, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: `runtime:${sessionKey}`, + mode: "persistent", + state: "running", + lastActivityAt: Date.now(), + }, + }; + }); + + const PROBE_LIMIT = 8; + let probeCalls = 0; + let inFlight = 0; + let maxInFlight = 0; + let releaseFirstWave: (() => void) | undefined; + const firstWaveGate = new Promise((resolve) => { + releaseFirstWave = resolve; + }); + + const reconcilePromise = reconcileAcpThreadBindingsOnStartup({ + cfg: {} as OpenClawConfig, + accountId: "default", + healthProbe: async () => { + probeCalls += 1; + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + if (probeCalls <= PROBE_LIMIT) { + await firstWaveGate; + } + inFlight -= 1; + return { status: "healthy" as const }; + }, + }); + + await vi.waitFor(() => { + expect(probeCalls).toBe(PROBE_LIMIT); + }); + expect(maxInFlight).toBe(PROBE_LIMIT); + + releaseFirstWave?.(); + const result = await reconcilePromise; + expect(result.checked).toBe(12); + expect(result.removed).toBe(0); + expect(maxInFlight).toBeLessThanOrEqual(PROBE_LIMIT); + }); + it("migrates legacy expiresAt bindings to idle/max-age semantics", () => { const previousStateDir = process.env.OPENCLAW_STATE_DIR; const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-thread-bindings-")); diff --git a/src/discord/monitor/thread-bindings.lifecycle.ts b/src/discord/monitor/thread-bindings.lifecycle.ts index bfc6c8513fb..f5beb9a3e6f 100644 --- a/src/discord/monitor/thread-bindings.lifecycle.ts +++ b/src/discord/monitor/thread-bindings.lifecycle.ts @@ -1,4 +1,4 @@ -import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; +import { readAcpSessionEntry, type AcpSessionStoreEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import { normalizeAccountId } from "../../routing/session-key.js"; import { parseDiscordTarget } from "../targets.js"; @@ -29,6 +29,50 @@ export type AcpThreadBindingReconciliationResult = { staleSessionKeys: string[]; }; +export type AcpThreadBindingHealthStatus = "healthy" | "stale" | "uncertain"; + +export type AcpThreadBindingHealthProbe = (params: { + cfg: OpenClawConfig; + accountId: string; + sessionKey: string; + binding: ThreadBindingRecord; + session: AcpSessionStoreEntry; +}) => Promise<{ + status: AcpThreadBindingHealthStatus; + reason?: string; +}>; + +// Cap startup fan-out so large binding sets do not create unbounded ACP probe spikes. +const ACP_STARTUP_HEALTH_PROBE_CONCURRENCY_LIMIT = 8; + +async function mapWithConcurrency(params: { + items: TItem[]; + limit: number; + worker: (item: TItem, index: number) => Promise; +}): Promise { + if (params.items.length === 0) { + return []; + } + const limit = Math.max(1, Math.floor(params.limit)); + const resultsByIndex = new Map(); + let nextIndex = 0; + + const runWorker = async () => { + for (;;) { + const index = nextIndex; + nextIndex += 1; + if (index >= params.items.length) { + return; + } + resultsByIndex.set(index, await params.worker(params.items[index], index)); + } + }; + + const workers = Array.from({ length: Math.min(limit, params.items.length) }, () => runWorker()); + await Promise.all(workers); + return params.items.map((_item, index) => resultsByIndex.get(index)!); +} + function normalizeNonNegativeMs(raw: number): number { if (!Number.isFinite(raw)) { return 0; @@ -259,11 +303,21 @@ export function setThreadBindingMaxAgeBySessionKey(params: { return updated; } -export function reconcileAcpThreadBindingsOnStartup(params: { +function resolveStoredAcpBindingHealth(params: { + session: AcpSessionStoreEntry; +}): AcpThreadBindingHealthStatus { + if (!params.session.acp) { + return "stale"; + } + return "healthy"; +} + +export async function reconcileAcpThreadBindingsOnStartup(params: { cfg: OpenClawConfig; accountId?: string; sendFarewell?: boolean; -}): AcpThreadBindingReconciliationResult { + healthProbe?: AcpThreadBindingHealthProbe; +}): Promise { const manager = getThreadBindingManager(params.accountId); if (!manager) { return { @@ -274,21 +328,77 @@ export function reconcileAcpThreadBindingsOnStartup(params: { } const acpBindings = manager.listBindings().filter((binding) => binding.targetKind === "acp"); - const staleBindings = acpBindings.filter((binding) => { + const staleBindings: ThreadBindingRecord[] = []; + const probeTargets: Array<{ + binding: ThreadBindingRecord; + sessionKey: string; + session: AcpSessionStoreEntry; + }> = []; + + for (const binding of acpBindings) { const sessionKey = binding.targetSessionKey.trim(); if (!sessionKey) { - return true; + staleBindings.push(binding); + continue; } const session = readAcpSessionEntry({ cfg: params.cfg, sessionKey, }); - // Session store read failures are transient; never auto-unbind on uncertain reads. - if (session?.storeReadFailed) { - return false; + if (!session) { + staleBindings.push(binding); + continue; } - return !session?.acp; - }); + // Session store read failures are transient; never auto-unbind on uncertain reads. + if (session.storeReadFailed) { + continue; + } + + if (resolveStoredAcpBindingHealth({ session }) === "stale") { + staleBindings.push(binding); + continue; + } + + if (!params.healthProbe) { + continue; + } + probeTargets.push({ binding, sessionKey, session }); + } + + if (params.healthProbe && probeTargets.length > 0) { + const probeResults = await mapWithConcurrency({ + items: probeTargets, + limit: ACP_STARTUP_HEALTH_PROBE_CONCURRENCY_LIMIT, + worker: async ({ binding, sessionKey, session }) => { + try { + const result = await params.healthProbe?.({ + cfg: params.cfg, + accountId: manager.accountId, + sessionKey, + binding, + session, + }); + return { + binding, + status: result?.status ?? ("uncertain" satisfies AcpThreadBindingHealthStatus), + }; + } catch { + // Treat probe failures as uncertain and keep the binding. + return { + binding, + status: "uncertain" satisfies AcpThreadBindingHealthStatus, + }; + } + }, + }); + + for (const probeResult of probeResults) { + if (probeResult.status === "stale") { + staleBindings.push(probeResult.binding); + } + } + } + if (staleBindings.length === 0) { return { checked: acpBindings.length, diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index 1acd1bea175..2c7e7a6aeba 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -144,20 +144,23 @@ function getCachedAgentRun(runId: string) { export async function waitForAgentJob(params: { runId: string; timeoutMs: number; + signal?: AbortSignal; + ignoreCachedSnapshot?: boolean; }): Promise { - const { runId, timeoutMs } = params; + const { runId, timeoutMs, signal, ignoreCachedSnapshot = false } = params; ensureAgentRunListener(); - const cached = getCachedAgentRun(runId); + const cached = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId); if (cached) { return cached; } - if (timeoutMs <= 0) { + if (timeoutMs <= 0 || signal?.aborted) { return null; } return await new Promise((resolve) => { let settled = false; let pendingErrorTimer: NodeJS.Timeout | undefined; + let onAbort: (() => void) | undefined; const clearPendingErrorTimer = () => { if (!pendingErrorTimer) { @@ -175,6 +178,9 @@ export async function waitForAgentJob(params: { clearTimeout(timer); clearPendingErrorTimer(); unsubscribe(); + if (onAbort) { + signal?.removeEventListener("abort", onAbort); + } resolve(entry); }; @@ -185,7 +191,7 @@ export async function waitForAgentJob(params: { clearPendingErrorTimer(); const effectiveDelay = Math.max(1, Math.min(Math.floor(delayMs), 2_147_483_647)); pendingErrorTimer = setTimeout(() => { - const latest = getCachedAgentRun(runId); + const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId); if (latest) { finish(latest); return; @@ -196,9 +202,11 @@ export async function waitForAgentJob(params: { pendingErrorTimer.unref?.(); }; - const pending = getPendingAgentRunError(runId); - if (pending) { - scheduleErrorFinish(pending.snapshot, pending.dueAt - Date.now()); + if (!ignoreCachedSnapshot) { + const pending = getPendingAgentRunError(runId); + if (pending) { + scheduleErrorFinish(pending.snapshot, pending.dueAt - Date.now()); + } } const unsubscribe = onAgentEvent((evt) => { @@ -216,7 +224,7 @@ export async function waitForAgentJob(params: { if (phase !== "end" && phase !== "error") { return; } - const latest = getCachedAgentRun(runId); + const latest = ignoreCachedSnapshot ? undefined : getCachedAgentRun(runId); if (latest) { finish(latest); return; @@ -236,6 +244,8 @@ export async function waitForAgentJob(params: { const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647)); const timer = setTimeout(() => finish(null), timerDelayMs); + onAbort = () => finish(null); + signal?.addEventListener("abort", onAbort, { once: true }); }); } diff --git a/src/gateway/server-methods/agent-wait-dedupe.test.ts b/src/gateway/server-methods/agent-wait-dedupe.test.ts new file mode 100644 index 00000000000..e9a1899c88b --- /dev/null +++ b/src/gateway/server-methods/agent-wait-dedupe.test.ts @@ -0,0 +1,323 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + __testing, + readTerminalSnapshotFromGatewayDedupe, + setGatewayDedupeEntry, + waitForTerminalGatewayDedupe, +} from "./agent-wait-dedupe.js"; + +describe("agent wait dedupe helper", () => { + beforeEach(() => { + __testing.resetWaiters(); + vi.useFakeTimers(); + }); + + afterEach(() => { + __testing.resetWaiters(); + vi.useRealTimers(); + }); + + it("unblocks waiters when a terminal chat dedupe entry is written", async () => { + const dedupe = new Map(); + const runId = "run-chat-terminal"; + const waiter = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 1_000, + }); + + await Promise.resolve(); + expect(__testing.getWaiterCount(runId)).toBe(1); + + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "ok", + startedAt: 100, + endedAt: 200, + }, + }, + }); + + await expect(waiter).resolves.toEqual({ + status: "ok", + startedAt: 100, + endedAt: 200, + error: undefined, + }); + expect(__testing.getWaiterCount(runId)).toBe(0); + }); + + it("keeps stale chat dedupe blocked while agent dedupe is in-flight", async () => { + const dedupe = new Map(); + const runId = "run-stale-chat"; + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "ok", + }, + }, + }); + setGatewayDedupeEntry({ + dedupe, + key: `agent:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "accepted", + }, + }, + }); + + const snapshot = readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }); + expect(snapshot).toBeNull(); + + const blockedWait = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 25, + }); + await vi.advanceTimersByTimeAsync(30); + await expect(blockedWait).resolves.toBeNull(); + expect(__testing.getWaiterCount(runId)).toBe(0); + }); + + it("uses newer terminal chat snapshot when agent entry is non-terminal", () => { + const dedupe = new Map(); + const runId = "run-nonterminal-agent-with-newer-chat"; + setGatewayDedupeEntry({ + dedupe, + key: `agent:${runId}`, + entry: { + ts: 100, + ok: true, + payload: { + runId, + status: "accepted", + }, + }, + }); + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: 200, + ok: true, + payload: { + runId, + status: "ok", + startedAt: 1, + endedAt: 2, + }, + }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "ok", + startedAt: 1, + endedAt: 2, + error: undefined, + }); + }); + + it("ignores stale agent snapshots when waiting for an active chat run", async () => { + const dedupe = new Map(); + const runId = "run-chat-active-ignore-agent"; + setGatewayDedupeEntry({ + dedupe, + key: `agent:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "ok", + }, + }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + ignoreAgentTerminalSnapshot: true, + }), + ).toBeNull(); + + const wait = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 1_000, + ignoreAgentTerminalSnapshot: true, + }); + await Promise.resolve(); + expect(__testing.getWaiterCount(runId)).toBe(1); + + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { + runId, + status: "ok", + startedAt: 123, + endedAt: 456, + }, + }, + }); + + await expect(wait).resolves.toEqual({ + status: "ok", + startedAt: 123, + endedAt: 456, + error: undefined, + }); + }); + + it("prefers the freshest terminal snapshot when agent/chat dedupe keys collide", () => { + const runId = "run-collision"; + const dedupe = new Map(); + + setGatewayDedupeEntry({ + dedupe, + key: `agent:${runId}`, + entry: { + ts: 100, + ok: true, + payload: { runId, status: "ok", startedAt: 10, endedAt: 20 }, + }, + }); + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: 200, + ok: false, + payload: { runId, status: "error", startedAt: 30, endedAt: 40, error: "chat failed" }, + }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe, + runId, + }), + ).toEqual({ + status: "error", + startedAt: 30, + endedAt: 40, + error: "chat failed", + }); + + const dedupeReverse = new Map(); + setGatewayDedupeEntry({ + dedupe: dedupeReverse, + key: `chat:${runId}`, + entry: { + ts: 100, + ok: true, + payload: { runId, status: "ok", startedAt: 1, endedAt: 2 }, + }, + }); + setGatewayDedupeEntry({ + dedupe: dedupeReverse, + key: `agent:${runId}`, + entry: { + ts: 200, + ok: true, + payload: { runId, status: "timeout", startedAt: 3, endedAt: 4, error: "still running" }, + }, + }); + + expect( + readTerminalSnapshotFromGatewayDedupe({ + dedupe: dedupeReverse, + runId, + }), + ).toEqual({ + status: "timeout", + startedAt: 3, + endedAt: 4, + error: "still running", + }); + }); + + it("resolves multiple waiters for the same run id", async () => { + const dedupe = new Map(); + const runId = "run-multi"; + const first = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 1_000, + }); + const second = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 1_000, + }); + + await Promise.resolve(); + expect(__testing.getWaiterCount(runId)).toBe(2); + + setGatewayDedupeEntry({ + dedupe, + key: `chat:${runId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { runId, status: "ok" }, + }, + }); + + await expect(first).resolves.toEqual( + expect.objectContaining({ + status: "ok", + }), + ); + await expect(second).resolves.toEqual( + expect.objectContaining({ + status: "ok", + }), + ); + expect(__testing.getWaiterCount(runId)).toBe(0); + }); + + it("cleans up waiter registration on timeout", async () => { + const dedupe = new Map(); + const runId = "run-timeout"; + const wait = waitForTerminalGatewayDedupe({ + dedupe, + runId, + timeoutMs: 20, + }); + + await Promise.resolve(); + expect(__testing.getWaiterCount(runId)).toBe(1); + + await vi.advanceTimersByTimeAsync(25); + await expect(wait).resolves.toBeNull(); + expect(__testing.getWaiterCount(runId)).toBe(0); + }); +}); diff --git a/src/gateway/server-methods/agent-wait-dedupe.ts b/src/gateway/server-methods/agent-wait-dedupe.ts new file mode 100644 index 00000000000..98d0df72fa3 --- /dev/null +++ b/src/gateway/server-methods/agent-wait-dedupe.ts @@ -0,0 +1,244 @@ +import type { DedupeEntry } from "../server-shared.js"; + +export type AgentWaitTerminalSnapshot = { + status: "ok" | "error" | "timeout"; + startedAt?: number; + endedAt?: number; + error?: string; +}; + +const AGENT_WAITERS_BY_RUN_ID = new Map void>>(); + +function parseRunIdFromDedupeKey(key: string): string | null { + if (key.startsWith("agent:")) { + return key.slice("agent:".length) || null; + } + if (key.startsWith("chat:")) { + return key.slice("chat:".length) || null; + } + return null; +} + +function asFiniteNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function addWaiter(runId: string, waiter: () => void): () => void { + const normalizedRunId = runId.trim(); + if (!normalizedRunId) { + return () => {}; + } + const existing = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId); + if (existing) { + existing.add(waiter); + return () => { + const waiters = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId); + if (!waiters) { + return; + } + waiters.delete(waiter); + if (waiters.size === 0) { + AGENT_WAITERS_BY_RUN_ID.delete(normalizedRunId); + } + }; + } + AGENT_WAITERS_BY_RUN_ID.set(normalizedRunId, new Set([waiter])); + return () => { + const waiters = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId); + if (!waiters) { + return; + } + waiters.delete(waiter); + if (waiters.size === 0) { + AGENT_WAITERS_BY_RUN_ID.delete(normalizedRunId); + } + }; +} + +function notifyWaiters(runId: string): void { + const normalizedRunId = runId.trim(); + if (!normalizedRunId) { + return; + } + const waiters = AGENT_WAITERS_BY_RUN_ID.get(normalizedRunId); + if (!waiters || waiters.size === 0) { + return; + } + for (const waiter of waiters) { + waiter(); + } +} + +export function readTerminalSnapshotFromDedupeEntry( + entry: DedupeEntry, +): AgentWaitTerminalSnapshot | null { + const payload = entry.payload as + | { + status?: unknown; + startedAt?: unknown; + endedAt?: unknown; + error?: unknown; + summary?: unknown; + } + | undefined; + const status = typeof payload?.status === "string" ? payload.status : undefined; + if (status === "accepted" || status === "started" || status === "in_flight") { + return null; + } + + const startedAt = asFiniteNumber(payload?.startedAt); + const endedAt = asFiniteNumber(payload?.endedAt) ?? entry.ts; + const errorMessage = + typeof payload?.error === "string" + ? payload.error + : typeof payload?.summary === "string" + ? payload.summary + : entry.error?.message; + + if (status === "ok" || status === "timeout") { + return { + status, + startedAt, + endedAt, + error: status === "timeout" ? errorMessage : undefined, + }; + } + if (status === "error" || !entry.ok) { + return { + status: "error", + startedAt, + endedAt, + error: errorMessage, + }; + } + return null; +} + +export function readTerminalSnapshotFromGatewayDedupe(params: { + dedupe: Map; + runId: string; + ignoreAgentTerminalSnapshot?: boolean; +}): AgentWaitTerminalSnapshot | null { + if (params.ignoreAgentTerminalSnapshot) { + const chatEntry = params.dedupe.get(`chat:${params.runId}`); + if (!chatEntry) { + return null; + } + return readTerminalSnapshotFromDedupeEntry(chatEntry); + } + + const chatEntry = params.dedupe.get(`chat:${params.runId}`); + const chatSnapshot = chatEntry ? readTerminalSnapshotFromDedupeEntry(chatEntry) : null; + + const agentEntry = params.dedupe.get(`agent:${params.runId}`); + const agentSnapshot = agentEntry ? readTerminalSnapshotFromDedupeEntry(agentEntry) : null; + if (agentEntry) { + if (!agentSnapshot) { + // If agent is still in-flight, only trust chat if it was written after + // this agent entry (indicating a newer completed chat run reused runId). + if (chatSnapshot && chatEntry && chatEntry.ts > agentEntry.ts) { + return chatSnapshot; + } + return null; + } + } + + if (agentSnapshot && chatSnapshot && agentEntry && chatEntry) { + // Reused idempotency keys can leave both records present. Prefer the + // freshest terminal snapshot so callers observe the latest run outcome. + return chatEntry.ts > agentEntry.ts ? chatSnapshot : agentSnapshot; + } + + return agentSnapshot ?? chatSnapshot; +} + +export async function waitForTerminalGatewayDedupe(params: { + dedupe: Map; + runId: string; + timeoutMs: number; + signal?: AbortSignal; + ignoreAgentTerminalSnapshot?: boolean; +}): Promise { + const initial = readTerminalSnapshotFromGatewayDedupe(params); + if (initial) { + return initial; + } + if (params.timeoutMs <= 0 || params.signal?.aborted) { + return null; + } + + return await new Promise((resolve) => { + let settled = false; + let timeoutHandle: NodeJS.Timeout | undefined; + let onAbort: (() => void) | undefined; + let removeWaiter: (() => void) | undefined; + + const finish = (snapshot: AgentWaitTerminalSnapshot | null) => { + if (settled) { + return; + } + settled = true; + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + if (onAbort) { + params.signal?.removeEventListener("abort", onAbort); + } + removeWaiter?.(); + resolve(snapshot); + }; + + const onWake = () => { + const snapshot = readTerminalSnapshotFromGatewayDedupe(params); + if (snapshot) { + finish(snapshot); + } + }; + + removeWaiter = addWaiter(params.runId, onWake); + onWake(); + if (settled) { + return; + } + + const timeoutDelayMs = Math.max(1, Math.min(Math.floor(params.timeoutMs), 2_147_483_647)); + timeoutHandle = setTimeout(() => finish(null), timeoutDelayMs); + timeoutHandle.unref?.(); + + onAbort = () => finish(null); + params.signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + +export function setGatewayDedupeEntry(params: { + dedupe: Map; + key: string; + entry: DedupeEntry; +}) { + params.dedupe.set(params.key, params.entry); + const runId = parseRunIdFromDedupeKey(params.key); + if (!runId) { + return; + } + const snapshot = readTerminalSnapshotFromDedupeEntry(params.entry); + if (!snapshot) { + return; + } + notifyWaiters(runId); +} + +export const __testing = { + getWaiterCount(runId?: string): number { + if (runId) { + return AGENT_WAITERS_BY_RUN_ID.get(runId)?.size ?? 0; + } + let total = 0; + for (const waiters of AGENT_WAITERS_BY_RUN_ID.values()) { + total += waiters.size; + } + return total; + }, + resetWaiters() { + AGENT_WAITERS_BY_RUN_ID.clear(); + }, +}; diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 41228b4ffae..aa56b857aca 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -51,6 +51,12 @@ import { import { formatForLog } from "../ws-log.js"; import { waitForAgentJob } from "./agent-job.js"; import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; +import { + readTerminalSnapshotFromGatewayDedupe, + setGatewayDedupeEntry, + type AgentWaitTerminalSnapshot, + waitForTerminalGatewayDedupe, +} from "./agent-wait-dedupe.js"; import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js"; import { sessionsHandlers } from "./sessions.js"; import type { GatewayRequestHandlerOptions, GatewayRequestHandlers } from "./types.js"; @@ -593,10 +599,14 @@ export const agentHandlers: GatewayRequestHandlers = { acceptedAt: Date.now(), }; // Store an in-flight ack so retries do not spawn a second run. - context.dedupe.set(`agent:${idem}`, { - ts: Date.now(), - ok: true, - payload: accepted, + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `agent:${idem}`, + entry: { + ts: Date.now(), + ok: true, + payload: accepted, + }, }); respond(true, accepted, undefined, { runId }); @@ -647,10 +657,14 @@ export const agentHandlers: GatewayRequestHandlers = { summary: "completed", result, }; - context.dedupe.set(`agent:${idem}`, { - ts: Date.now(), - ok: true, - payload, + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `agent:${idem}`, + entry: { + ts: Date.now(), + ok: true, + payload, + }, }); // Send a second res frame (same id) so TS clients with expectFinal can wait. // Swift clients will typically treat the first res as the result and ignore this. @@ -663,11 +677,15 @@ export const agentHandlers: GatewayRequestHandlers = { status: "error" as const, summary: String(err), }; - context.dedupe.set(`agent:${idem}`, { - ts: Date.now(), - ok: false, - payload, - error, + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `agent:${idem}`, + entry: { + ts: Date.now(), + ok: false, + payload, + error, + }, }); respond(false, payload, error, { runId, @@ -729,7 +747,7 @@ export const agentHandlers: GatewayRequestHandlers = { }) ?? identity.avatar; respond(true, { ...identity, avatar: avatarValue }, undefined); }, - "agent.wait": async ({ params, respond }) => { + "agent.wait": async ({ params, respond, context }) => { if (!validateAgentWaitParams(params)) { respond( false, @@ -747,11 +765,61 @@ export const agentHandlers: GatewayRequestHandlers = { typeof p.timeoutMs === "number" && Number.isFinite(p.timeoutMs) ? Math.max(0, Math.floor(p.timeoutMs)) : 30_000; + const hasActiveChatRun = context.chatAbortControllers.has(runId); - const snapshot = await waitForAgentJob({ + const cachedGatewaySnapshot = readTerminalSnapshotFromGatewayDedupe({ + dedupe: context.dedupe, + runId, + ignoreAgentTerminalSnapshot: hasActiveChatRun, + }); + if (cachedGatewaySnapshot) { + respond(true, { + runId, + status: cachedGatewaySnapshot.status, + startedAt: cachedGatewaySnapshot.startedAt, + endedAt: cachedGatewaySnapshot.endedAt, + error: cachedGatewaySnapshot.error, + }); + return; + } + + const lifecycleAbortController = new AbortController(); + const dedupeAbortController = new AbortController(); + const lifecyclePromise = waitForAgentJob({ runId, timeoutMs, + signal: lifecycleAbortController.signal, + // When chat.send is active with the same runId, ignore cached lifecycle + // snapshots so stale agent results do not preempt the active chat run. + ignoreCachedSnapshot: hasActiveChatRun, }); + const dedupePromise = waitForTerminalGatewayDedupe({ + dedupe: context.dedupe, + runId, + timeoutMs, + signal: dedupeAbortController.signal, + ignoreAgentTerminalSnapshot: hasActiveChatRun, + }); + + const first = await Promise.race([ + lifecyclePromise.then((snapshot) => ({ source: "lifecycle" as const, snapshot })), + dedupePromise.then((snapshot) => ({ source: "dedupe" as const, snapshot })), + ]); + + let snapshot: AgentWaitTerminalSnapshot | Awaited> = + first.snapshot; + if (snapshot) { + if (first.source === "lifecycle") { + dedupeAbortController.abort(); + } else { + lifecycleAbortController.abort(); + } + } else { + snapshot = first.source === "lifecycle" ? await dedupePromise : await lifecyclePromise; + lifecycleAbortController.abort(); + dedupeAbortController.abort(); + } + if (!snapshot) { respond(true, { runId, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index db78d79666a..13feee2d131 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -47,6 +47,7 @@ import { } from "../session-utils.js"; import { formatForLog } from "../ws-log.js"; import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; +import { setGatewayDedupeEntry } from "./agent-wait-dedupe.js"; import { normalizeRpcAttachmentsToChatAttachments } from "./attachment-normalize.js"; import { appendInjectedAssistantMessageToTranscript } from "./chat-transcript-inject.js"; import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; @@ -1030,23 +1031,31 @@ export const chatHandlers: GatewayRequestHandlers = { message, }); } - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: true, - payload: { runId: clientRunId, status: "ok" as const }, + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `chat:${clientRunId}`, + entry: { + ts: Date.now(), + ok: true, + payload: { runId: clientRunId, status: "ok" as const }, + }, }); }) .catch((err) => { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: false, - payload: { - runId: clientRunId, - status: "error" as const, - summary: String(err), + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `chat:${clientRunId}`, + entry: { + ts: Date.now(), + ok: false, + payload: { + runId: clientRunId, + status: "error" as const, + summary: String(err), + }, + error, }, - error, }); broadcastChatError({ context, @@ -1065,11 +1074,15 @@ export const chatHandlers: GatewayRequestHandlers = { status: "error" as const, summary: String(err), }; - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: false, - payload, - error, + setGatewayDedupeEntry({ + dedupe: context.dedupe, + key: `chat:${clientRunId}`, + entry: { + ts: Date.now(), + ok: false, + payload, + error, + }, }); respond(false, payload, error, { runId: clientRunId, diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 920d51b0400..4ea91ea247f 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -69,6 +69,43 @@ describe("waitForAgentJob", () => { expect(snapshot?.startedAt).toBe(300); expect(snapshot?.endedAt).toBe(400); }); + + it("can ignore cached snapshots and wait for fresh lifecycle events", async () => { + const runId = `run-ignore-cache-${Date.now()}-${Math.random().toString(36).slice(2)}`; + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "end", startedAt: 100, endedAt: 110 }, + }); + + const cached = await waitForAgentJob({ runId, timeoutMs: 1_000 }); + expect(cached?.status).toBe("ok"); + expect(cached?.startedAt).toBe(100); + expect(cached?.endedAt).toBe(110); + + const freshWait = waitForAgentJob({ + runId, + timeoutMs: 1_000, + ignoreCachedSnapshot: true, + }); + queueMicrotask(() => { + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 200 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "end", startedAt: 200, endedAt: 210 }, + }); + }); + + const fresh = await freshWait; + expect(fresh?.status).toBe("ok"); + expect(fresh?.startedAt).toBe(200); + expect(fresh?.endedAt).toBe(210); + }); }); describe("injectTimestamp", () => { diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index e110ace1d73..7a5d84e62d8 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -466,6 +466,245 @@ describe("gateway server chat", () => { ]); }); + test("agent.wait resolves chat.send runs that finish without lifecycle events", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + try { + testState.sessionStorePath = path.join(dir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + }); + + const runId = "idem-wait-chat-1"; + const sendRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "/context list", + idempotencyKey: runId, + }); + expect(sendRes.ok).toBe(true); + expect(sendRes.payload?.status).toBe("started"); + + const waitRes = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 1_000, + }); + expect(waitRes.ok).toBe(true); + expect(waitRes.payload?.status).toBe("ok"); + } finally { + testState.sessionStorePath = undefined; + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + test("agent.wait ignores stale chat dedupe when an agent run with the same runId is in flight", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + let resolveAgentRun: (() => void) | undefined; + const blockedAgentRun = new Promise((resolve) => { + resolveAgentRun = resolve; + }); + const agentSpy = vi.mocked(agentCommand); + agentSpy.mockImplementationOnce(async () => { + await blockedAgentRun; + return undefined; + }); + + try { + testState.sessionStorePath = path.join(dir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + }); + + const runId = "idem-wait-chat-vs-agent"; + const sendRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "/context list", + idempotencyKey: runId, + }); + expect(sendRes.ok).toBe(true); + expect(sendRes.payload?.status).toBe("started"); + + const chatWaitRes = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 1_000, + }); + expect(chatWaitRes.ok).toBe(true); + expect(chatWaitRes.payload?.status).toBe("ok"); + + const agentRes = await rpcReq(ws, "agent", { + sessionKey: "main", + message: "hold this run open", + idempotencyKey: runId, + }); + expect(agentRes.ok).toBe(true); + expect(agentRes.payload?.status).toBe("accepted"); + + const waitWhileAgentInFlight = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 40, + }); + expectAgentWaitTimeout(waitWhileAgentInFlight); + + resolveAgentRun?.(); + const waitAfterAgentCompletion = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 1_000, + }); + expect(waitAfterAgentCompletion.ok).toBe(true); + expect(waitAfterAgentCompletion.payload?.status).toBe("ok"); + } finally { + resolveAgentRun?.(); + testState.sessionStorePath = undefined; + await fs.rm(dir, { recursive: true, force: true }); + } + }); + + test("agent.wait ignores stale agent snapshots while same-runId chat.send is active", async () => { + await withMainSessionStore(async () => { + const runId = "idem-wait-chat-active-vs-stale-agent"; + const seedAgentRes = await rpcReq(ws, "agent", { + sessionKey: "main", + message: "seed stale agent snapshot", + idempotencyKey: runId, + }); + expect(seedAgentRes.ok).toBe(true); + expect(seedAgentRes.payload?.status).toBe("accepted"); + + const seedWaitRes = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 1_000, + }); + expect(seedWaitRes.ok).toBe(true); + expect(seedWaitRes.payload?.status).toBe("ok"); + + let releaseBlockedReply: (() => void) | undefined; + const blockedReply = new Promise((resolve) => { + releaseBlockedReply = resolve; + }); + const replySpy = vi.mocked(getReplyFromConfig); + replySpy.mockImplementationOnce(async (_ctx, opts) => { + await new Promise((resolve) => { + let settled = false; + const finish = () => { + if (settled) { + return; + } + settled = true; + resolve(); + }; + void blockedReply.then(finish); + if (opts?.abortSignal?.aborted) { + finish(); + return; + } + opts?.abortSignal?.addEventListener("abort", finish, { once: true }); + }); + return undefined; + }); + + try { + const chatRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hold chat run open", + idempotencyKey: runId, + }); + expect(chatRes.ok).toBe(true); + expect(chatRes.payload?.status).toBe("started"); + + const waitWhileChatActive = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 40, + }); + expectAgentWaitTimeout(waitWhileChatActive); + + const abortRes = await rpcReq(ws, "chat.abort", { + sessionKey: "main", + runId, + }); + expect(abortRes.ok).toBe(true); + } finally { + releaseBlockedReply?.(); + } + }); + }); + + test("agent.wait keeps lifecycle wait active while same-runId chat.send is active", async () => { + await withMainSessionStore(async () => { + const runId = "idem-wait-chat-active-with-agent-lifecycle"; + let releaseBlockedReply: (() => void) | undefined; + const blockedReply = new Promise((resolve) => { + releaseBlockedReply = resolve; + }); + const replySpy = vi.mocked(getReplyFromConfig); + replySpy.mockImplementationOnce(async (_ctx, opts) => { + await new Promise((resolve) => { + let settled = false; + const finish = () => { + if (settled) { + return; + } + settled = true; + resolve(); + }; + void blockedReply.then(finish); + if (opts?.abortSignal?.aborted) { + finish(); + return; + } + opts?.abortSignal?.addEventListener("abort", finish, { once: true }); + }); + return undefined; + }); + + try { + const chatRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hold chat run open", + idempotencyKey: runId, + }); + expect(chatRes.ok).toBe(true); + expect(chatRes.payload?.status).toBe("started"); + + const waitP = rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 1_000, + }); + + await new Promise((resolve) => setTimeout(resolve, 20)); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "start", startedAt: 1 }, + }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "end", startedAt: 1, endedAt: 2 }, + }); + + const waitRes = await waitP; + expect(waitRes.ok).toBe(true); + expect(waitRes.payload?.status).toBe("ok"); + + const abortRes = await rpcReq(ws, "chat.abort", { + sessionKey: "main", + runId, + }); + expect(abortRes.ok).toBe(true); + } finally { + releaseBlockedReply?.(); + } + }); + }); + test("agent events include sessionKey and agent.wait covers lifecycle flows", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); testState.sessionStorePath = path.join(dir, "sessions.json"); diff --git a/src/plugin-sdk/root-alias.cjs b/src/plugin-sdk/root-alias.cjs index 37626deebaf..aa2127bdc9a 100644 --- a/src/plugin-sdk/root-alias.cjs +++ b/src/plugin-sdk/root-alias.cjs @@ -4,6 +4,7 @@ const path = require("node:path"); const fs = require("node:fs"); let monolithicSdk = null; +let jitiLoader = null; function emptyPluginConfigSchema() { function error(message) { @@ -31,16 +32,54 @@ function emptyPluginConfigSchema() { }; } +function resolveCommandAuthorizedFromAuthorizers(params) { + const { useAccessGroups, authorizers } = params; + const mode = params.modeWhenAccessGroupsOff ?? "allow"; + if (!useAccessGroups) { + if (mode === "allow") { + return true; + } + if (mode === "deny") { + return false; + } + const anyConfigured = authorizers.some((entry) => entry.configured); + if (!anyConfigured) { + return true; + } + return authorizers.some((entry) => entry.configured && entry.allowed); + } + return authorizers.some((entry) => entry.configured && entry.allowed); +} + +function resolveControlCommandGate(params) { + const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({ + useAccessGroups: params.useAccessGroups, + authorizers: params.authorizers, + modeWhenAccessGroupsOff: params.modeWhenAccessGroupsOff, + }); + const shouldBlock = params.allowTextCommands && params.hasControlCommand && !commandAuthorized; + return { commandAuthorized, shouldBlock }; +} + +function getJiti() { + if (jitiLoader) { + return jitiLoader; + } + + const { createJiti } = require("jiti"); + jitiLoader = createJiti(__filename, { + interopDefault: true, + extensions: [".ts", ".tsx", ".mts", ".cts", ".mtsx", ".ctsx", ".js", ".mjs", ".cjs", ".json"], + }); + return jitiLoader; +} + function loadMonolithicSdk() { if (monolithicSdk) { return monolithicSdk; } - const { createJiti } = require("jiti"); - const jiti = createJiti(__filename, { - interopDefault: true, - extensions: [".ts", ".tsx", ".mts", ".cts", ".mtsx", ".ctsx", ".js", ".mjs", ".cjs", ".json"], - }); + const jiti = getJiti(); const distCandidate = path.resolve(__dirname, "..", "..", "dist", "plugin-sdk", "index.js"); if (fs.existsSync(distCandidate)) { @@ -56,8 +95,17 @@ function loadMonolithicSdk() { return monolithicSdk; } +function tryLoadMonolithicSdk() { + try { + return loadMonolithicSdk(); + } catch { + return null; + } +} + const fastExports = { emptyPluginConfigSchema, + resolveControlCommandGate, }; const rootProxy = new Proxy(fastExports, { @@ -80,15 +128,18 @@ const rootProxy = new Proxy(fastExports, { if (Reflect.has(target, prop)) { return true; } - return prop in loadMonolithicSdk(); + const monolithic = tryLoadMonolithicSdk(); + return monolithic ? prop in monolithic : false; }, ownKeys(target) { - const keys = new Set([ - ...Reflect.ownKeys(target), - ...Reflect.ownKeys(loadMonolithicSdk()), - "default", - "__esModule", - ]); + const keys = new Set([...Reflect.ownKeys(target), "default", "__esModule"]); + // Keep Object.keys/property reflection fast and deterministic. + // Only expose monolithic keys if it was already loaded by direct access. + if (monolithicSdk) { + for (const key of Reflect.ownKeys(monolithicSdk)) { + keys.add(key); + } + } return [...keys]; }, getOwnPropertyDescriptor(target, prop) { @@ -112,12 +163,15 @@ const rootProxy = new Proxy(fastExports, { if (own) { return own; } - const descriptor = Object.getOwnPropertyDescriptor(loadMonolithicSdk(), prop); + const monolithic = tryLoadMonolithicSdk(); + if (!monolithic) { + return undefined; + } + const descriptor = Object.getOwnPropertyDescriptor(monolithic, prop); if (!descriptor) { return undefined; } if (descriptor.get || descriptor.set) { - const monolithic = loadMonolithicSdk(); return { configurable: true, enumerable: descriptor.enumerable ?? true,