From 137a3629cc7f9c43c24edeb8f3d38eec7d20666d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 23 Apr 2026 04:02:10 +0100 Subject: [PATCH] fix: harden acpx openclaw bridge routing --- CHANGELOG.md | 1 + extensions/acpx/src/runtime.test.ts | 150 ++++++++++++++++++++++- extensions/acpx/src/runtime.ts | 184 +++++++++++++++++++++++----- 3 files changed, 300 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d10c0968087..d2413e424a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai - Agents/BTW: route `/btw` side questions through provider stream registration with the session workspace, so Ollama provider URL construction and workspace-scoped hooks apply correctly. Fixes #68336. (#70413) Thanks @suboss87. - Memory search: use sqlite-vec KNN for vector recall while preserving full post-filter result limits in multi-model indexes. Fixes #69666. (#69680) Thanks @aalekh-sarvam. - Providers/OpenAI Codex: stop stale per-agent `openai-codex:default` OAuth profiles from shadowing a newer main-agent identity-scoped profile, and let `openclaw doctor` offer the matching cleanup. (#70393) Thanks @pashpashpash. +- ACPX: route OpenClaw ACP bridge commands through the MCP-free runtime path even when the command is wrapped with `env`, has bridge flags, or is resumed from persisted session state, so documented `acpx openclaw` setups no longer fail on per-session MCP injection. (#68741) Thanks @alexlomt. - Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio. - WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386. (#70428) Thanks @neeravmakwana. - Providers/SDK retry: cap long `Retry-After` sleeps in Stainless-based Anthropic/OpenAI model SDKs so 60s+ retry windows surface immediately for OpenClaw failover instead of blocking the run. (#68474) Thanks @jetd1. diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index 92702f27782..13e534858b2 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -7,6 +7,9 @@ type TestSessionStore = { save(record: Record): Promise; }; +const DOCUMENTED_OPENCLAW_BRIDGE_COMMAND = + "env OPENCLAW_HIDE_BANNER=1 OPENCLAW_SUPPRESS_NOTES=1 openclaw acp --url ws://127.0.0.1:18789 --token-file ~/.openclaw/gateway.token --session agent:main:main"; + function makeRuntime( baseStore: TestSessionStore, options: Partial[0]> = {}, @@ -17,11 +20,15 @@ function makeRuntime( close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; + isHealthy(): boolean; + probeAvailability(): Promise; }; bridgeSafeDelegate: { close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; + isHealthy(): boolean; + probeAvailability(): Promise; }; } { const runtime = new AcpxRuntime({ @@ -48,6 +55,8 @@ function makeRuntime( close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; + isHealthy(): boolean; + probeAvailability(): Promise; }; } ).delegate, @@ -57,6 +66,8 @@ function makeRuntime( close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; + isHealthy(): boolean; + probeAvailability(): Promise; }; } ).bridgeSafeDelegate, @@ -130,7 +141,7 @@ describe("AcpxRuntime fresh reset wrapper", () => { discardPersistentState: true, }); expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined(); - expect(baseStore.load).not.toHaveBeenCalled(); + expect(baseStore.load).toHaveBeenCalledOnce(); }); it("routes openclaw ensureSession through the bridge-safe delegate when MCP servers are configured", async () => { @@ -292,4 +303,141 @@ describe("AcpxRuntime fresh reset wrapper", () => { expect(bridgeEnsure).toHaveBeenCalledOnce(); expect(defaultEnsure).not.toHaveBeenCalled(); }); + + it("uses the bridge-safe delegate for documented env-wrapped openclaw bridge commands", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => undefined), + save: vi.fn(async () => {}), + }; + + const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, { + mcpServers: [{ name: "tools", command: "mcp-tools" }] as never, + agentRegistry: { + resolve: (agentName: string) => + agentName === "openclaw" ? DOCUMENTED_OPENCLAW_BRIDGE_COMMAND : agentName, + list: () => ["codex", "openclaw"], + }, + }); + const defaultEnsure = vi.spyOn(delegate, "ensureSession").mockResolvedValue({ + sessionKey: "agent:openclaw:acp:test", + backend: "acpx", + runtimeSessionName: "default", + }); + const bridgeEnsure = vi.spyOn(bridgeSafeDelegate, "ensureSession").mockResolvedValue({ + sessionKey: "agent:openclaw:acp:test", + backend: "acpx", + runtimeSessionName: "bridge", + }); + + const result = await runtime.ensureSession({ + sessionKey: "agent:openclaw:acp:test", + agent: "openclaw", + mode: "persistent", + }); + + expect(result.runtimeSessionName).toBe("bridge"); + expect(bridgeEnsure).toHaveBeenCalledOnce(); + expect(defaultEnsure).not.toHaveBeenCalled(); + }); + + it("uses the bridge-safe delegate for local node openclaw entrypoints", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => undefined), + save: vi.fn(async () => {}), + }; + + const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, { + mcpServers: [{ name: "tools", command: "mcp-tools" }] as never, + agentRegistry: { + resolve: (agentName: string) => + agentName === "openclaw" ? "env OPENCLAW_HIDE_BANNER=1 node openclaw.mjs acp" : agentName, + list: () => ["codex", "openclaw"], + }, + }); + const defaultEnsure = vi.spyOn(delegate, "ensureSession").mockResolvedValue({ + sessionKey: "agent:openclaw:acp:test", + backend: "acpx", + runtimeSessionName: "default", + }); + const bridgeEnsure = vi.spyOn(bridgeSafeDelegate, "ensureSession").mockResolvedValue({ + sessionKey: "agent:openclaw:acp:test", + backend: "acpx", + runtimeSessionName: "bridge", + }); + + const result = await runtime.ensureSession({ + sessionKey: "agent:openclaw:acp:test", + agent: "openclaw", + mode: "persistent", + }); + + expect(result.runtimeSessionName).toBe("bridge"); + expect(bridgeEnsure).toHaveBeenCalledOnce(); + expect(defaultEnsure).not.toHaveBeenCalled(); + }); + + it("routes follow-up calls by persisted agent command before current config", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + acpxRecordId: "agent:openclaw:acp:test", + agentCommand: DOCUMENTED_OPENCLAW_BRIDGE_COMMAND, + })), + save: vi.fn(async () => {}), + }; + + const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, { + mcpServers: [{ name: "tools", command: "mcp-tools" }] as never, + agentRegistry: { + resolve: (agentName: string) => (agentName === "openclaw" ? "codex" : agentName), + list: () => ["codex", "openclaw"], + }, + }); + const defaultStatus = vi.spyOn(delegate, "getStatus").mockResolvedValue({ + summary: "default", + }); + const bridgeStatus = vi.spyOn(bridgeSafeDelegate, "getStatus").mockResolvedValue({ + summary: "bridge", + }); + + const status = await runtime.getStatus({ + handle: { + sessionKey: "agent:openclaw:acp:test", + backend: "acpx", + runtimeSessionName: "agent:openclaw:acp:test", + }, + }); + + expect(status.summary).toBe("bridge"); + expect(bridgeStatus).toHaveBeenCalledOnce(); + expect(defaultStatus).not.toHaveBeenCalled(); + }); + + it("probes through the bridge-safe delegate when probeAgent resolves to openclaw bridge", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => undefined), + save: vi.fn(async () => {}), + }; + + const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, { + mcpServers: [{ name: "tools", command: "mcp-tools" }] as never, + probeAgent: "openclaw", + agentRegistry: { + resolve: (agentName: string) => + agentName === "openclaw" ? DOCUMENTED_OPENCLAW_BRIDGE_COMMAND : agentName, + list: () => ["codex", "openclaw"], + }, + }); + const defaultProbe = vi.spyOn(delegate, "probeAvailability").mockResolvedValue(undefined); + const bridgeProbe = vi + .spyOn(bridgeSafeDelegate, "probeAvailability") + .mockResolvedValue(undefined); + vi.spyOn(delegate, "isHealthy").mockReturnValue(false); + vi.spyOn(bridgeSafeDelegate, "isHealthy").mockReturnValue(true); + + await runtime.probeAvailability(); + + expect(runtime.isHealthy()).toBe(true); + expect(bridgeProbe).toHaveBeenCalledOnce(); + expect(defaultProbe).not.toHaveBeenCalled(); + }); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index e20a2e1d9ba..6c77f8f90c7 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -58,7 +58,8 @@ function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSes }; } -const OPENCLAW_BRIDGE_COMMAND = "openclaw acp"; +const OPENCLAW_BRIDGE_EXECUTABLE = "openclaw"; +const OPENCLAW_BRIDGE_SUBCOMMAND = "acp"; function normalizeAgentName(value: string | undefined): string | undefined { const normalized = value?.trim().toLowerCase(); @@ -85,6 +86,95 @@ function readAgentFromHandle(handle: AcpRuntimeHandle): string | undefined { return readAgentFromSessionKey(handle.sessionKey); } +function readAgentCommandFromRecord(record: AcpLoadedSessionRecord): string | undefined { + if (typeof record !== "object" || record === null) { + return undefined; + } + const { agentCommand } = record as { agentCommand?: unknown }; + return typeof agentCommand === "string" ? agentCommand.trim() || undefined : undefined; +} + +function splitCommandParts(value: string): string[] { + const parts: string[] = []; + let current = ""; + let quote: "'" | '"' | null = null; + let escaping = false; + + for (const ch of value) { + if (escaping) { + current += ch; + escaping = false; + continue; + } + if (ch === "\\" && quote !== "'") { + escaping = true; + continue; + } + if (quote) { + if (ch === quote) { + quote = null; + } else { + current += ch; + } + continue; + } + if (ch === "'" || ch === '"') { + quote = ch; + continue; + } + if (/\s/.test(ch)) { + if (current) { + parts.push(current); + current = ""; + } + continue; + } + current += ch; + } + + if (escaping) { + current += "\\"; + } + if (current) { + parts.push(current); + } + return parts; +} + +function basename(value: string): string { + return value.split(/[\\/]/).pop() ?? value; +} + +function isEnvAssignment(value: string): boolean { + return /^[A-Za-z_][A-Za-z0-9_]*=/.test(value); +} + +function unwrapEnvCommand(parts: string[]): string[] { + if (!parts.length || basename(parts[0]) !== "env") { + return parts; + } + let index = 1; + while (index < parts.length && isEnvAssignment(parts[index])) { + index += 1; + } + return parts.slice(index); +} + +function isOpenClawBridgeCommand(command: string | undefined): boolean { + if (!command) { + return false; + } + const parts = unwrapEnvCommand(splitCommandParts(command.trim())); + if (basename(parts[0] ?? "") === OPENCLAW_BRIDGE_EXECUTABLE) { + return parts[1] === OPENCLAW_BRIDGE_SUBCOMMAND; + } + if (basename(parts[0] ?? "") !== "node") { + return false; + } + const scriptName = basename(parts[1] ?? ""); + return /^openclaw(?:\.[cm]?js)?$/i.test(scriptName) && parts[2] === OPENCLAW_BRIDGE_SUBCOMMAND; +} + function resolveAgentCommand(params: { agentName: string | undefined; agentRegistry: AcpAgentRegistry; @@ -97,11 +187,20 @@ function resolveAgentCommand(params: { return typeof resolvedCommand === "string" ? resolvedCommand.trim() || undefined : undefined; } -function shouldUseBridgeSafeDelegate(params: { +function resolveProbeAgentName(options: AcpRuntimeOptions): string { + const { probeAgent } = options as { probeAgent?: unknown }; + return normalizeAgentName(typeof probeAgent === "string" ? probeAgent : undefined) ?? "codex"; +} + +function resolveAgentCommandForName(params: { agentName: string | undefined; agentRegistry: AcpAgentRegistry; -}): boolean { - return resolveAgentCommand(params) === OPENCLAW_BRIDGE_COMMAND; +}): string | undefined { + return resolveAgentCommand(params); +} + +function shouldUseBridgeSafeDelegateForCommand(command: string | undefined): boolean { + return isOpenClawBridgeCommand(command); } function shouldUseDistinctBridgeDelegate(options: AcpRuntimeOptions): boolean { @@ -114,6 +213,7 @@ export class AcpxRuntime implements AcpRuntime { private readonly agentRegistry: AcpAgentRegistry; private readonly delegate: BaseAcpxRuntime; private readonly bridgeSafeDelegate: BaseAcpxRuntime; + private readonly probeDelegate: BaseAcpxRuntime; constructor( options: AcpRuntimeOptions, @@ -135,77 +235,93 @@ export class AcpxRuntime implements AcpRuntime { testOptions, ) : this.delegate; + this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options)); } private resolveDelegateForAgent(agentName: string | undefined): BaseAcpxRuntime { - return shouldUseBridgeSafeDelegate({ + const command = resolveAgentCommandForName({ agentName, agentRegistry: this.agentRegistry, - }) - ? this.bridgeSafeDelegate - : this.delegate; + }); + return this.resolveDelegateForCommand(command); } - private resolveDelegateForHandle(handle: AcpRuntimeHandle): BaseAcpxRuntime { + private resolveDelegateForCommand(command: string | undefined): BaseAcpxRuntime { + return shouldUseBridgeSafeDelegateForCommand(command) ? this.bridgeSafeDelegate : this.delegate; + } + + private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise { + const record = await this.sessionStore.load(handle.acpxRecordId ?? handle.sessionKey); + const recordCommand = readAgentCommandFromRecord(record); + if (recordCommand) { + return this.resolveDelegateForCommand(recordCommand); + } return this.resolveDelegateForAgent(readAgentFromHandle(handle)); } isHealthy(): boolean { - return this.delegate.isHealthy(); + return this.probeDelegate.isHealthy(); } probeAvailability(): Promise { - return this.delegate.probeAvailability(); + return this.probeDelegate.probeAvailability(); } doctor(): Promise { - return this.delegate.doctor(); + return this.probeDelegate.doctor(); } ensureSession(input: Parameters[0]): Promise { return this.resolveDelegateForAgent(input.agent).ensureSession(input); } - runTurn(input: Parameters[0]): AsyncIterable { - return this.resolveDelegateForHandle(input.handle).runTurn(input); + async *runTurn(input: Parameters[0]): AsyncIterable { + yield* (await this.resolveDelegateForHandle(input.handle)).runTurn(input); } getCapabilities(): ReturnType { return this.delegate.getCapabilities(); } - getStatus(input: Parameters>[0]): Promise { - return this.resolveDelegateForHandle(input.handle).getStatus(input); + async getStatus( + input: Parameters>[0], + ): Promise { + const delegate = await this.resolveDelegateForHandle(input.handle); + return delegate.getStatus(input); } - setMode(input: Parameters>[0]): Promise { - return this.resolveDelegateForHandle(input.handle).setMode(input); + async setMode(input: Parameters>[0]): Promise { + const delegate = await this.resolveDelegateForHandle(input.handle); + await delegate.setMode(input); } - setConfigOption(input: Parameters>[0]): Promise { - return this.resolveDelegateForHandle(input.handle).setConfigOption(input); + async setConfigOption( + input: Parameters>[0], + ): Promise { + const delegate = await this.resolveDelegateForHandle(input.handle); + await delegate.setConfigOption(input); } - cancel(input: Parameters[0]): Promise { - return this.resolveDelegateForHandle(input.handle).cancel(input); + async cancel(input: Parameters[0]): Promise { + const delegate = await this.resolveDelegateForHandle(input.handle); + await delegate.cancel(input); } async prepareFreshSession(input: { sessionKey: string }): Promise { this.sessionStore.markFresh(input.sessionKey); } - close(input: Parameters[0]): Promise { - return this.resolveDelegateForHandle(input.handle) - .close({ - handle: input.handle, - reason: input.reason, - discardPersistentState: input.discardPersistentState, - }) - .then(() => { - if (input.discardPersistentState) { - this.sessionStore.markFresh(input.handle.sessionKey); - } - }); + async close(input: Parameters[0]): Promise { + await ( + await this.resolveDelegateForHandle(input.handle) + ).close({ + handle: input.handle, + reason: input.reason, + discardPersistentState: input.discardPersistentState, + }); + if (input.discardPersistentState) { + this.sessionStore.markFresh(input.handle.sessionKey); + } } }