Keep Codex Computer Use hook relays live across turns (#74107)

* Fix Codex native hook relay across processes

* fix: harden native hook relay bridge

* test: stabilize pairing store cache assertion

---------

Co-authored-by: pashpashpash <nik@vault77.ai>
This commit is contained in:
pash-openai
2026-04-29 16:57:12 -04:00
committed by GitHub
parent 9ccd015898
commit 3b5dab372a
7 changed files with 843 additions and 68 deletions

View File

@@ -281,10 +281,10 @@ restart Codex Computer Use, relaunch Codex Desktop if needed, then retry in a
fresh OpenClaw session.
**A Computer Use tool says `Native hook relay unavailable`.** The Codex-native
tool hook reached OpenClaw with a stale or missing relay registration. Start a
fresh OpenClaw session with `/new` or `/reset`. If it keeps happening, restart
the gateway so old app-server threads and hook registrations are dropped, then
retry.
tool hook could not reach an active OpenClaw relay through the local bridge or
Gateway fallback. Start a fresh OpenClaw session with `/new` or `/reset`. If it
keeps happening, restart the gateway so old app-server threads and hook
registrations are dropped, then retry.
**Turn-start auto-install refuses a source.** This is intentional. Add the
source with explicit `/codex computer-use install --source <marketplace-source>`

View File

@@ -735,6 +735,71 @@ describe("runCodexAppServerAttempt", () => {
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
it("reuses the Codex native hook relay id across runs for the same session", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const firstHarness = createStartedThreadHarness();
const firstRun = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir), {
nativeHookRelay: {
enabled: true,
events: ["pre_tool_use"],
},
});
await firstHarness.waitForMethod("turn/start");
await firstHarness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await firstRun;
const firstStartRequest = firstHarness.requests.find(
(request) => request.method === "thread/start",
);
const firstRelayId = extractRelayIdFromThreadRequest(firstStartRequest?.params);
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId),
).toBeUndefined();
const secondHarness = createResumeHarness();
const secondParams = createParams(sessionFile, workspaceDir);
secondParams.runId = "run-2";
const secondRun = runCodexAppServerAttempt(secondParams, {
nativeHookRelay: {
enabled: true,
events: ["pre_tool_use"],
},
});
await secondHarness.waitForMethod("turn/start");
const resumeRequest = secondHarness.requests.find(
(request) => request.method === "thread/resume",
);
const secondRelayId = extractRelayIdFromThreadRequest(resumeRequest?.params);
expect(secondRelayId).toBe(firstRelayId);
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId),
).toMatchObject({
runId: "run-2",
allowedEvents: ["pre_tool_use"],
});
await secondHarness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await secondRun;
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId),
).toBeUndefined();
});
it("builds deterministic opaque Codex native hook relay ids", () => {
const relayId = __testing.buildCodexNativeHookRelayId({
agentId: "dev-codex",
sessionId: "cu-pr-relay-smoke",
sessionKey: "agent:dev-codex:cu-pr-relay-smoke",
});
expect(relayId).toBe("codex-8810b5252975550c887ff0def512b25e944bac39");
expect(relayId).not.toContain("dev-codex");
expect(relayId).not.toContain("cu-pr-relay-smoke");
});
it("sends clearing Codex native hook config when the relay is disabled", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");

View File

@@ -1,3 +1,4 @@
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
@@ -1009,6 +1010,11 @@ function createCodexNativeHookRelay(params: {
}
return registerNativeHookRelay({
provider: "codex",
relayId: buildCodexNativeHookRelayId({
agentId: params.agentId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
}),
...(params.agentId ? { agentId: params.agentId } : {}),
sessionId: params.sessionId,
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
@@ -1022,6 +1028,20 @@ function createCodexNativeHookRelay(params: {
});
}
function buildCodexNativeHookRelayId(params: {
agentId: string | undefined;
sessionId: string;
sessionKey: string | undefined;
}): string {
const hash = createHash("sha256");
hash.update("openclaw:codex:native-hook-relay:v1");
hash.update("\0");
hash.update(params.agentId?.trim() || "");
hash.update("\0");
hash.update(params.sessionKey?.trim() || params.sessionId);
return `codex-${hash.digest("hex").slice(0, 40)}`;
}
function interruptCodexTurnBestEffort(
client: CodexAppServerClient,
params: {
@@ -1297,6 +1317,7 @@ function handleApprovalRequest(params: {
export const __testing = {
CODEX_DYNAMIC_TOOL_TIMEOUT_MS,
CODEX_TURN_COMPLETION_IDLE_TIMEOUT_MS,
buildCodexNativeHookRelayId,
filterToolsForVisionInputs,
handleDynamicToolCallWithTimeout,
...createCodexAppServerClientFactoryTestHooks((factory) => {

View File

@@ -1,3 +1,5 @@
import { statSync, writeFileSync } from "node:fs";
import { createServer } from "node:http";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
initializeGlobalHookRunner,
@@ -8,6 +10,7 @@ import {
__testing,
buildNativeHookRelayCommand,
invokeNativeHookRelay,
invokeNativeHookRelayBridge,
registerNativeHookRelay,
} from "./native-hook-relay.js";
@@ -17,6 +20,17 @@ afterEach(() => {
__testing.clearNativeHookRelaysForTests();
});
async function waitForNativeHookRelayBridgeRecord(
relayId: string,
): Promise<Record<string, unknown>> {
let record: Record<string, unknown> | undefined;
await vi.waitFor(() => {
record = __testing.getNativeHookRelayBridgeRecordForTests(relayId);
expect(record).toBeDefined();
});
return record!;
}
describe("native hook relay registry", () => {
it("registers a short-lived relay and builds hidden CLI commands", () => {
const relay = registerNativeHookRelay({
@@ -46,6 +60,198 @@ describe("native hook relay registry", () => {
);
});
it("allows callers to replace a relay at a stable id", () => {
const first = registerNativeHookRelay({
provider: "codex",
relayId: "codex-stable-session",
sessionId: "session-1",
runId: "run-1",
allowedEvents: ["pre_tool_use"],
});
const second = registerNativeHookRelay({
provider: "codex",
relayId: "codex-stable-session",
sessionId: "session-1",
runId: "run-2",
allowedEvents: ["post_tool_use"],
});
expect(second.relayId).toBe(first.relayId);
expect(__testing.getNativeHookRelayRegistrationForTests(first.relayId)).toMatchObject({
runId: "run-2",
allowedEvents: ["post_tool_use"],
});
});
it("exposes registered relays through the direct hook bridge", async () => {
const relay = registerNativeHookRelay({
provider: "codex",
relayId: "codex-bridge-session",
sessionId: "session-1",
runId: "run-1",
allowedEvents: ["pre_tool_use"],
});
const response = await invokeNativeHookRelayBridge({
provider: "codex",
relayId: relay.relayId,
event: "pre_tool_use",
timeoutMs: 2_000,
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
});
expect(response).toEqual({ stdout: "", stderr: "", exitCode: 0 });
expect(__testing.getNativeHookRelayInvocationsForTests()).toEqual([
expect.objectContaining({
relayId: relay.relayId,
event: "pre_tool_use",
runId: "run-1",
}),
]);
});
it("keeps direct bridge registry files private and loopback-only", async () => {
const relay = registerNativeHookRelay({
provider: "codex",
relayId: "codex-private-bridge-session",
sessionId: "session-1",
runId: "run-1",
allowedEvents: ["pre_tool_use"],
});
const record = await waitForNativeHookRelayBridgeRecord(relay.relayId);
const bridgeDir = __testing.getNativeHookRelayBridgeDirForTests();
const registryPath = __testing.getNativeHookRelayBridgeRegistryPathForTests(relay.relayId);
expect(statSync(bridgeDir).mode & 0o077).toBe(0);
expect(statSync(registryPath).mode & 0o077).toBe(0);
writeFileSync(
registryPath,
`${JSON.stringify({
...record,
hostname: "192.0.2.1",
expiresAtMs: Date.now() + 10_000,
})}\n`,
{ mode: 0o600 },
);
await expect(
invokeNativeHookRelayBridge({
provider: "codex",
relayId: relay.relayId,
event: "pre_tool_use",
registrationTimeoutMs: 1,
timeoutMs: 50,
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay bridge not found");
});
it("binds direct bridge tokens to the relay they were issued for", async () => {
const first = registerNativeHookRelay({
provider: "codex",
relayId: "codex-first-bridge-session",
sessionId: "session-1",
runId: "run-1",
allowedEvents: ["pre_tool_use"],
});
const second = registerNativeHookRelay({
provider: "codex",
relayId: "codex-second-bridge-session",
sessionId: "session-2",
runId: "run-2",
allowedEvents: ["pre_tool_use"],
});
const firstRecord = await waitForNativeHookRelayBridgeRecord(first.relayId);
await waitForNativeHookRelayBridgeRecord(second.relayId);
writeFileSync(
__testing.getNativeHookRelayBridgeRegistryPathForTests(second.relayId),
`${JSON.stringify({
...firstRecord,
relayId: second.relayId,
expiresAtMs: Date.now() + 10_000,
})}\n`,
{ mode: 0o600 },
);
await expect(
invokeNativeHookRelayBridge({
provider: "codex",
relayId: second.relayId,
event: "pre_tool_use",
timeoutMs: 500,
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay bridge target mismatch");
expect(__testing.getNativeHookRelayInvocationsForTests()).toEqual([]);
});
it("rejects oversized direct bridge responses", async () => {
const relay = registerNativeHookRelay({
provider: "codex",
relayId: "codex-oversized-bridge-response",
sessionId: "session-1",
runId: "run-1",
allowedEvents: ["pre_tool_use"],
});
const record = await waitForNativeHookRelayBridgeRecord(relay.relayId);
const server = createServer((_req, res) => {
res.writeHead(200, { "content-type": "application/json" });
res.end("x".repeat(5_000_001));
});
await new Promise<void>((resolve) => {
server.listen(0, "127.0.0.1", resolve);
});
try {
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("test bridge server address unavailable");
}
writeFileSync(
__testing.getNativeHookRelayBridgeRegistryPathForTests(relay.relayId),
`${JSON.stringify({
...record,
port: address.port,
token: "test-token",
expiresAtMs: Date.now() + 10_000,
})}\n`,
{ mode: 0o600 },
);
await expect(
invokeNativeHookRelayBridge({
provider: "codex",
relayId: relay.relayId,
event: "pre_tool_use",
timeoutMs: 500,
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay bridge response too large");
} finally {
await new Promise<void>((resolve) => {
server.close(() => resolve());
});
}
});
it("accepts an allowed Codex invocation and preserves raw payload", async () => {
const relay = registerNativeHookRelay({
provider: "codex",

View File

@@ -1,6 +1,24 @@
import { createHash, randomUUID } from "node:crypto";
import { existsSync } from "node:fs";
import {
chmodSync,
existsSync,
lstatSync,
mkdirSync,
readFileSync,
renameSync,
rmSync,
writeFileSync,
} from "node:fs";
import {
createServer,
request as httpRequest,
type IncomingMessage,
type Server,
type ServerResponse,
} from "node:http";
import { tmpdir } from "node:os";
import path from "node:path";
import { resolveOpenClawPackageRootSync } from "../../infra/openclaw-root.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { PluginApprovalResolutions } from "../../plugins/types.js";
import { runBeforeToolCallHook } from "../pi-tools.before-tool-call.js";
@@ -76,6 +94,7 @@ export type NativeHookRelayRegistrationHandle = NativeHookRelayRegistration & {
export type RegisterNativeHookRelayParams = {
provider: NativeHookRelayProvider;
relayId?: string;
agentId?: string;
sessionId: string;
sessionKey?: string;
@@ -99,6 +118,11 @@ export type InvokeNativeHookRelayParams = {
rawPayload: unknown;
};
export type InvokeNativeHookRelayBridgeParams = InvokeNativeHookRelayParams & {
registrationTimeoutMs?: number;
timeoutMs?: number;
};
type NativeHookRelayInvocationMetadata = Partial<
Pick<
NativeHookRelayInvocation,
@@ -148,8 +172,12 @@ const MAX_APPROVAL_TITLE_LENGTH = 80;
const MAX_APPROVAL_DESCRIPTION_LENGTH = 700;
const MAX_PERMISSION_APPROVALS_PER_WINDOW = 12;
const PERMISSION_APPROVAL_WINDOW_MS = 60_000;
const MAX_NATIVE_HOOK_BRIDGE_BODY_BYTES = 5_000_000;
const MAX_NATIVE_HOOK_BRIDGE_RESPONSE_BYTES = 5_000_000;
const NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS = 25;
const ANSI_ESCAPE_PATTERN = new RegExp(`${String.fromCharCode(27)}\\[[0-?]*[ -/]*[@-~]`, "g");
const relays = new Map<string, NativeHookRelayRegistration>();
const relayBridges = new Map<string, NativeHookRelayBridgeRegistration>();
const invocations: NativeHookRelayInvocation[] = [];
const pendingPermissionApprovals = new Map<
string,
@@ -180,6 +208,23 @@ type NativeHookRelayPermissionApprovalRequester = (
request: NativeHookRelayPermissionApprovalRequest,
) => Promise<NativeHookRelayPermissionApprovalResult>;
type NativeHookRelayBridgeRegistration = {
relayId: string;
registryPath: string;
token: string;
server: Server;
};
type NativeHookRelayBridgeRecord = {
version: 1;
relayId: string;
pid: number;
hostname: string;
port: number;
token: string;
expiresAtMs: number;
};
let nativeHookRelayPermissionApprovalRequester: NativeHookRelayPermissionApprovalRequester =
requestNativeHookRelayPermissionApproval;
@@ -245,8 +290,9 @@ export function registerNativeHookRelay(
params: RegisterNativeHookRelayParams,
): NativeHookRelayRegistrationHandle {
pruneExpiredNativeHookRelays();
const relayId = randomUUID();
const relayId = normalizeRelayId(params.relayId) ?? randomUUID();
const allowedEvents = normalizeAllowedEvents(params.allowedEvents);
unregisterNativeHookRelay(relayId);
const registration: NativeHookRelayRegistration = {
relayId,
provider: params.provider,
@@ -259,6 +305,7 @@ export function registerNativeHookRelay(
...(params.signal ? { signal: params.signal } : {}),
};
relays.set(relayId, registration);
registerNativeHookRelayBridge(registration);
return {
...registration,
commandForEvent: (event) =>
@@ -275,11 +322,23 @@ export function registerNativeHookRelay(
}
export function unregisterNativeHookRelay(relayId: string): void {
unregisterNativeHookRelayBridge(relayId);
relays.delete(relayId);
removeNativeHookRelayInvocations(relayId);
removeNativeHookRelayPermissionState(relayId);
}
function normalizeRelayId(value: string | undefined): string | undefined {
const trimmed = value?.trim();
if (!trimmed) {
return undefined;
}
if (trimmed.length > 160 || !/^[A-Za-z0-9._:-]+$/u.test(trimmed)) {
throw new Error("native hook relay id must be non-empty, compact, and URL-safe");
}
return trimmed;
}
export function buildNativeHookRelayCommand(params: {
provider: NativeHookRelayProvider;
relayId: string;
@@ -348,6 +407,52 @@ export async function invokeNativeHookRelay(
});
}
export async function invokeNativeHookRelayBridge(
params: InvokeNativeHookRelayBridgeParams,
): Promise<NativeHookRelayProcessResponse> {
const provider = readNativeHookRelayProvider(params.provider);
const relayId = readNonEmptyString(params.relayId, "relayId");
const event = readNativeHookRelayEvent(params.event);
const timeoutMs = normalizePositiveInteger(params.timeoutMs, DEFAULT_RELAY_TIMEOUT_MS);
const registrationTimeoutMs = normalizePositiveInteger(params.registrationTimeoutMs, timeoutMs);
const startedAt = Date.now();
let lastError: unknown = new Error("native hook relay bridge not found");
while (Date.now() - startedAt < timeoutMs) {
try {
const record = readNativeHookRelayBridgeRecord(relayId);
if (Date.now() > record.expiresAtMs) {
throw new Error("native hook relay bridge expired");
}
return await invokeNativeHookRelayBridgeRecord({
record,
timeoutMs: Math.max(1, timeoutMs - (Date.now() - startedAt)),
payload: {
provider,
relayId,
event,
rawPayload: params.rawPayload,
},
});
} catch (error) {
lastError = error;
if (
error instanceof Error &&
error.message === "native hook relay bridge not found" &&
Date.now() - startedAt >= registrationTimeoutMs
) {
break;
}
if (!isRetryableNativeHookRelayBridgeError(error)) {
break;
}
await delay(
Math.min(NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS, timeoutMs - (Date.now() - startedAt)),
);
}
}
throw lastError instanceof Error ? lastError : new Error(String(lastError));
}
export function renderNativeHookRelayUnavailableResponse(params: {
provider: unknown;
event: unknown;
@@ -388,11 +493,358 @@ function pruneExpiredNativeHookRelays(now = Date.now()): void {
for (const [relayId, registration] of relays) {
if (now > registration.expiresAtMs) {
relays.delete(relayId);
unregisterNativeHookRelayBridge(relayId);
removeNativeHookRelayInvocations(relayId);
}
}
}
function registerNativeHookRelayBridge(registration: NativeHookRelayRegistration): void {
unregisterNativeHookRelayBridge(registration.relayId);
const token = randomUUID();
const bridgeDir = ensureNativeHookRelayBridgeDir();
const bridgeKey = nativeHookRelayBridgeKey(registration.relayId);
const registryPath = path.join(bridgeDir, `${bridgeKey}.json`);
const server = createServer((req, res) => {
void handleNativeHookRelayBridgeRequest(req, res, {
provider: registration.provider,
relayId: registration.relayId,
token,
});
});
const bridge: NativeHookRelayBridgeRegistration = {
relayId: registration.relayId,
registryPath,
token,
server,
};
relayBridges.set(registration.relayId, bridge);
server.on("error", (error) => {
log.debug("native hook relay bridge server error", { error, relayId: registration.relayId });
});
server.listen(0, "127.0.0.1", () => {
if (relayBridges.get(registration.relayId) !== bridge) {
return;
}
const address = server.address();
if (!address || typeof address === "string") {
log.debug("native hook relay bridge server address unavailable", {
relayId: registration.relayId,
});
return;
}
const record: NativeHookRelayBridgeRecord = {
version: 1,
relayId: registration.relayId,
pid: process.pid,
hostname: "127.0.0.1",
port: address.port,
token,
expiresAtMs: registration.expiresAtMs,
};
writeNativeHookRelayBridgeRecord(registryPath, record);
});
server.unref();
}
function unregisterNativeHookRelayBridge(relayId: string): void {
const bridge = relayBridges.get(relayId);
if (!bridge) {
return;
}
relayBridges.delete(relayId);
bridge.server.close();
const record = readNativeHookRelayBridgeRecordIfExists(relayId);
if (record?.token === bridge.token) {
rmSync(bridge.registryPath, { force: true });
}
}
async function handleNativeHookRelayBridgeRequest(
req: IncomingMessage,
res: ServerResponse,
auth: { provider: NativeHookRelayProvider; relayId: string; token: string },
): Promise<void> {
try {
if (req.method !== "POST" || req.url !== "/invoke") {
writeNativeHookRelayBridgeJson(res, 404, { ok: false, error: "not found" });
return;
}
if (req.headers.authorization !== `Bearer ${auth.token}`) {
writeNativeHookRelayBridgeJson(res, 403, { ok: false, error: "forbidden" });
return;
}
const body = await readNativeHookRelayBridgeBody(req);
const payload = readNativeHookRelayBridgePayload(JSON.parse(body));
if (payload.provider !== auth.provider || payload.relayId !== auth.relayId) {
writeNativeHookRelayBridgeJson(res, 403, {
ok: false,
error: "native hook relay bridge target mismatch",
});
return;
}
const result = await invokeNativeHookRelay(payload);
writeNativeHookRelayBridgeJson(res, 200, { ok: true, result });
} catch (error) {
writeNativeHookRelayBridgeJson(res, 500, {
ok: false,
error: error instanceof Error ? error.message : String(error),
});
}
}
async function readNativeHookRelayBridgeBody(req: NodeJS.ReadableStream): Promise<string> {
const chunks: Buffer[] = [];
let total = 0;
for await (const chunk of req) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
total += buffer.byteLength;
if (total > MAX_NATIVE_HOOK_BRIDGE_BODY_BYTES) {
throw new Error("native hook relay bridge payload too large");
}
chunks.push(buffer);
}
return Buffer.concat(chunks, total).toString("utf8");
}
function readNativeHookRelayBridgePayload(value: unknown): InvokeNativeHookRelayParams {
if (!isJsonObject(value)) {
throw new Error("native hook relay bridge payload must be an object");
}
return {
provider: value.provider,
relayId: value.relayId,
event: value.event,
rawPayload: value.rawPayload,
};
}
function writeNativeHookRelayBridgeJson(
res: ServerResponse,
statusCode: number,
payload: unknown,
): void {
const body = JSON.stringify(payload);
res.writeHead(statusCode, {
"content-type": "application/json",
"content-length": Buffer.byteLength(body),
});
res.end(body);
}
function readNativeHookRelayBridgeRecord(relayId: string): NativeHookRelayBridgeRecord {
const record = readNativeHookRelayBridgeRecordIfExists(relayId);
if (!record) {
throw new Error("native hook relay bridge not found");
}
return record;
}
function readNativeHookRelayBridgeRecordIfExists(
relayId: string,
): NativeHookRelayBridgeRecord | undefined {
const registryPath = nativeHookRelayBridgeRegistryPath(relayId);
try {
const parsed: unknown = JSON.parse(readFileSync(registryPath, "utf8"));
if (isNativeHookRelayBridgeRecord(parsed, relayId)) {
return parsed;
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
log.debug("failed to read native hook relay bridge registry", { error, relayId });
}
}
return undefined;
}
function isNativeHookRelayBridgeRecord(
value: unknown,
relayId: string,
): value is NativeHookRelayBridgeRecord {
return (
isJsonObject(value) &&
value.version === 1 &&
value.relayId === relayId &&
typeof value.pid === "number" &&
Number.isInteger(value.pid) &&
value.hostname === "127.0.0.1" &&
typeof value.port === "number" &&
Number.isInteger(value.port) &&
value.port > 0 &&
value.port <= 65_535 &&
typeof value.token === "string" &&
value.token.length > 0 &&
typeof value.expiresAtMs === "number"
);
}
async function invokeNativeHookRelayBridgeRecord(params: {
record: NativeHookRelayBridgeRecord;
timeoutMs: number;
payload: InvokeNativeHookRelayParams;
}): Promise<NativeHookRelayProcessResponse> {
const startedAt = Date.now();
let lastError: unknown;
while (Date.now() - startedAt < params.timeoutMs) {
try {
return await postNativeHookRelayBridgeRecord({
...params,
timeoutMs: Math.max(1, params.timeoutMs - (Date.now() - startedAt)),
});
} catch (error) {
lastError = error;
if (!isRetryableNativeHookRelayBridgeError(error)) {
break;
}
await delay(
Math.min(NATIVE_HOOK_BRIDGE_RETRY_INTERVAL_MS, params.timeoutMs - (Date.now() - startedAt)),
);
}
}
throw lastError instanceof Error ? lastError : new Error(String(lastError));
}
function postNativeHookRelayBridgeRecord(params: {
record: NativeHookRelayBridgeRecord;
timeoutMs: number;
payload: InvokeNativeHookRelayParams;
}): Promise<NativeHookRelayProcessResponse> {
const body = JSON.stringify(params.payload);
return new Promise((resolve, reject) => {
let settled = false;
const resolveOnce = (value: NativeHookRelayProcessResponse) => {
if (!settled) {
settled = true;
resolve(value);
}
};
const rejectOnce = (error: unknown) => {
if (!settled) {
settled = true;
reject(error);
}
};
const req = httpRequest(
{
hostname: params.record.hostname,
method: "POST",
path: "/invoke",
port: params.record.port,
timeout: params.timeoutMs,
headers: {
authorization: `Bearer ${params.record.token}`,
"content-type": "application/json",
"content-length": Buffer.byteLength(body),
},
},
(res) => {
let responseText = "";
let responseBytes = 0;
res.setEncoding("utf8");
res.on("data", (chunk) => {
const chunkText = typeof chunk === "string" ? chunk : String(chunk);
responseBytes += Buffer.byteLength(chunkText);
if (responseBytes > MAX_NATIVE_HOOK_BRIDGE_RESPONSE_BYTES) {
rejectOnce(new Error("native hook relay bridge response too large"));
res.destroy();
return;
}
responseText += chunkText;
});
res.on("error", rejectOnce);
res.on("end", () => {
if (settled) {
return;
}
try {
const parsed = JSON.parse(responseText) as
| { ok: true; result: NativeHookRelayProcessResponse }
| { ok: false; error?: string };
if (parsed.ok) {
resolveOnce(parsed.result);
return;
}
rejectOnce(new Error(parsed.error || "native hook relay bridge failed"));
} catch (error) {
rejectOnce(error);
}
});
},
);
req.on("timeout", () => {
req.destroy(new Error("native hook relay bridge timed out"));
});
req.on("error", rejectOnce);
req.end(body);
});
}
function isRetryableNativeHookRelayBridgeError(error: unknown): boolean {
const code = (error as NodeJS.ErrnoException).code;
return (
code === "ENOENT" ||
code === "ECONNREFUSED" ||
code === "EAGAIN" ||
(error instanceof Error && error.message === "native hook relay bridge not found")
);
}
function nativeHookRelayBridgeDir(): string {
const uid = typeof process.getuid === "function" ? process.getuid() : "nouid";
return path.join(tmpdir(), `openclaw-native-hook-relays-${uid}`);
}
function ensureNativeHookRelayBridgeDir(): string {
const bridgeDir = nativeHookRelayBridgeDir();
mkdirSync(bridgeDir, { recursive: true, mode: 0o700 });
const stats = lstatSync(bridgeDir);
const expectedUid = typeof process.getuid === "function" ? process.getuid() : undefined;
if (!stats.isDirectory() || stats.isSymbolicLink()) {
throw new Error("unsafe native hook relay bridge directory");
}
if (expectedUid !== undefined && stats.uid !== expectedUid) {
throw new Error("unsafe native hook relay bridge directory owner");
}
if ((stats.mode & 0o077) !== 0) {
chmodSync(bridgeDir, 0o700);
const repaired = lstatSync(bridgeDir);
if ((repaired.mode & 0o077) !== 0) {
throw new Error("unsafe native hook relay bridge directory permissions");
}
}
return bridgeDir;
}
function writeNativeHookRelayBridgeRecord(
registryPath: string,
record: NativeHookRelayBridgeRecord,
): void {
const tempPath = path.join(
path.dirname(registryPath),
`.${path.basename(registryPath)}.${process.pid}.${randomUUID()}.tmp`,
);
try {
writeFileSync(tempPath, `${JSON.stringify(record)}\n`, { mode: 0o600, flag: "wx" });
renameSync(tempPath, registryPath);
chmodSync(registryPath, 0o600);
} catch (error) {
rmSync(tempPath, { force: true });
throw error;
}
}
function nativeHookRelayBridgeRegistryPath(relayId: string): string {
return path.join(nativeHookRelayBridgeDir(), `${nativeHookRelayBridgeKey(relayId)}.json`);
}
function nativeHookRelayBridgeKey(relayId: string): string {
return createHash("sha256").update(relayId).digest("hex").slice(0, 32);
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, Math.max(0, ms)));
}
async function processNativeHookRelayInvocation(params: {
registration: NativeHookRelayRegistration;
invocation: NativeHookRelayInvocation;
@@ -1003,6 +1455,26 @@ function truncateText(value: string, maxLength: number): string {
}
function resolveOpenClawCliExecutable(): string {
const envPath = process.env.OPENCLAW_CLI_PATH?.trim();
if (envPath && existsSync(envPath)) {
return envPath;
}
const packageRoot = resolveOpenClawPackageRootSync({
moduleUrl: import.meta.url,
argv1: process.argv[1],
cwd: process.cwd(),
});
if (packageRoot) {
for (const candidate of [
path.join(packageRoot, "openclaw.mjs"),
path.join(packageRoot, "dist", "entry.js"),
path.join(packageRoot, "scripts", "run-node.mjs"),
]) {
if (existsSync(candidate)) {
return candidate;
}
}
}
const argvEntry = process.argv[1];
if (argvEntry) {
const resolved = path.resolve(argvEntry);
@@ -1161,6 +1633,9 @@ function isJsonObject(value: unknown): value is Record<string, unknown> {
export const __testing = {
clearNativeHookRelaysForTests(): void {
for (const relayId of relayBridges.keys()) {
unregisterNativeHookRelayBridge(relayId);
}
relays.clear();
invocations.length = 0;
pendingPermissionApprovals.clear();
@@ -1173,6 +1648,16 @@ export const __testing = {
getNativeHookRelayRegistrationForTests(relayId: string): NativeHookRelayRegistration | undefined {
return relays.get(relayId);
},
getNativeHookRelayBridgeDirForTests(): string {
return nativeHookRelayBridgeDir();
},
getNativeHookRelayBridgeRegistryPathForTests(relayId: string): string {
return nativeHookRelayBridgeRegistryPath(relayId);
},
getNativeHookRelayBridgeRecordForTests(relayId: string): Record<string, unknown> | undefined {
const record = readNativeHookRelayBridgeRecordIfExists(relayId);
return record ? { ...record } : undefined;
},
formatPermissionApprovalDescriptionForTests(
request: NativeHookRelayPermissionApprovalRequest,
): string {

View File

@@ -1,5 +1,6 @@
import { Readable, Writable } from "node:stream";
import {
invokeNativeHookRelayBridge,
renderNativeHookRelayUnavailableResponse,
type NativeHookRelayProcessResponse,
} from "../agents/harness/native-hook-relay.js";
@@ -43,6 +44,23 @@ export async function runNativeHookRelayCli(
return 1;
}
try {
const response = await invokeNativeHookRelayBridge({
provider,
relayId,
event,
rawPayload,
registrationTimeoutMs: 100,
timeoutMs: normalizeTimeoutMs(opts.timeout),
});
writeText(stdout, response.stdout);
writeText(stderr, response.stderr);
return response.exitCode;
} catch {
// Fall through to the gateway path for embedded/local gateway cases and
// older registrations that predate the direct relay bridge.
}
try {
const response = await callGatewayFn<NativeHookRelayProcessResponse>({
method: "nativeHook.invoke",

View File

@@ -24,43 +24,6 @@ vi.mock("../infra/file-lock.js", () => ({
withFileLock: async (_path: string, _options: unknown, fn: () => unknown) => await fn(),
}));
const jsonStoreMocks = vi.hoisted(() => ({
readJsonFileWithFallback: vi.fn(async <T>(filePath: string, fallback: T) => {
const fs = await import("node:fs/promises");
let raw: string;
try {
raw = await fs.readFile(filePath, "utf8");
} catch (err) {
if ((err as { code?: string }).code === "ENOENT") {
return { value: fallback, exists: false };
}
return { value: fallback, exists: false };
}
try {
const parsed = JSON.parse(raw) as T;
return {
value: parsed ?? fallback,
exists: true,
};
} catch {
return { value: fallback, exists: true };
}
}),
writeJsonFileAtomically: vi.fn(async (filePath: string, value: unknown) => {
const fs = await import("node:fs/promises");
const path = await import("node:path");
await fs.mkdir(path.dirname(filePath), { recursive: true });
await fs.writeFile(filePath, `${JSON.stringify(value, null, 2)}\n`, "utf8");
}),
}));
vi.mock("../plugin-sdk/json-store.js", () => {
return {
readJsonFileWithFallback: jsonStoreMocks.readJsonFileWithFallback,
writeJsonFileAtomically: jsonStoreMocks.writeJsonFileAtomically,
};
});
import {
addChannelAllowFromStoreEntry,
clearPairingAllowFromReadCacheForTest,
@@ -77,7 +40,10 @@ import {
let fixtureRoot = "";
let caseId = 0;
type RandomIntSync = (minOrMax: number, max?: number) => number;
type ReadSpy = ReturnType<typeof vi.fn> | MockInstance;
type FileReadSpy = {
readCount: () => number;
mockRestore: () => void;
};
let randomIntSpy: MockInstance<RandomIntSync>;
let nextRandomInt = 0;
@@ -181,13 +147,13 @@ async function seedTelegramAllowFromFixtures(params: {
async function assertAllowFromCacheInvalidation(params: {
stateDir: string;
readAllowFrom: () => Promise<string[]>;
readSpy: ReadSpy;
readSpy: FileReadSpy;
}) {
const first = await params.readAllowFrom();
const second = await params.readAllowFrom();
expect(first).toEqual(["1001"]);
expect(second).toEqual(["1001"]);
expect(params.readSpy).toHaveBeenCalledTimes(1);
expect(params.readSpy.readCount()).toBe(1);
await writeAllowFromFixture({
stateDir: params.stateDir,
@@ -197,7 +163,7 @@ async function assertAllowFromCacheInvalidation(params: {
});
const third = await params.readAllowFrom();
expect(third).toEqual(["10022"]);
expect(params.readSpy).toHaveBeenCalledTimes(2);
expect(params.readSpy.readCount()).toBe(2);
}
async function expectAccountScopedEntryIsolated(entry: string, accountId = "yy") {
@@ -209,17 +175,17 @@ async function expectAccountScopedEntryIsolated(entry: string, accountId = "yy")
async function withAllowFromCacheReadSpy(params: {
stateDir: string;
createReadSpy: () => ReadSpy;
cleanupReadSpy?: (readSpy: ReadSpy) => void;
createReadSpy: (filePath: string) => FileReadSpy;
readAllowFrom: () => Promise<string[]>;
}) {
const filePath = resolveAllowFromFilePath(params.stateDir, "telegram", "yy");
await writeAllowFromFixture({
stateDir: params.stateDir,
channel: "telegram",
accountId: "yy",
allowFrom: ["1001"],
});
const readSpy = params.createReadSpy();
const readSpy = params.createReadSpy(filePath);
try {
await assertAllowFromCacheInvalidation({
stateDir: params.stateDir,
@@ -227,10 +193,14 @@ async function withAllowFromCacheReadSpy(params: {
readSpy,
});
} finally {
params.cleanupReadSpy?.(readSpy);
readSpy.mockRestore();
}
}
function countFileReads(spy: { mock: { calls: unknown[][] } }, filePath: string): number {
return spy.mock.calls.filter(([candidate]) => candidate === filePath).length;
}
async function seedDefaultAccountAllowFromFixture(stateDir: string) {
await seedTelegramAllowFromFixtures({
stateDir,
@@ -608,25 +578,35 @@ describe("pairing store", () => {
it("reuses cached allowFrom reads and invalidates on file updates", async () => {
await withTempStateDir(async (stateDir) => {
clearOAuthFixtures(stateDir);
await withAllowFromCacheReadSpy({
stateDir,
createReadSpy: () => {
jsonStoreMocks.readJsonFileWithFallback.mockClear();
return jsonStoreMocks.readJsonFileWithFallback;
for (const variant of [
{
createReadSpy: (filePath: string) => {
const spy = vi.spyOn(fsSync.promises, "readFile");
return {
readCount: () => countFileReads(spy, filePath),
mockRestore: () => spy.mockRestore(),
};
},
readAllowFrom: () => readChannelAllowFromStore("telegram", process.env, "yy"),
},
readAllowFrom: () => readChannelAllowFromStore("telegram", process.env, "yy"),
});
clearOAuthFixtures(stateDir);
await withAllowFromCacheReadSpy({
stateDir,
createReadSpy: () => vi.spyOn(fsSync, "readFileSync"),
cleanupReadSpy: (readSpy) => {
readSpy.mockRestore();
{
createReadSpy: (filePath: string) => {
const spy = vi.spyOn(fsSync, "readFileSync");
return {
readCount: () => countFileReads(spy, filePath),
mockRestore: () => spy.mockRestore(),
};
},
readAllowFrom: async () => readChannelAllowFromStoreSync("telegram", process.env, "yy"),
},
readAllowFrom: async () => readChannelAllowFromStoreSync("telegram", process.env, "yy"),
});
]) {
clearOAuthFixtures(stateDir);
await withAllowFromCacheReadSpy({
stateDir,
createReadSpy: variant.createReadSpy,
readAllowFrom: variant.readAllowFrom,
});
}
});
});
});