mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:10:43 +00:00
fix: harden acpx openclaw bridge routing
This commit is contained in:
@@ -37,6 +37,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/BTW: route `/btw` side questions through provider stream registration with the session workspace, so Ollama provider URL construction and workspace-scoped hooks apply correctly. Fixes #68336. (#70413) Thanks @suboss87.
|
||||
- Memory search: use sqlite-vec KNN for vector recall while preserving full post-filter result limits in multi-model indexes. Fixes #69666. (#69680) Thanks @aalekh-sarvam.
|
||||
- Providers/OpenAI Codex: stop stale per-agent `openai-codex:default` OAuth profiles from shadowing a newer main-agent identity-scoped profile, and let `openclaw doctor` offer the matching cleanup. (#70393) Thanks @pashpashpash.
|
||||
- ACPX: route OpenClaw ACP bridge commands through the MCP-free runtime path even when the command is wrapped with `env`, has bridge flags, or is resumed from persisted session state, so documented `acpx openclaw` setups no longer fail on per-session MCP injection. (#68741) Thanks @alexlomt.
|
||||
- Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio.
|
||||
- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386. (#70428) Thanks @neeravmakwana.
|
||||
- Providers/SDK retry: cap long `Retry-After` sleeps in Stainless-based Anthropic/OpenAI model SDKs so 60s+ retry windows surface immediately for OpenClaw failover instead of blocking the run. (#68474) Thanks @jetd1.
|
||||
|
||||
@@ -7,6 +7,9 @@ type TestSessionStore = {
|
||||
save(record: Record<string, unknown>): Promise<void>;
|
||||
};
|
||||
|
||||
const DOCUMENTED_OPENCLAW_BRIDGE_COMMAND =
|
||||
"env OPENCLAW_HIDE_BANNER=1 OPENCLAW_SUPPRESS_NOTES=1 openclaw acp --url ws://127.0.0.1:18789 --token-file ~/.openclaw/gateway.token --session agent:main:main";
|
||||
|
||||
function makeRuntime(
|
||||
baseStore: TestSessionStore,
|
||||
options: Partial<ConstructorParameters<typeof AcpxRuntime>[0]> = {},
|
||||
@@ -17,11 +20,15 @@ function makeRuntime(
|
||||
close: AcpRuntime["close"];
|
||||
ensureSession: AcpRuntime["ensureSession"];
|
||||
getStatus: NonNullable<AcpRuntime["getStatus"]>;
|
||||
isHealthy(): boolean;
|
||||
probeAvailability(): Promise<void>;
|
||||
};
|
||||
bridgeSafeDelegate: {
|
||||
close: AcpRuntime["close"];
|
||||
ensureSession: AcpRuntime["ensureSession"];
|
||||
getStatus: NonNullable<AcpRuntime["getStatus"]>;
|
||||
isHealthy(): boolean;
|
||||
probeAvailability(): Promise<void>;
|
||||
};
|
||||
} {
|
||||
const runtime = new AcpxRuntime({
|
||||
@@ -48,6 +55,8 @@ function makeRuntime(
|
||||
close: AcpRuntime["close"];
|
||||
ensureSession: AcpRuntime["ensureSession"];
|
||||
getStatus: NonNullable<AcpRuntime["getStatus"]>;
|
||||
isHealthy(): boolean;
|
||||
probeAvailability(): Promise<void>;
|
||||
};
|
||||
}
|
||||
).delegate,
|
||||
@@ -57,6 +66,8 @@ function makeRuntime(
|
||||
close: AcpRuntime["close"];
|
||||
ensureSession: AcpRuntime["ensureSession"];
|
||||
getStatus: NonNullable<AcpRuntime["getStatus"]>;
|
||||
isHealthy(): boolean;
|
||||
probeAvailability(): Promise<void>;
|
||||
};
|
||||
}
|
||||
).bridgeSafeDelegate,
|
||||
@@ -130,7 +141,7 @@ describe("AcpxRuntime fresh reset wrapper", () => {
|
||||
discardPersistentState: true,
|
||||
});
|
||||
expect(await wrappedStore.load("agent:codex:acp:binding:test")).toBeUndefined();
|
||||
expect(baseStore.load).not.toHaveBeenCalled();
|
||||
expect(baseStore.load).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("routes openclaw ensureSession through the bridge-safe delegate when MCP servers are configured", async () => {
|
||||
@@ -292,4 +303,141 @@ describe("AcpxRuntime fresh reset wrapper", () => {
|
||||
expect(bridgeEnsure).toHaveBeenCalledOnce();
|
||||
expect(defaultEnsure).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses the bridge-safe delegate for documented env-wrapped openclaw bridge commands", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
|
||||
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
|
||||
mcpServers: [{ name: "tools", command: "mcp-tools" }] as never,
|
||||
agentRegistry: {
|
||||
resolve: (agentName: string) =>
|
||||
agentName === "openclaw" ? DOCUMENTED_OPENCLAW_BRIDGE_COMMAND : agentName,
|
||||
list: () => ["codex", "openclaw"],
|
||||
},
|
||||
});
|
||||
const defaultEnsure = vi.spyOn(delegate, "ensureSession").mockResolvedValue({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "default",
|
||||
});
|
||||
const bridgeEnsure = vi.spyOn(bridgeSafeDelegate, "ensureSession").mockResolvedValue({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "bridge",
|
||||
});
|
||||
|
||||
const result = await runtime.ensureSession({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
agent: "openclaw",
|
||||
mode: "persistent",
|
||||
});
|
||||
|
||||
expect(result.runtimeSessionName).toBe("bridge");
|
||||
expect(bridgeEnsure).toHaveBeenCalledOnce();
|
||||
expect(defaultEnsure).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses the bridge-safe delegate for local node openclaw entrypoints", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
|
||||
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
|
||||
mcpServers: [{ name: "tools", command: "mcp-tools" }] as never,
|
||||
agentRegistry: {
|
||||
resolve: (agentName: string) =>
|
||||
agentName === "openclaw" ? "env OPENCLAW_HIDE_BANNER=1 node openclaw.mjs acp" : agentName,
|
||||
list: () => ["codex", "openclaw"],
|
||||
},
|
||||
});
|
||||
const defaultEnsure = vi.spyOn(delegate, "ensureSession").mockResolvedValue({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "default",
|
||||
});
|
||||
const bridgeEnsure = vi.spyOn(bridgeSafeDelegate, "ensureSession").mockResolvedValue({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "bridge",
|
||||
});
|
||||
|
||||
const result = await runtime.ensureSession({
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
agent: "openclaw",
|
||||
mode: "persistent",
|
||||
});
|
||||
|
||||
expect(result.runtimeSessionName).toBe("bridge");
|
||||
expect(bridgeEnsure).toHaveBeenCalledOnce();
|
||||
expect(defaultEnsure).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("routes follow-up calls by persisted agent command before current config", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => ({
|
||||
acpxRecordId: "agent:openclaw:acp:test",
|
||||
agentCommand: DOCUMENTED_OPENCLAW_BRIDGE_COMMAND,
|
||||
})),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
|
||||
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
|
||||
mcpServers: [{ name: "tools", command: "mcp-tools" }] as never,
|
||||
agentRegistry: {
|
||||
resolve: (agentName: string) => (agentName === "openclaw" ? "codex" : agentName),
|
||||
list: () => ["codex", "openclaw"],
|
||||
},
|
||||
});
|
||||
const defaultStatus = vi.spyOn(delegate, "getStatus").mockResolvedValue({
|
||||
summary: "default",
|
||||
});
|
||||
const bridgeStatus = vi.spyOn(bridgeSafeDelegate, "getStatus").mockResolvedValue({
|
||||
summary: "bridge",
|
||||
});
|
||||
|
||||
const status = await runtime.getStatus({
|
||||
handle: {
|
||||
sessionKey: "agent:openclaw:acp:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "agent:openclaw:acp:test",
|
||||
},
|
||||
});
|
||||
|
||||
expect(status.summary).toBe("bridge");
|
||||
expect(bridgeStatus).toHaveBeenCalledOnce();
|
||||
expect(defaultStatus).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("probes through the bridge-safe delegate when probeAgent resolves to openclaw bridge", async () => {
|
||||
const baseStore: TestSessionStore = {
|
||||
load: vi.fn(async () => undefined),
|
||||
save: vi.fn(async () => {}),
|
||||
};
|
||||
|
||||
const { runtime, delegate, bridgeSafeDelegate } = makeRuntime(baseStore, {
|
||||
mcpServers: [{ name: "tools", command: "mcp-tools" }] as never,
|
||||
probeAgent: "openclaw",
|
||||
agentRegistry: {
|
||||
resolve: (agentName: string) =>
|
||||
agentName === "openclaw" ? DOCUMENTED_OPENCLAW_BRIDGE_COMMAND : agentName,
|
||||
list: () => ["codex", "openclaw"],
|
||||
},
|
||||
});
|
||||
const defaultProbe = vi.spyOn(delegate, "probeAvailability").mockResolvedValue(undefined);
|
||||
const bridgeProbe = vi
|
||||
.spyOn(bridgeSafeDelegate, "probeAvailability")
|
||||
.mockResolvedValue(undefined);
|
||||
vi.spyOn(delegate, "isHealthy").mockReturnValue(false);
|
||||
vi.spyOn(bridgeSafeDelegate, "isHealthy").mockReturnValue(true);
|
||||
|
||||
await runtime.probeAvailability();
|
||||
|
||||
expect(runtime.isHealthy()).toBe(true);
|
||||
expect(bridgeProbe).toHaveBeenCalledOnce();
|
||||
expect(defaultProbe).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -58,7 +58,8 @@ function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSes
|
||||
};
|
||||
}
|
||||
|
||||
const OPENCLAW_BRIDGE_COMMAND = "openclaw acp";
|
||||
const OPENCLAW_BRIDGE_EXECUTABLE = "openclaw";
|
||||
const OPENCLAW_BRIDGE_SUBCOMMAND = "acp";
|
||||
|
||||
function normalizeAgentName(value: string | undefined): string | undefined {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
@@ -85,6 +86,95 @@ function readAgentFromHandle(handle: AcpRuntimeHandle): string | undefined {
|
||||
return readAgentFromSessionKey(handle.sessionKey);
|
||||
}
|
||||
|
||||
function readAgentCommandFromRecord(record: AcpLoadedSessionRecord): string | undefined {
|
||||
if (typeof record !== "object" || record === null) {
|
||||
return undefined;
|
||||
}
|
||||
const { agentCommand } = record as { agentCommand?: unknown };
|
||||
return typeof agentCommand === "string" ? agentCommand.trim() || undefined : undefined;
|
||||
}
|
||||
|
||||
function splitCommandParts(value: string): string[] {
|
||||
const parts: string[] = [];
|
||||
let current = "";
|
||||
let quote: "'" | '"' | null = null;
|
||||
let escaping = false;
|
||||
|
||||
for (const ch of value) {
|
||||
if (escaping) {
|
||||
current += ch;
|
||||
escaping = false;
|
||||
continue;
|
||||
}
|
||||
if (ch === "\\" && quote !== "'") {
|
||||
escaping = true;
|
||||
continue;
|
||||
}
|
||||
if (quote) {
|
||||
if (ch === quote) {
|
||||
quote = null;
|
||||
} else {
|
||||
current += ch;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (ch === "'" || ch === '"') {
|
||||
quote = ch;
|
||||
continue;
|
||||
}
|
||||
if (/\s/.test(ch)) {
|
||||
if (current) {
|
||||
parts.push(current);
|
||||
current = "";
|
||||
}
|
||||
continue;
|
||||
}
|
||||
current += ch;
|
||||
}
|
||||
|
||||
if (escaping) {
|
||||
current += "\\";
|
||||
}
|
||||
if (current) {
|
||||
parts.push(current);
|
||||
}
|
||||
return parts;
|
||||
}
|
||||
|
||||
function basename(value: string): string {
|
||||
return value.split(/[\\/]/).pop() ?? value;
|
||||
}
|
||||
|
||||
function isEnvAssignment(value: string): boolean {
|
||||
return /^[A-Za-z_][A-Za-z0-9_]*=/.test(value);
|
||||
}
|
||||
|
||||
function unwrapEnvCommand(parts: string[]): string[] {
|
||||
if (!parts.length || basename(parts[0]) !== "env") {
|
||||
return parts;
|
||||
}
|
||||
let index = 1;
|
||||
while (index < parts.length && isEnvAssignment(parts[index])) {
|
||||
index += 1;
|
||||
}
|
||||
return parts.slice(index);
|
||||
}
|
||||
|
||||
function isOpenClawBridgeCommand(command: string | undefined): boolean {
|
||||
if (!command) {
|
||||
return false;
|
||||
}
|
||||
const parts = unwrapEnvCommand(splitCommandParts(command.trim()));
|
||||
if (basename(parts[0] ?? "") === OPENCLAW_BRIDGE_EXECUTABLE) {
|
||||
return parts[1] === OPENCLAW_BRIDGE_SUBCOMMAND;
|
||||
}
|
||||
if (basename(parts[0] ?? "") !== "node") {
|
||||
return false;
|
||||
}
|
||||
const scriptName = basename(parts[1] ?? "");
|
||||
return /^openclaw(?:\.[cm]?js)?$/i.test(scriptName) && parts[2] === OPENCLAW_BRIDGE_SUBCOMMAND;
|
||||
}
|
||||
|
||||
function resolveAgentCommand(params: {
|
||||
agentName: string | undefined;
|
||||
agentRegistry: AcpAgentRegistry;
|
||||
@@ -97,11 +187,20 @@ function resolveAgentCommand(params: {
|
||||
return typeof resolvedCommand === "string" ? resolvedCommand.trim() || undefined : undefined;
|
||||
}
|
||||
|
||||
function shouldUseBridgeSafeDelegate(params: {
|
||||
function resolveProbeAgentName(options: AcpRuntimeOptions): string {
|
||||
const { probeAgent } = options as { probeAgent?: unknown };
|
||||
return normalizeAgentName(typeof probeAgent === "string" ? probeAgent : undefined) ?? "codex";
|
||||
}
|
||||
|
||||
function resolveAgentCommandForName(params: {
|
||||
agentName: string | undefined;
|
||||
agentRegistry: AcpAgentRegistry;
|
||||
}): boolean {
|
||||
return resolveAgentCommand(params) === OPENCLAW_BRIDGE_COMMAND;
|
||||
}): string | undefined {
|
||||
return resolveAgentCommand(params);
|
||||
}
|
||||
|
||||
function shouldUseBridgeSafeDelegateForCommand(command: string | undefined): boolean {
|
||||
return isOpenClawBridgeCommand(command);
|
||||
}
|
||||
|
||||
function shouldUseDistinctBridgeDelegate(options: AcpRuntimeOptions): boolean {
|
||||
@@ -114,6 +213,7 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
private readonly agentRegistry: AcpAgentRegistry;
|
||||
private readonly delegate: BaseAcpxRuntime;
|
||||
private readonly bridgeSafeDelegate: BaseAcpxRuntime;
|
||||
private readonly probeDelegate: BaseAcpxRuntime;
|
||||
|
||||
constructor(
|
||||
options: AcpRuntimeOptions,
|
||||
@@ -135,77 +235,93 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
testOptions,
|
||||
)
|
||||
: this.delegate;
|
||||
this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options));
|
||||
}
|
||||
|
||||
private resolveDelegateForAgent(agentName: string | undefined): BaseAcpxRuntime {
|
||||
return shouldUseBridgeSafeDelegate({
|
||||
const command = resolveAgentCommandForName({
|
||||
agentName,
|
||||
agentRegistry: this.agentRegistry,
|
||||
})
|
||||
? this.bridgeSafeDelegate
|
||||
: this.delegate;
|
||||
});
|
||||
return this.resolveDelegateForCommand(command);
|
||||
}
|
||||
|
||||
private resolveDelegateForHandle(handle: AcpRuntimeHandle): BaseAcpxRuntime {
|
||||
private resolveDelegateForCommand(command: string | undefined): BaseAcpxRuntime {
|
||||
return shouldUseBridgeSafeDelegateForCommand(command) ? this.bridgeSafeDelegate : this.delegate;
|
||||
}
|
||||
|
||||
private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise<BaseAcpxRuntime> {
|
||||
const record = await this.sessionStore.load(handle.acpxRecordId ?? handle.sessionKey);
|
||||
const recordCommand = readAgentCommandFromRecord(record);
|
||||
if (recordCommand) {
|
||||
return this.resolveDelegateForCommand(recordCommand);
|
||||
}
|
||||
return this.resolveDelegateForAgent(readAgentFromHandle(handle));
|
||||
}
|
||||
|
||||
isHealthy(): boolean {
|
||||
return this.delegate.isHealthy();
|
||||
return this.probeDelegate.isHealthy();
|
||||
}
|
||||
|
||||
probeAvailability(): Promise<void> {
|
||||
return this.delegate.probeAvailability();
|
||||
return this.probeDelegate.probeAvailability();
|
||||
}
|
||||
|
||||
doctor(): Promise<AcpRuntimeDoctorReport> {
|
||||
return this.delegate.doctor();
|
||||
return this.probeDelegate.doctor();
|
||||
}
|
||||
|
||||
ensureSession(input: Parameters<AcpRuntime["ensureSession"]>[0]): Promise<AcpRuntimeHandle> {
|
||||
return this.resolveDelegateForAgent(input.agent).ensureSession(input);
|
||||
}
|
||||
|
||||
runTurn(input: Parameters<AcpRuntime["runTurn"]>[0]): AsyncIterable<AcpRuntimeEvent> {
|
||||
return this.resolveDelegateForHandle(input.handle).runTurn(input);
|
||||
async *runTurn(input: Parameters<AcpRuntime["runTurn"]>[0]): AsyncIterable<AcpRuntimeEvent> {
|
||||
yield* (await this.resolveDelegateForHandle(input.handle)).runTurn(input);
|
||||
}
|
||||
|
||||
getCapabilities(): ReturnType<BaseAcpxRuntime["getCapabilities"]> {
|
||||
return this.delegate.getCapabilities();
|
||||
}
|
||||
|
||||
getStatus(input: Parameters<NonNullable<AcpRuntime["getStatus"]>>[0]): Promise<AcpRuntimeStatus> {
|
||||
return this.resolveDelegateForHandle(input.handle).getStatus(input);
|
||||
async getStatus(
|
||||
input: Parameters<NonNullable<AcpRuntime["getStatus"]>>[0],
|
||||
): Promise<AcpRuntimeStatus> {
|
||||
const delegate = await this.resolveDelegateForHandle(input.handle);
|
||||
return delegate.getStatus(input);
|
||||
}
|
||||
|
||||
setMode(input: Parameters<NonNullable<AcpRuntime["setMode"]>>[0]): Promise<void> {
|
||||
return this.resolveDelegateForHandle(input.handle).setMode(input);
|
||||
async setMode(input: Parameters<NonNullable<AcpRuntime["setMode"]>>[0]): Promise<void> {
|
||||
const delegate = await this.resolveDelegateForHandle(input.handle);
|
||||
await delegate.setMode(input);
|
||||
}
|
||||
|
||||
setConfigOption(input: Parameters<NonNullable<AcpRuntime["setConfigOption"]>>[0]): Promise<void> {
|
||||
return this.resolveDelegateForHandle(input.handle).setConfigOption(input);
|
||||
async setConfigOption(
|
||||
input: Parameters<NonNullable<AcpRuntime["setConfigOption"]>>[0],
|
||||
): Promise<void> {
|
||||
const delegate = await this.resolveDelegateForHandle(input.handle);
|
||||
await delegate.setConfigOption(input);
|
||||
}
|
||||
|
||||
cancel(input: Parameters<AcpRuntime["cancel"]>[0]): Promise<void> {
|
||||
return this.resolveDelegateForHandle(input.handle).cancel(input);
|
||||
async cancel(input: Parameters<AcpRuntime["cancel"]>[0]): Promise<void> {
|
||||
const delegate = await this.resolveDelegateForHandle(input.handle);
|
||||
await delegate.cancel(input);
|
||||
}
|
||||
|
||||
async prepareFreshSession(input: { sessionKey: string }): Promise<void> {
|
||||
this.sessionStore.markFresh(input.sessionKey);
|
||||
}
|
||||
|
||||
close(input: Parameters<AcpRuntime["close"]>[0]): Promise<void> {
|
||||
return this.resolveDelegateForHandle(input.handle)
|
||||
.close({
|
||||
handle: input.handle,
|
||||
reason: input.reason,
|
||||
discardPersistentState: input.discardPersistentState,
|
||||
})
|
||||
.then(() => {
|
||||
if (input.discardPersistentState) {
|
||||
this.sessionStore.markFresh(input.handle.sessionKey);
|
||||
}
|
||||
});
|
||||
async close(input: Parameters<AcpRuntime["close"]>[0]): Promise<void> {
|
||||
await (
|
||||
await this.resolveDelegateForHandle(input.handle)
|
||||
).close({
|
||||
handle: input.handle,
|
||||
reason: input.reason,
|
||||
discardPersistentState: input.discardPersistentState,
|
||||
});
|
||||
if (input.discardPersistentState) {
|
||||
this.sessionStore.markFresh(input.handle.sessionKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user