From 3b5dab372ac2501097f9fcc82ce107bba2db282d Mon Sep 17 00:00:00 2001 From: pash-openai Date: Wed, 29 Apr 2026 16:57:12 -0400 Subject: [PATCH] Keep Codex Computer Use hook relays live across turns (#74107) * Fix Codex native hook relay across processes * fix: harden native hook relay bridge * test: stabilize pairing store cache assertion --------- Co-authored-by: pashpashpash --- docs/plugins/codex-computer-use.md | 8 +- .../codex/src/app-server/run-attempt.test.ts | 65 +++ .../codex/src/app-server/run-attempt.ts | 21 + src/agents/harness/native-hook-relay.test.ts | 206 ++++++++ src/agents/harness/native-hook-relay.ts | 489 +++++++++++++++++- src/cli/native-hook-relay-cli.ts | 18 + src/pairing/pairing-store.test.ts | 104 ++-- 7 files changed, 843 insertions(+), 68 deletions(-) diff --git a/docs/plugins/codex-computer-use.md b/docs/plugins/codex-computer-use.md index 8ca1f29533b..a60e92b365b 100644 --- a/docs/plugins/codex-computer-use.md +++ b/docs/plugins/codex-computer-use.md @@ -281,10 +281,10 @@ restart Codex Computer Use, relaunch Codex Desktop if needed, then retry in a fresh OpenClaw session. **A Computer Use tool says `Native hook relay unavailable`.** The Codex-native -tool hook reached OpenClaw with a stale or missing relay registration. Start a -fresh OpenClaw session with `/new` or `/reset`. If it keeps happening, restart -the gateway so old app-server threads and hook registrations are dropped, then -retry. +tool hook could not reach an active OpenClaw relay through the local bridge or +Gateway fallback. Start a fresh OpenClaw session with `/new` or `/reset`. If it +keeps happening, restart the gateway so old app-server threads and hook +registrations are dropped, then retry. **Turn-start auto-install refuses a source.** This is intentional. Add the source with explicit `/codex computer-use install --source ` diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 3f99e5f7931..3c09a6dac73 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -735,6 +735,71 @@ describe("runCodexAppServerAttempt", () => { expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined(); }); + it("reuses the Codex native hook relay id across runs for the same session", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const firstHarness = createStartedThreadHarness(); + + const firstRun = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir), { + nativeHookRelay: { + enabled: true, + events: ["pre_tool_use"], + }, + }); + await firstHarness.waitForMethod("turn/start"); + await firstHarness.completeTurn({ threadId: "thread-1", turnId: "turn-1" }); + await firstRun; + + const firstStartRequest = firstHarness.requests.find( + (request) => request.method === "thread/start", + ); + const firstRelayId = extractRelayIdFromThreadRequest(firstStartRequest?.params); + expect( + nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId), + ).toBeUndefined(); + + const secondHarness = createResumeHarness(); + const secondParams = createParams(sessionFile, workspaceDir); + secondParams.runId = "run-2"; + const secondRun = runCodexAppServerAttempt(secondParams, { + nativeHookRelay: { + enabled: true, + events: ["pre_tool_use"], + }, + }); + await secondHarness.waitForMethod("turn/start"); + + const resumeRequest = secondHarness.requests.find( + (request) => request.method === "thread/resume", + ); + const secondRelayId = extractRelayIdFromThreadRequest(resumeRequest?.params); + expect(secondRelayId).toBe(firstRelayId); + expect( + nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId), + ).toMatchObject({ + runId: "run-2", + allowedEvents: ["pre_tool_use"], + }); + + await secondHarness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" }); + await secondRun; + expect( + nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId), + ).toBeUndefined(); + }); + + it("builds deterministic opaque Codex native hook relay ids", () => { + const relayId = __testing.buildCodexNativeHookRelayId({ + agentId: "dev-codex", + sessionId: "cu-pr-relay-smoke", + sessionKey: "agent:dev-codex:cu-pr-relay-smoke", + }); + + expect(relayId).toBe("codex-8810b5252975550c887ff0def512b25e944bac39"); + expect(relayId).not.toContain("dev-codex"); + expect(relayId).not.toContain("cu-pr-relay-smoke"); + }); + it("sends clearing Codex native hook config when the relay is disabled", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index ce0d0f895f1..4b3a9e4e79a 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs/promises"; import { SessionManager } from "@mariozechner/pi-coding-agent"; import { @@ -1009,6 +1010,11 @@ function createCodexNativeHookRelay(params: { } return registerNativeHookRelay({ provider: "codex", + relayId: buildCodexNativeHookRelayId({ + agentId: params.agentId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + }), ...(params.agentId ? { agentId: params.agentId } : {}), sessionId: params.sessionId, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), @@ -1022,6 +1028,20 @@ function createCodexNativeHookRelay(params: { }); } +function buildCodexNativeHookRelayId(params: { + agentId: string | undefined; + sessionId: string; + sessionKey: string | undefined; +}): string { + const hash = createHash("sha256"); + hash.update("openclaw:codex:native-hook-relay:v1"); + hash.update("\0"); + hash.update(params.agentId?.trim() || ""); + hash.update("\0"); + hash.update(params.sessionKey?.trim() || params.sessionId); + return `codex-${hash.digest("hex").slice(0, 40)}`; +} + function interruptCodexTurnBestEffort( client: CodexAppServerClient, params: { @@ -1297,6 +1317,7 @@ function handleApprovalRequest(params: { export const __testing = { CODEX_DYNAMIC_TOOL_TIMEOUT_MS, CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS, + buildCodexNativeHookRelayId, filterToolsForVisionInputs, handleDynamicToolCallWithTimeout, ...createCodexAppServerClientFactoryTestHooks((factory) => { diff --git a/src/agents/harness/native-hook-relay.test.ts b/src/agents/harness/native-hook-relay.test.ts index b3e50786d4c..16f8583b7b8 100644 --- a/src/agents/harness/native-hook-relay.test.ts +++ b/src/agents/harness/native-hook-relay.test.ts @@ -1,3 +1,5 @@ +import { statSync, writeFileSync } from "node:fs"; +import { createServer } from "node:http"; import { afterEach, describe, expect, it, vi } from "vitest"; import { initializeGlobalHookRunner, @@ -8,6 +10,7 @@ import { __testing, buildNativeHookRelayCommand, invokeNativeHookRelay, + invokeNativeHookRelayBridge, registerNativeHookRelay, } from "./native-hook-relay.js"; @@ -17,6 +20,17 @@ afterEach(() => { __testing.clearNativeHookRelaysForTests(); }); +async function waitForNativeHookRelayBridgeRecord( + relayId: string, +): Promise> { + let record: Record | undefined; + await vi.waitFor(() => { + record = __testing.getNativeHookRelayBridgeRecordForTests(relayId); + expect(record).toBeDefined(); + }); + return record!; +} + describe("native hook relay registry", () => { it("registers a short-lived relay and builds hidden CLI commands", () => { const relay = registerNativeHookRelay({ @@ -46,6 +60,198 @@ describe("native hook relay registry", () => { ); }); + it("allows callers to replace a relay at a stable id", () => { + const first = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-stable-session", + sessionId: "session-1", + runId: "run-1", + allowedEvents: ["pre_tool_use"], + }); + + const second = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-stable-session", + sessionId: "session-1", + runId: "run-2", + allowedEvents: ["post_tool_use"], + }); + + expect(second.relayId).toBe(first.relayId); + expect(__testing.getNativeHookRelayRegistrationForTests(first.relayId)).toMatchObject({ + runId: "run-2", + allowedEvents: ["post_tool_use"], + }); + }); + + it("exposes registered relays through the direct hook bridge", async () => { + const relay = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-bridge-session", + sessionId: "session-1", + runId: "run-1", + allowedEvents: ["pre_tool_use"], + }); + + const response = await invokeNativeHookRelayBridge({ + provider: "codex", + relayId: relay.relayId, + event: "pre_tool_use", + timeoutMs: 2_000, + rawPayload: { + hook_event_name: "PreToolUse", + tool_name: "Bash", + tool_input: { command: "pnpm test" }, + }, + }); + + expect(response).toEqual({ stdout: "", stderr: "", exitCode: 0 }); + expect(__testing.getNativeHookRelayInvocationsForTests()).toEqual([ + expect.objectContaining({ + relayId: relay.relayId, + event: "pre_tool_use", + runId: "run-1", + }), + ]); + }); + + it("keeps direct bridge registry files private and loopback-only", async () => { + const relay = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-private-bridge-session", + sessionId: "session-1", + runId: "run-1", + allowedEvents: ["pre_tool_use"], + }); + + const record = await waitForNativeHookRelayBridgeRecord(relay.relayId); + const bridgeDir = __testing.getNativeHookRelayBridgeDirForTests(); + const registryPath = __testing.getNativeHookRelayBridgeRegistryPathForTests(relay.relayId); + expect(statSync(bridgeDir).mode & 0o077).toBe(0); + expect(statSync(registryPath).mode & 0o077).toBe(0); + + writeFileSync( + registryPath, + `${JSON.stringify({ + ...record, + hostname: "192.0.2.1", + expiresAtMs: Date.now() + 10_000, + })}\n`, + { mode: 0o600 }, + ); + + await expect( + invokeNativeHookRelayBridge({ + provider: "codex", + relayId: relay.relayId, + event: "pre_tool_use", + registrationTimeoutMs: 1, + timeoutMs: 50, + rawPayload: { + hook_event_name: "PreToolUse", + tool_name: "Bash", + tool_input: { command: "pnpm test" }, + }, + }), + ).rejects.toThrow("native hook relay bridge not found"); + }); + + it("binds direct bridge tokens to the relay they were issued for", async () => { + const first = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-first-bridge-session", + sessionId: "session-1", + runId: "run-1", + allowedEvents: ["pre_tool_use"], + }); + const second = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-second-bridge-session", + sessionId: "session-2", + runId: "run-2", + allowedEvents: ["pre_tool_use"], + }); + + const firstRecord = await waitForNativeHookRelayBridgeRecord(first.relayId); + await waitForNativeHookRelayBridgeRecord(second.relayId); + writeFileSync( + __testing.getNativeHookRelayBridgeRegistryPathForTests(second.relayId), + `${JSON.stringify({ + ...firstRecord, + relayId: second.relayId, + expiresAtMs: Date.now() + 10_000, + })}\n`, + { mode: 0o600 }, + ); + + await expect( + invokeNativeHookRelayBridge({ + provider: "codex", + relayId: second.relayId, + event: "pre_tool_use", + timeoutMs: 500, + rawPayload: { + hook_event_name: "PreToolUse", + tool_name: "Bash", + tool_input: { command: "pnpm test" }, + }, + }), + ).rejects.toThrow("native hook relay bridge target mismatch"); + expect(__testing.getNativeHookRelayInvocationsForTests()).toEqual([]); + }); + + it("rejects oversized direct bridge responses", async () => { + const relay = registerNativeHookRelay({ + provider: "codex", + relayId: "codex-oversized-bridge-response", + sessionId: "session-1", + runId: "run-1", + allowedEvents: ["pre_tool_use"], + }); + const record = await waitForNativeHookRelayBridgeRecord(relay.relayId); + const server = createServer((_req, res) => { + res.writeHead(200, { "content-type": "application/json" }); + res.end("x".repeat(5_000_001)); + }); + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + try { + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("test bridge server address unavailable"); + } + writeFileSync( + __testing.getNativeHookRelayBridgeRegistryPathForTests(relay.relayId), + `${JSON.stringify({ + ...record, + port: address.port, + token: "test-token", + expiresAtMs: Date.now() + 10_000, + })}\n`, + { mode: 0o600 }, + ); + + await expect( + invokeNativeHookRelayBridge({ + provider: "codex", + relayId: relay.relayId, + event: "pre_tool_use", + timeoutMs: 500, + rawPayload: { + hook_event_name: "PreToolUse", + tool_name: "Bash", + tool_input: { command: "pnpm test" }, + }, + }), + ).rejects.toThrow("native hook relay bridge response too large"); + } finally { + await new Promise((resolve) => { + server.close(() => resolve()); + }); + } + }); + it("accepts an allowed Codex invocation and preserves raw payload", async () => { const relay = registerNativeHookRelay({ provider: "codex", diff --git a/src/agents/harness/native-hook-relay.ts b/src/agents/harness/native-hook-relay.ts index d1b201fd817..7ac309c96f1 100644 --- a/src/agents/harness/native-hook-relay.ts +++ b/src/agents/harness/native-hook-relay.ts @@ -1,6 +1,24 @@ import { createHash, randomUUID } from "node:crypto"; -import { existsSync } from "node:fs"; +import { + chmodSync, + existsSync, + lstatSync, + mkdirSync, + readFileSync, + renameSync, + rmSync, + writeFileSync, +} from "node:fs"; +import { + createServer, + request as httpRequest, + type IncomingMessage, + type Server, + type ServerResponse, +} from "node:http"; +import { tmpdir } from "node:os"; import path from "node:path"; +import { resolveOpenClawPackageRootSync } from "../../infra/openclaw-root.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { PluginApprovalResolutions } from "../../plugins/types.js"; import { runBeforeToolCallHook } from "../pi-tools.before-tool-call.js"; @@ -76,6 +94,7 @@ export type NativeHookRelayRegistrationHandle = NativeHookRelayRegistration & { export type RegisterNativeHookRelayParams = { provider: NativeHookRelayProvider; + relayId?: string; agentId?: string; sessionId: string; sessionKey?: string; @@ -99,6 +118,11 @@ export type InvokeNativeHookRelayParams = { rawPayload: unknown; }; +export type InvokeNativeHookRelayBridgeParams = InvokeNativeHookRelayParams & { + registrationTimeoutMs?: number; + timeoutMs?: number; +}; + type NativeHookRelayInvocationMetadata = Partial< Pick< NativeHookRelayInvocation, @@ -148,8 +172,12 @@ const MAX_APPROVAL_TITLE_LENGTH = 80; const MAX_APPROVAL_DESCRIPTION_LENGTH = 700; const MAX_PERMISSION_APPROVALS_PER_WINDOW = 12; const PERMISSION_APPROVAL_WINDOW_MS = 60_000; +const MAX_NATIVE_HOOK_BRIDGE_BODY_BYTES = 5_000_000; +const MAX_NATIVE_HOOK_BRIDGE_RESPONSE_BYTES = 5_000_000; +const NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS = 25; const ANSI_ESCAPE_PATTERN = new RegExp(`${String.fromCharCode(27)}\\[[0-?]*[ -/]*[@-~]`, "g"); const relays = new Map(); +const relayBridges = new Map(); const invocations: NativeHookRelayInvocation[] = []; const pendingPermissionApprovals = new Map< string, @@ -180,6 +208,23 @@ type NativeHookRelayPermissionApprovalRequester = ( request: NativeHookRelayPermissionApprovalRequest, ) => Promise; +type NativeHookRelayBridgeRegistration = { + relayId: string; + registryPath: string; + token: string; + server: Server; +}; + +type NativeHookRelayBridgeRecord = { + version: 1; + relayId: string; + pid: number; + hostname: string; + port: number; + token: string; + expiresAtMs: number; +}; + let nativeHookRelayPermissionApprovalRequester: NativeHookRelayPermissionApprovalRequester = requestNativeHookRelayPermissionApproval; @@ -245,8 +290,9 @@ export function registerNativeHookRelay( params: RegisterNativeHookRelayParams, ): NativeHookRelayRegistrationHandle { pruneExpiredNativeHookRelays(); - const relayId = randomUUID(); + const relayId = normalizeRelayId(params.relayId) ?? randomUUID(); const allowedEvents = normalizeAllowedEvents(params.allowedEvents); + unregisterNativeHookRelay(relayId); const registration: NativeHookRelayRegistration = { relayId, provider: params.provider, @@ -259,6 +305,7 @@ export function registerNativeHookRelay( ...(params.signal ? { signal: params.signal } : {}), }; relays.set(relayId, registration); + registerNativeHookRelayBridge(registration); return { ...registration, commandForEvent: (event) => @@ -275,11 +322,23 @@ export function registerNativeHookRelay( } export function unregisterNativeHookRelay(relayId: string): void { + unregisterNativeHookRelayBridge(relayId); relays.delete(relayId); removeNativeHookRelayInvocations(relayId); removeNativeHookRelayPermissionState(relayId); } +function normalizeRelayId(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + if (trimmed.length > 160 || !/^[A-Za-z0-9._:-]+$/u.test(trimmed)) { + throw new Error("native hook relay id must be non-empty, compact, and URL-safe"); + } + return trimmed; +} + export function buildNativeHookRelayCommand(params: { provider: NativeHookRelayProvider; relayId: string; @@ -348,6 +407,52 @@ export async function invokeNativeHookRelay( }); } +export async function invokeNativeHookRelayBridge( + params: InvokeNativeHookRelayBridgeParams, +): Promise { + const provider = readNativeHookRelayProvider(params.provider); + const relayId = readNonEmptyString(params.relayId, "relayId"); + const event = readNativeHookRelayEvent(params.event); + const timeoutMs = normalizePositiveInteger(params.timeoutMs, DEFAULT_RELAY_TIMEOUT_MS); + const registrationTimeoutMs = normalizePositiveInteger(params.registrationTimeoutMs, timeoutMs); + const startedAt = Date.now(); + let lastError: unknown = new Error("native hook relay bridge not found"); + while (Date.now() - startedAt < timeoutMs) { + try { + const record = readNativeHookRelayBridgeRecord(relayId); + if (Date.now() > record.expiresAtMs) { + throw new Error("native hook relay bridge expired"); + } + return await invokeNativeHookRelayBridgeRecord({ + record, + timeoutMs: Math.max(1, timeoutMs - (Date.now() - startedAt)), + payload: { + provider, + relayId, + event, + rawPayload: params.rawPayload, + }, + }); + } catch (error) { + lastError = error; + if ( + error instanceof Error && + error.message === "native hook relay bridge not found" && + Date.now() - startedAt >= registrationTimeoutMs + ) { + break; + } + if (!isRetryableNativeHookRelayBridgeError(error)) { + break; + } + await delay( + Math.min(NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS, timeoutMs - (Date.now() - startedAt)), + ); + } + } + throw lastError instanceof Error ? lastError : new Error(String(lastError)); +} + export function renderNativeHookRelayUnavailableResponse(params: { provider: unknown; event: unknown; @@ -388,11 +493,358 @@ function pruneExpiredNativeHookRelays(now = Date.now()): void { for (const [relayId, registration] of relays) { if (now > registration.expiresAtMs) { relays.delete(relayId); + unregisterNativeHookRelayBridge(relayId); removeNativeHookRelayInvocations(relayId); } } } +function registerNativeHookRelayBridge(registration: NativeHookRelayRegistration): void { + unregisterNativeHookRelayBridge(registration.relayId); + const token = randomUUID(); + const bridgeDir = ensureNativeHookRelayBridgeDir(); + const bridgeKey = nativeHookRelayBridgeKey(registration.relayId); + const registryPath = path.join(bridgeDir, `${bridgeKey}.json`); + const server = createServer((req, res) => { + void handleNativeHookRelayBridgeRequest(req, res, { + provider: registration.provider, + relayId: registration.relayId, + token, + }); + }); + const bridge: NativeHookRelayBridgeRegistration = { + relayId: registration.relayId, + registryPath, + token, + server, + }; + relayBridges.set(registration.relayId, bridge); + server.on("error", (error) => { + log.debug("native hook relay bridge server error", { error, relayId: registration.relayId }); + }); + server.listen(0, "127.0.0.1", () => { + if (relayBridges.get(registration.relayId) !== bridge) { + return; + } + const address = server.address(); + if (!address || typeof address === "string") { + log.debug("native hook relay bridge server address unavailable", { + relayId: registration.relayId, + }); + return; + } + const record: NativeHookRelayBridgeRecord = { + version: 1, + relayId: registration.relayId, + pid: process.pid, + hostname: "127.0.0.1", + port: address.port, + token, + expiresAtMs: registration.expiresAtMs, + }; + writeNativeHookRelayBridgeRecord(registryPath, record); + }); + server.unref(); +} + +function unregisterNativeHookRelayBridge(relayId: string): void { + const bridge = relayBridges.get(relayId); + if (!bridge) { + return; + } + relayBridges.delete(relayId); + bridge.server.close(); + const record = readNativeHookRelayBridgeRecordIfExists(relayId); + if (record?.token === bridge.token) { + rmSync(bridge.registryPath, { force: true }); + } +} + +async function handleNativeHookRelayBridgeRequest( + req: IncomingMessage, + res: ServerResponse, + auth: { provider: NativeHookRelayProvider; relayId: string; token: string }, +): Promise { + try { + if (req.method !== "POST" || req.url !== "/invoke") { + writeNativeHookRelayBridgeJson(res, 404, { ok: false, error: "not found" }); + return; + } + if (req.headers.authorization !== `Bearer ${auth.token}`) { + writeNativeHookRelayBridgeJson(res, 403, { ok: false, error: "forbidden" }); + return; + } + const body = await readNativeHookRelayBridgeBody(req); + const payload = readNativeHookRelayBridgePayload(JSON.parse(body)); + if (payload.provider !== auth.provider || payload.relayId !== auth.relayId) { + writeNativeHookRelayBridgeJson(res, 403, { + ok: false, + error: "native hook relay bridge target mismatch", + }); + return; + } + const result = await invokeNativeHookRelay(payload); + writeNativeHookRelayBridgeJson(res, 200, { ok: true, result }); + } catch (error) { + writeNativeHookRelayBridgeJson(res, 500, { + ok: false, + error: error instanceof Error ? error.message : String(error), + }); + } +} + +async function readNativeHookRelayBridgeBody(req: NodeJS.ReadableStream): Promise { + const chunks: Buffer[] = []; + let total = 0; + for await (const chunk of req) { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + total += buffer.byteLength; + if (total > MAX_NATIVE_HOOK_BRIDGE_BODY_BYTES) { + throw new Error("native hook relay bridge payload too large"); + } + chunks.push(buffer); + } + return Buffer.concat(chunks, total).toString("utf8"); +} + +function readNativeHookRelayBridgePayload(value: unknown): InvokeNativeHookRelayParams { + if (!isJsonObject(value)) { + throw new Error("native hook relay bridge payload must be an object"); + } + return { + provider: value.provider, + relayId: value.relayId, + event: value.event, + rawPayload: value.rawPayload, + }; +} + +function writeNativeHookRelayBridgeJson( + res: ServerResponse, + statusCode: number, + payload: unknown, +): void { + const body = JSON.stringify(payload); + res.writeHead(statusCode, { + "content-type": "application/json", + "content-length": Buffer.byteLength(body), + }); + res.end(body); +} + +function readNativeHookRelayBridgeRecord(relayId: string): NativeHookRelayBridgeRecord { + const record = readNativeHookRelayBridgeRecordIfExists(relayId); + if (!record) { + throw new Error("native hook relay bridge not found"); + } + return record; +} + +function readNativeHookRelayBridgeRecordIfExists( + relayId: string, +): NativeHookRelayBridgeRecord | undefined { + const registryPath = nativeHookRelayBridgeRegistryPath(relayId); + try { + const parsed: unknown = JSON.parse(readFileSync(registryPath, "utf8")); + if (isNativeHookRelayBridgeRecord(parsed, relayId)) { + return parsed; + } + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + log.debug("failed to read native hook relay bridge registry", { error, relayId }); + } + } + return undefined; +} + +function isNativeHookRelayBridgeRecord( + value: unknown, + relayId: string, +): value is NativeHookRelayBridgeRecord { + return ( + isJsonObject(value) && + value.version === 1 && + value.relayId === relayId && + typeof value.pid === "number" && + Number.isInteger(value.pid) && + value.hostname === "127.0.0.1" && + typeof value.port === "number" && + Number.isInteger(value.port) && + value.port > 0 && + value.port <= 65_535 && + typeof value.token === "string" && + value.token.length > 0 && + typeof value.expiresAtMs === "number" + ); +} + +async function invokeNativeHookRelayBridgeRecord(params: { + record: NativeHookRelayBridgeRecord; + timeoutMs: number; + payload: InvokeNativeHookRelayParams; +}): Promise { + const startedAt = Date.now(); + let lastError: unknown; + while (Date.now() - startedAt < params.timeoutMs) { + try { + return await postNativeHookRelayBridgeRecord({ + ...params, + timeoutMs: Math.max(1, params.timeoutMs - (Date.now() - startedAt)), + }); + } catch (error) { + lastError = error; + if (!isRetryableNativeHookRelayBridgeError(error)) { + break; + } + await delay( + Math.min(NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS, params.timeoutMs - (Date.now() - startedAt)), + ); + } + } + throw lastError instanceof Error ? lastError : new Error(String(lastError)); +} + +function postNativeHookRelayBridgeRecord(params: { + record: NativeHookRelayBridgeRecord; + timeoutMs: number; + payload: InvokeNativeHookRelayParams; +}): Promise { + const body = JSON.stringify(params.payload); + return new Promise((resolve, reject) => { + let settled = false; + const resolveOnce = (value: NativeHookRelayProcessResponse) => { + if (!settled) { + settled = true; + resolve(value); + } + }; + const rejectOnce = (error: unknown) => { + if (!settled) { + settled = true; + reject(error); + } + }; + const req = httpRequest( + { + hostname: params.record.hostname, + method: "POST", + path: "/invoke", + port: params.record.port, + timeout: params.timeoutMs, + headers: { + authorization: `Bearer ${params.record.token}`, + "content-type": "application/json", + "content-length": Buffer.byteLength(body), + }, + }, + (res) => { + let responseText = ""; + let responseBytes = 0; + res.setEncoding("utf8"); + res.on("data", (chunk) => { + const chunkText = typeof chunk === "string" ? chunk : String(chunk); + responseBytes += Buffer.byteLength(chunkText); + if (responseBytes > MAX_NATIVE_HOOK_BRIDGE_RESPONSE_BYTES) { + rejectOnce(new Error("native hook relay bridge response too large")); + res.destroy(); + return; + } + responseText += chunkText; + }); + res.on("error", rejectOnce); + res.on("end", () => { + if (settled) { + return; + } + try { + const parsed = JSON.parse(responseText) as + | { ok: true; result: NativeHookRelayProcessResponse } + | { ok: false; error?: string }; + if (parsed.ok) { + resolveOnce(parsed.result); + return; + } + rejectOnce(new Error(parsed.error || "native hook relay bridge failed")); + } catch (error) { + rejectOnce(error); + } + }); + }, + ); + req.on("timeout", () => { + req.destroy(new Error("native hook relay bridge timed out")); + }); + req.on("error", rejectOnce); + req.end(body); + }); +} + +function isRetryableNativeHookRelayBridgeError(error: unknown): boolean { + const code = (error as NodeJS.ErrnoException).code; + return ( + code === "ENOENT" || + code === "ECONNREFUSED" || + code === "EAGAIN" || + (error instanceof Error && error.message === "native hook relay bridge not found") + ); +} + +function nativeHookRelayBridgeDir(): string { + const uid = typeof process.getuid === "function" ? process.getuid() : "nouid"; + return path.join(tmpdir(), `openclaw-native-hook-relays-${uid}`); +} + +function ensureNativeHookRelayBridgeDir(): string { + const bridgeDir = nativeHookRelayBridgeDir(); + mkdirSync(bridgeDir, { recursive: true, mode: 0o700 }); + const stats = lstatSync(bridgeDir); + const expectedUid = typeof process.getuid === "function" ? process.getuid() : undefined; + if (!stats.isDirectory() || stats.isSymbolicLink()) { + throw new Error("unsafe native hook relay bridge directory"); + } + if (expectedUid !== undefined && stats.uid !== expectedUid) { + throw new Error("unsafe native hook relay bridge directory owner"); + } + if ((stats.mode & 0o077) !== 0) { + chmodSync(bridgeDir, 0o700); + const repaired = lstatSync(bridgeDir); + if ((repaired.mode & 0o077) !== 0) { + throw new Error("unsafe native hook relay bridge directory permissions"); + } + } + return bridgeDir; +} + +function writeNativeHookRelayBridgeRecord( + registryPath: string, + record: NativeHookRelayBridgeRecord, +): void { + const tempPath = path.join( + path.dirname(registryPath), + `.${path.basename(registryPath)}.${process.pid}.${randomUUID()}.tmp`, + ); + try { + writeFileSync(tempPath, `${JSON.stringify(record)}\n`, { mode: 0o600, flag: "wx" }); + renameSync(tempPath, registryPath); + chmodSync(registryPath, 0o600); + } catch (error) { + rmSync(tempPath, { force: true }); + throw error; + } +} + +function nativeHookRelayBridgeRegistryPath(relayId: string): string { + return path.join(nativeHookRelayBridgeDir(), `${nativeHookRelayBridgeKey(relayId)}.json`); +} + +function nativeHookRelayBridgeKey(relayId: string): string { + return createHash("sha256").update(relayId).digest("hex").slice(0, 32); +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, Math.max(0, ms))); +} + async function processNativeHookRelayInvocation(params: { registration: NativeHookRelayRegistration; invocation: NativeHookRelayInvocation; @@ -1003,6 +1455,26 @@ function truncateText(value: string, maxLength: number): string { } function resolveOpenClawCliExecutable(): string { + const envPath = process.env.OPENCLAW_CLI_PATH?.trim(); + if (envPath && existsSync(envPath)) { + return envPath; + } + const packageRoot = resolveOpenClawPackageRootSync({ + moduleUrl: import.meta.url, + argv1: process.argv[1], + cwd: process.cwd(), + }); + if (packageRoot) { + for (const candidate of [ + path.join(packageRoot, "openclaw.mjs"), + path.join(packageRoot, "dist", "entry.js"), + path.join(packageRoot, "scripts", "run-node.mjs"), + ]) { + if (existsSync(candidate)) { + return candidate; + } + } + } const argvEntry = process.argv[1]; if (argvEntry) { const resolved = path.resolve(argvEntry); @@ -1161,6 +1633,9 @@ function isJsonObject(value: unknown): value is Record { export const __testing = { clearNativeHookRelaysForTests(): void { + for (const relayId of relayBridges.keys()) { + unregisterNativeHookRelayBridge(relayId); + } relays.clear(); invocations.length = 0; pendingPermissionApprovals.clear(); @@ -1173,6 +1648,16 @@ export const __testing = { getNativeHookRelayRegistrationForTests(relayId: string): NativeHookRelayRegistration | undefined { return relays.get(relayId); }, + getNativeHookRelayBridgeDirForTests(): string { + return nativeHookRelayBridgeDir(); + }, + getNativeHookRelayBridgeRegistryPathForTests(relayId: string): string { + return nativeHookRelayBridgeRegistryPath(relayId); + }, + getNativeHookRelayBridgeRecordForTests(relayId: string): Record | undefined { + const record = readNativeHookRelayBridgeRecordIfExists(relayId); + return record ? { ...record } : undefined; + }, formatPermissionApprovalDescriptionForTests( request: NativeHookRelayPermissionApprovalRequest, ): string { diff --git a/src/cli/native-hook-relay-cli.ts b/src/cli/native-hook-relay-cli.ts index dd33dca52ef..62b3094ed73 100644 --- a/src/cli/native-hook-relay-cli.ts +++ b/src/cli/native-hook-relay-cli.ts @@ -1,5 +1,6 @@ import { Readable, Writable } from "node:stream"; import { + invokeNativeHookRelayBridge, renderNativeHookRelayUnavailableResponse, type NativeHookRelayProcessResponse, } from "../agents/harness/native-hook-relay.js"; @@ -43,6 +44,23 @@ export async function runNativeHookRelayCli( return 1; } + try { + const response = await invokeNativeHookRelayBridge({ + provider, + relayId, + event, + rawPayload, + registrationTimeoutMs: 100, + timeoutMs: normalizeTimeoutMs(opts.timeout), + }); + writeText(stdout, response.stdout); + writeText(stderr, response.stderr); + return response.exitCode; + } catch { + // Fall through to the gateway path for embedded/local gateway cases and + // older registrations that predate the direct relay bridge. + } + try { const response = await callGatewayFn({ method: "nativeHook.invoke", diff --git a/src/pairing/pairing-store.test.ts b/src/pairing/pairing-store.test.ts index 8242748c239..945846e125f 100644 --- a/src/pairing/pairing-store.test.ts +++ b/src/pairing/pairing-store.test.ts @@ -24,43 +24,6 @@ vi.mock("../infra/file-lock.js", () => ({ withFileLock: async (_path: string, _options: unknown, fn: () => unknown) => await fn(), })); -const jsonStoreMocks = vi.hoisted(() => ({ - readJsonFileWithFallback: vi.fn(async (filePath: string, fallback: T) => { - const fs = await import("node:fs/promises"); - let raw: string; - try { - raw = await fs.readFile(filePath, "utf8"); - } catch (err) { - if ((err as { code?: string }).code === "ENOENT") { - return { value: fallback, exists: false }; - } - return { value: fallback, exists: false }; - } - try { - const parsed = JSON.parse(raw) as T; - return { - value: parsed ?? fallback, - exists: true, - }; - } catch { - return { value: fallback, exists: true }; - } - }), - writeJsonFileAtomically: vi.fn(async (filePath: string, value: unknown) => { - const fs = await import("node:fs/promises"); - const path = await import("node:path"); - await fs.mkdir(path.dirname(filePath), { recursive: true }); - await fs.writeFile(filePath, `${JSON.stringify(value, null, 2)}\n`, "utf8"); - }), -})); - -vi.mock("../plugin-sdk/json-store.js", () => { - return { - readJsonFileWithFallback: jsonStoreMocks.readJsonFileWithFallback, - writeJsonFileAtomically: jsonStoreMocks.writeJsonFileAtomically, - }; -}); - import { addChannelAllowFromStoreEntry, clearPairingAllowFromReadCacheForTest, @@ -77,7 +40,10 @@ import { let fixtureRoot = ""; let caseId = 0; type RandomIntSync = (minOrMax: number, max?: number) => number; -type ReadSpy = ReturnType | MockInstance; +type FileReadSpy = { + readCount: () => number; + mockRestore: () => void; +}; let randomIntSpy: MockInstance; let nextRandomInt = 0; @@ -181,13 +147,13 @@ async function seedTelegramAllowFromFixtures(params: { async function assertAllowFromCacheInvalidation(params: { stateDir: string; readAllowFrom: () => Promise; - readSpy: ReadSpy; + readSpy: FileReadSpy; }) { const first = await params.readAllowFrom(); const second = await params.readAllowFrom(); expect(first).toEqual(["1001"]); expect(second).toEqual(["1001"]); - expect(params.readSpy).toHaveBeenCalledTimes(1); + expect(params.readSpy.readCount()).toBe(1); await writeAllowFromFixture({ stateDir: params.stateDir, @@ -197,7 +163,7 @@ async function assertAllowFromCacheInvalidation(params: { }); const third = await params.readAllowFrom(); expect(third).toEqual(["10022"]); - expect(params.readSpy).toHaveBeenCalledTimes(2); + expect(params.readSpy.readCount()).toBe(2); } async function expectAccountScopedEntryIsolated(entry: string, accountId = "yy") { @@ -209,17 +175,17 @@ async function expectAccountScopedEntryIsolated(entry: string, accountId = "yy") async function withAllowFromCacheReadSpy(params: { stateDir: string; - createReadSpy: () => ReadSpy; - cleanupReadSpy?: (readSpy: ReadSpy) => void; + createReadSpy: (filePath: string) => FileReadSpy; readAllowFrom: () => Promise; }) { + const filePath = resolveAllowFromFilePath(params.stateDir, "telegram", "yy"); await writeAllowFromFixture({ stateDir: params.stateDir, channel: "telegram", accountId: "yy", allowFrom: ["1001"], }); - const readSpy = params.createReadSpy(); + const readSpy = params.createReadSpy(filePath); try { await assertAllowFromCacheInvalidation({ stateDir: params.stateDir, @@ -227,10 +193,14 @@ async function withAllowFromCacheReadSpy(params: { readSpy, }); } finally { - params.cleanupReadSpy?.(readSpy); + readSpy.mockRestore(); } } +function countFileReads(spy: { mock: { calls: unknown[][] } }, filePath: string): number { + return spy.mock.calls.filter(([candidate]) => candidate === filePath).length; +} + async function seedDefaultAccountAllowFromFixture(stateDir: string) { await seedTelegramAllowFromFixtures({ stateDir, @@ -608,25 +578,35 @@ describe("pairing store", () => { it("reuses cached allowFrom reads and invalidates on file updates", async () => { await withTempStateDir(async (stateDir) => { - clearOAuthFixtures(stateDir); - await withAllowFromCacheReadSpy({ - stateDir, - createReadSpy: () => { - jsonStoreMocks.readJsonFileWithFallback.mockClear(); - return jsonStoreMocks.readJsonFileWithFallback; + for (const variant of [ + { + createReadSpy: (filePath: string) => { + const spy = vi.spyOn(fsSync.promises, "readFile"); + return { + readCount: () => countFileReads(spy, filePath), + mockRestore: () => spy.mockRestore(), + }; + }, + readAllowFrom: () => readChannelAllowFromStore("telegram", process.env, "yy"), }, - readAllowFrom: () => readChannelAllowFromStore("telegram", process.env, "yy"), - }); - - clearOAuthFixtures(stateDir); - await withAllowFromCacheReadSpy({ - stateDir, - createReadSpy: () => vi.spyOn(fsSync, "readFileSync"), - cleanupReadSpy: (readSpy) => { - readSpy.mockRestore(); + { + createReadSpy: (filePath: string) => { + const spy = vi.spyOn(fsSync, "readFileSync"); + return { + readCount: () => countFileReads(spy, filePath), + mockRestore: () => spy.mockRestore(), + }; + }, + readAllowFrom: async () => readChannelAllowFromStoreSync("telegram", process.env, "yy"), }, - readAllowFrom: async () => readChannelAllowFromStoreSync("telegram", process.env, "yy"), - }); + ]) { + clearOAuthFixtures(stateDir); + await withAllowFromCacheReadSpy({ + stateDir, + createReadSpy: variant.createReadSpy, + readAllowFrom: variant.readAllowFrom, + }); + } }); }); });