fix(codex): defer native-hook-relay unregister to avoid cleanup race

Keep successful Codex native hook relays alive through a bounded grace window so late hook callbacks still reach OpenClaw enforcement, while interrupted, aborted, timed-out, and failed turns unregister immediately.

Co-authored-by: Kaspre <kaspre@gmail.com>
This commit is contained in:
Kaspre
2026-05-23 22:53:00 -04:00
committed by GitHub
parent 0abedd546a
commit 96959ec3d7
6 changed files with 269 additions and 19 deletions

View File

@@ -71,6 +71,7 @@ Docs: https://docs.openclaw.ai
- WebChat: keep the run-complete indicator in progress until deferred history replay renders the assistant reply, so Done no longer appears before response text. (#85374) Thanks @neeravmakwana.
- Agents/tools: give timed-out or cancelled process trees a bounded SIGTERM cleanup window before SIGKILL while preserving tree-aware cancellation. Fixes #66399. (#85865) Thanks @IWhatsskill.
- Agents/compaction: skip agent-harness preflight for provider-owned CLI runtime sessions so over-threshold Claude CLI sessions continue through normal compaction instead of failing on a missing harness. Fixes #84857. (#84878) Thanks @zhangguiping-xydt.
- Codex/app-server: keep successful native hook relays available through a short post-turn grace window so late Codex hook subprocesses can finish policy enforcement without clearing a replacement relay. (#83987) Thanks @Kaspre.
- Control UI/config: save form-mode edits from the source config snapshot so runtime-only provider defaults like empty `models.providers.<id>.baseUrl` are not written back and rejected. Fixes #85831. Thanks @garyd9.
- Browser/existing-session: launch Chrome DevTools MCP with usage statistics disabled by default so its telemetry watchdog stays off unless an operator explicitly opts in. (#85886) Thanks @rohitjavvadi.
- Telegram: normalize legacy durable group retry targets before retry sends, polls, and pins so group retries keep using the real chat id. (#85656) Thanks @luoyanglang.

View File

@@ -176,6 +176,10 @@ export class CodexAppServerEventProjector {
private readonly options: CodexAppServerEventProjectorOptions = {},
) {}
/**
 * Reports the status recorded on the completed turn, if one has been stored.
 *
 * @returns The `status` of `this.completedTurn`, or `undefined` when no
 *   completed turn is set.
 */
getCompletedTurnStatus(): CodexTurn["status"] | undefined {
  const turn = this.completedTurn;
  return turn?.status;
}
async handleNotification(notification: CodexServerNotification): Promise<void> {
const params = isJsonObject(notification.params) ? notification.params : undefined;
if (!params) {

View File

@@ -5,6 +5,7 @@ import { SessionManager } from "@earendil-works/pi-coding-agent";
import {
abortAgentHarnessRun,
embeddedAgentLog,
invokeNativeHookRelay,
nativeHookRelayTesting,
onAgentEvent,
queueAgentHarnessMessage,
@@ -860,6 +861,7 @@ describe("runCodexAppServerAttempt", () => {
await closeCodexSandboxExecServersForTests();
resetCodexAppServerClientFactoryForTest();
testing.resetOpenClawCodingToolsFactoryForTests();
testing.clearPendingCodexNativeHookRelayUnregistersForTests();
resetCodexRateLimitCacheForTests();
nativeHookRelayTesting.clearNativeHookRelaysForTests();
clearPluginCommands();
@@ -6777,6 +6779,8 @@ describe("runCodexAppServerAttempt", () => {
expect(preToolUseState?.enabled).toBe(true);
expect(preToolUseState?.trusted_hash).toMatch(/^sha256:[a-f0-9]{64}$/);
const relayId = extractRelayIdFromThreadRequest(startRequest?.params);
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeDefined();
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -6839,6 +6843,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -6942,6 +6947,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -7004,6 +7010,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -7034,6 +7041,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -7066,6 +7074,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -7099,6 +7108,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
@@ -7140,6 +7150,7 @@ describe("runCodexAppServerAttempt", () => {
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
completed = true;
await run;
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId),
).toBeUndefined();
@@ -7152,7 +7163,7 @@ describe("runCodexAppServerAttempt", () => {
}
});
it("reuses the Codex native hook relay id across runs for the same session", async () => {
it("keeps a replacement Codex native hook relay registered when prior cleanup is pending", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const firstHarness = createStartedThreadHarness();
@@ -7171,9 +7182,22 @@ describe("runCodexAppServerAttempt", () => {
(request) => request.method === "thread/start",
);
const firstRelayId = extractRelayIdFromThreadRequest(firstStartRequest?.params);
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId),
).toBeUndefined();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId)?.runId).toBe(
"run-1",
);
await expect(
invokeNativeHookRelay({
provider: "codex",
relayId: firstRelayId,
event: "pre_tool_use",
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_use_id: "late-call-1",
tool_input: { command: "python3 -c 'print(\"x\")'" },
},
}),
).resolves.toMatchObject({ exitCode: 0 });
const secondHarness = createResumeHarness();
const secondParams = createParams(sessionFile, workspaceDir);
@@ -7196,8 +7220,17 @@ describe("runCodexAppServerAttempt", () => {
expect(resumedRegistration?.runId).toBe("run-2");
expect(resumedRegistration?.allowedEvents).toEqual(["pre_tool_use"]);
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId)?.runId).toBe(
"run-2",
);
await secondHarness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
await secondRun;
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId)?.runId).toBe(
"run-2",
);
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(
nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(firstRelayId),
).toBeUndefined();
@@ -7215,6 +7248,13 @@ describe("runCodexAppServerAttempt", () => {
expect(relayId).not.toContain("cu-pr-relay-smoke");
});
it("extends native hook relay cleanup grace for configured hook timeouts", () => {
  // Each row: [configured hook timeout in seconds, expected grace window in ms].
  // The window is the larger of the 10 s floor and (timeout + 5 s).
  const cases: ReadonlyArray<readonly [number | undefined, number]> = [
    [undefined, 10_000],
    [5, 10_000],
    [9, 14_000],
    [60, 65_000],
  ];
  for (const [hookTimeoutSec, expectedGraceMs] of cases) {
    expect(testing.resolveCodexNativeHookRelayUnregisterGraceMs(hookTimeoutSec)).toBe(
      expectedGraceMs,
    );
  }
});
it("sends clearing Codex native hook config when the relay is disabled", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
@@ -7455,6 +7495,20 @@ describe("runCodexAppServerAttempt", () => {
expect(result.aborted).toBe(true);
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
await expect(
invokeNativeHookRelay({
provider: "codex",
relayId,
event: "pre_tool_use",
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay not found");
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
it("refreshes Codex account rate limits when a failed turn omits reset details", async () => {
@@ -8229,11 +8283,11 @@ describe("runCodexAppServerAttempt", () => {
expect(result.timedOut).toBe(false);
});
it("releases completion when Codex raw-events an interrupted turn marker", async () => {
it("releases completion and native hook relay state when Codex raw-events an interrupted turn marker", async () => {
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
{ turnTerminalIdleTimeoutMs: 60_000 },
{ nativeHookRelay: { enabled: true }, turnTerminalIdleTimeoutMs: 60_000 },
);
let resolved = false;
void run.then(() => {
@@ -8241,6 +8295,8 @@ describe("runCodexAppServerAttempt", () => {
});
await harness.waitForMethod("turn/start");
const startRequest = harness.requests.find((request) => request.method === "thread/start");
const relayId = extractRelayIdFromThreadRequest(startRequest?.params);
await harness.notify({
method: "rawResponseItem/completed",
params: {
@@ -8266,6 +8322,61 @@ describe("runCodexAppServerAttempt", () => {
expect(result.timedOut).toBe(false);
expect(result.promptError).toBeNull();
expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(false);
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
await expect(
invokeNativeHookRelay({
provider: "codex",
relayId,
event: "pre_tool_use",
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay not found");
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
// A turn that Codex itself reports as "interrupted" is not a successful
// completion, so the relay must be unregistered immediately instead of being
// kept alive through the deferred-unregister grace window.
it("cleans up native hook relay state when Codex completes the turn as interrupted", async () => {
const harness = createStartedThreadHarness();
const run = runCodexAppServerAttempt(
createParams(path.join(tempDir, "session.jsonl"), path.join(tempDir, "workspace")),
{ nativeHookRelay: { enabled: true }, turnTerminalIdleTimeoutMs: 60_000 },
);
await harness.waitForMethod("turn/start");
// The relay id is carried in the thread/start request sent for this run.
const startRequest = harness.requests.find((request) => request.method === "thread/start");
const relayId = extractRelayIdFromThreadRequest(startRequest?.params);
// Finish the turn with a terminal status of "interrupted".
await harness.notify({
method: "turn/completed",
params: {
threadId: "thread-1",
turnId: "turn-1",
turn: { id: "turn-1", status: "interrupted", items: [] },
},
});
const result = await run;
expect(result.aborted).toBe(false);
expect(result.timedOut).toBe(false);
expect(result.promptError).toBeNull();
// Checked before any flush: the registration must already be gone.
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
// A late hook invocation against the removed relay must be rejected.
await expect(
invokeNativeHookRelay({
provider: "codex",
relayId,
event: "pre_tool_use",
rawPayload: {
hook_event_name: "PreToolUse",
tool_name: "Bash",
tool_input: { command: "pnpm test" },
},
}),
).rejects.toThrow("native hook relay not found");
// Flushing deferred unregisters afterwards must leave the relay absent.
testing.flushPendingCodexNativeHookRelayUnregistersForTests();
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
it("keeps upstream cancellation aborted when Codex completes the turn as interrupted", async () => {

View File

@@ -218,6 +218,8 @@ const CODEX_TURN_TERMINAL_IDLE_TIMEOUT_MS = 30 * 60_000;
const CODEX_NATIVE_HOOK_RELAY_MIN_TTL_MS = 30 * 60_000;
const CODEX_NATIVE_HOOK_RELAY_TTL_GRACE_MS = 5 * 60_000;
const CODEX_NATIVE_HOOK_RELAY_RENEW_INTERVAL_MS = 60_000;
// Floor (10 s) for how long a relay stays registered after a successfully
// completed turn before its deferred unregister fires.
const CODEX_NATIVE_HOOK_RELAY_UNREGISTER_GRACE_MS = 10_000;
// Margin (5 s) added on top of a configured hook timeout when sizing the
// deferred-unregister window.
const CODEX_NATIVE_HOOK_RELAY_UNREGISTER_EXTRA_GRACE_MS = 5_000;
const CODEX_STEER_ALL_DEBOUNCE_MS = 500;
const LOG_FIELD_MAX_LENGTH = 160;
const CODEX_NATIVE_SANDBOX_TOOL_REQUIREMENTS = [
@@ -276,6 +278,67 @@ type CodexWorkspaceBootstrapContext = CodexBootstrapContext & {
let openClawCodingToolsFactoryForTests: OpenClawCodingToolsFactory | undefined;
/** Bookkeeping for one deferred native-hook-relay unregister. */
type PendingCodexNativeHookRelayUnregister = {
// Timer that runs the deferred unregister once the grace window elapses.
timeout: ReturnType<typeof setTimeout>;
// Performs the deferred unregister and removes this entry from the pending
// set; repeated calls are no-ops.
unregister: () => void;
};
// Deferred unregisters that have been scheduled but have not run yet. Entries
// remove themselves when they run; the *ForTests helpers flush or clear the set.
const pendingCodexNativeHookRelayUnregisters = new Set<PendingCodexNativeHookRelayUnregister>();
/**
 * Defers `relay.unregister()` until the grace window derived from
 * `hookTimeoutSec` has elapsed, and tracks the deferral in
 * `pendingCodexNativeHookRelayUnregisters`.
 *
 * The deferred unregister runs at most once. If its entry has already been
 * removed from the pending set by the time it runs, the relay is left alone.
 *
 * @param params.relay Registration handle whose `unregister()` is deferred.
 * @param params.hookTimeoutSec Optional configured hook timeout, in seconds,
 *   used to size the grace window.
 */
function scheduleCodexNativeHookRelayUnregister(params: {
  relay: NativeHookRelayRegistrationHandle;
  hookTimeoutSec?: number;
}): void {
  const graceMs = resolveCodexNativeHookRelayUnregisterGraceMs(params.hookTimeoutSec);
  let alreadyRan = false;

  const runOnce = (): void => {
    if (alreadyRan) {
      return;
    }
    alreadyRan = true;
    // Only touch the relay when this entry was still tracked as pending.
    if (pendingCodexNativeHookRelayUnregisters.delete(entry)) {
      params.relay.unregister();
    }
  };

  const entry: PendingCodexNativeHookRelayUnregister = {
    timeout: setTimeout(runOnce, graceMs),
    unregister: runOnce,
  };
  pendingCodexNativeHookRelayUnregisters.add(entry);
  // A pending cleanup timer must not keep the process alive on its own.
  entry.timeout.unref();
}
/**
 * Computes how long, in milliseconds, a relay should stay registered after a
 * successful turn before the deferred unregister runs.
 *
 * The result is the larger of the fixed grace floor and the configured hook
 * timeout (rounded up to whole seconds) plus the extra margin. It is capped
 * at the largest delay `setTimeout` honours, so that a very large configured
 * timeout cannot overflow the timer and fire almost immediately.
 *
 * @param hookTimeoutSec Configured hook timeout in seconds. `undefined`,
 *   non-finite, zero, and negative values are treated as "no timeout".
 * @returns Grace window in milliseconds, within
 *   `[CODEX_NATIVE_HOOK_RELAY_UNREGISTER_GRACE_MS, 2_147_483_647]`.
 */
function resolveCodexNativeHookRelayUnregisterGraceMs(hookTimeoutSec: number | undefined): number {
  // Timer delays above the signed 32-bit maximum are reset to 1 ms, which
  // would collapse the grace window instead of extending it.
  const maxTimerDelayMs = 2_147_483_647;
  const hasUsableTimeout =
    typeof hookTimeoutSec === "number" && Number.isFinite(hookTimeoutSec) && hookTimeoutSec > 0;
  const hookTimeoutMs = hasUsableTimeout ? Math.ceil(hookTimeoutSec) * 1000 : 0;
  const graceMs = Math.max(
    CODEX_NATIVE_HOOK_RELAY_UNREGISTER_GRACE_MS,
    hookTimeoutMs + CODEX_NATIVE_HOOK_RELAY_UNREGISTER_EXTRA_GRACE_MS,
  );
  return Math.min(graceMs, maxTimerDelayMs);
}
/**
 * Test helper: runs every pending deferred unregister right away instead of
 * waiting for its timer.
 *
 * Entries are taken from the front of the pending set until it is empty, so
 * entries added while flushing are processed too. Each entry's timer is
 * cancelled before its `unregister` callback runs.
 */
function flushPendingCodexNativeHookRelayUnregistersForTests(): void {
  for (;;) {
    // Pull the oldest tracked entry; an empty set yields nothing.
    const [next] = pendingCodexNativeHookRelayUnregisters;
    if (!next) {
      return;
    }
    clearTimeout(next.timeout);
    next.unregister();
  }
}
/**
 * Test helper: cancels the timer of every pending deferred unregister and
 * empties the pending set without running any `unregister` callback.
 */
function clearPendingCodexNativeHookRelayUnregistersForTests(): void {
  pendingCodexNativeHookRelayUnregisters.forEach(({ timeout }) => {
    clearTimeout(timeout);
  });
  pendingCodexNativeHookRelayUnregisters.clear();
}
function emitCodexAppServerEvent(
params: EmbeddedRunAttemptParams,
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
@@ -1810,6 +1873,7 @@ export async function runCodexAppServerAttempt(
let turnCompletionIdleTimeoutMessage: string | undefined;
let clientClosedPromptError: string | undefined;
let clientClosedAbort = false;
let shouldDelayNativeHookRelayUnregister = false;
let lifecycleStarted = false;
let lifecycleTerminalEmitted = false;
let resolveCompletion: (() => void) | undefined;
@@ -3250,6 +3314,13 @@ export async function runCodexAppServerAttempt(
},
ctx: hookContext,
});
const completedTurnStatus = activeProjector.getCompletedTurnStatus();
shouldDelayNativeHookRelayUnregister =
completedTurnStatus === "completed" &&
!timedOut &&
!runAbortController.signal.aborted &&
!finalAborted &&
!finalPromptError;
return {
...result,
timedOut,
@@ -3316,7 +3387,19 @@ export async function runCodexAppServerAttempt(
notificationCleanup();
requestCleanup();
closeCleanup?.();
nativeHookRelay?.unregister();
if (nativeHookRelay) {
if (shouldDelayNativeHookRelayUnregister) {
// Codex hook subprocesses can outlive a completed app-server turn by a
// few seconds. Keep the relay available briefly so late
// nativeHook.invoke RPCs can still reach before_tool_call enforcement.
scheduleCodexNativeHookRelayUnregister({
relay: nativeHookRelay,
hookTimeoutSec: options.nativeHookRelay?.hookTimeoutSec,
});
} else {
nativeHookRelay.unregister();
}
}
await releaseSandboxExecEnvironment();
runAbortController.signal.removeEventListener("abort", abortListener);
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
@@ -5759,5 +5842,8 @@ export const testing = {
resetOpenClawCodingToolsFactoryForTests(): void {
openClawCodingToolsFactoryForTests = undefined;
},
flushPendingCodexNativeHookRelayUnregistersForTests,
clearPendingCodexNativeHookRelayUnregistersForTests,
resolveCodexNativeHookRelayUnregisterGraceMs,
} as const;
export { testing as __testing };

View File

@@ -179,7 +179,7 @@ describe("native hook relay registry", () => {
);
});
it("allows callers to replace a relay at a stable id", () => {
it("allows callers to replace a relay at a stable id", async () => {
const first = registerNativeHookRelay({
provider: "codex",
relayId: "codex-stable-session",
@@ -207,6 +207,48 @@ describe("native hook relay registry", () => {
allowedEvents: ["post_tool_use"],
},
);
const secondExpiresAtMs = requireRecord(
testing.getNativeHookRelayRegistrationForTests(first.relayId),
"replacement native hook relay registration",
).expiresAtMs;
first.renew(60_000);
expect(
requireRecord(
testing.getNativeHookRelayRegistrationForTests(first.relayId),
"replacement native hook relay registration",
).expiresAtMs,
).toBe(secondExpiresAtMs);
first.unregister();
expectRecordFields(
requireRecord(
testing.getNativeHookRelayRegistrationForTests(first.relayId),
"replacement native hook relay registration",
),
{
runId: "run-2",
allowedEvents: ["post_tool_use"],
},
);
await expect(
invokeNativeHookRelayBridge({
provider: "codex",
relayId: second.relayId,
event: "post_tool_use",
timeoutMs: 2_000,
rawPayload: {
hook_event_name: "PostToolUse",
tool_name: "Bash",
tool_use_id: "replacement-call",
tool_input: { command: "pnpm test" },
tool_response: { output: "ok", exit_code: 0 },
},
}),
).resolves.toEqual({ stdout: "", stderr: "", exitCode: 0 });
second.unregister();
expect(testing.getNativeHookRelayRegistrationForTests(first.relayId)).toBeUndefined();
});
it("exposes registered relays through the direct hook bridge", async () => {
@@ -741,16 +783,16 @@ describe("native hook relay registry", () => {
});
it("rejects expired relay ids", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-04-24T12:00:00Z"));
const relay = registerNativeHookRelay({
provider: "codex",
sessionId: "session-1",
runId: "run-1",
ttlMs: 1,
});
await waitForNativeHookRelayBridgeRecord(relay.relayId);
vi.setSystemTime(new Date("2026-04-24T12:00:01Z"));
vi.useFakeTimers();
vi.setSystemTime(new Date(relay.expiresAtMs + 1));
await expect(
invokeNativeHookRelay({
@@ -761,6 +803,9 @@ describe("native hook relay registry", () => {
}),
).rejects.toThrow("expired");
expect(testing.getNativeHookRelayRegistrationForTests(relay.relayId)).toBeUndefined();
expect(testing.getNativeHookRelayBridgeRecordForTests(relay.relayId)).toBeUndefined();
relay.unregister();
expect(testing.getNativeHookRelayBridgeRecordForTests(relay.relayId)).toBeUndefined();
});
it("uses the Codex no-op output when no OpenClaw hook decides", async () => {

View File

@@ -334,7 +334,7 @@ export function registerNativeHookRelay(
}),
renew: (ttlMs) => {
const current = relays.get(relayId);
if (!current) {
if (current !== registration) {
return;
}
const expiresAtMs = Date.now() + normalizePositiveInteger(ttlMs, DEFAULT_RELAY_TTL_MS);
@@ -345,12 +345,18 @@ export function registerNativeHookRelay(
writeNativeHookRelayBridgeRecordForRegistration(current, bridge);
}
},
unregister: () => unregisterNativeHookRelay(relayId),
unregister: () => unregisterNativeHookRelay(relayId, registration),
};
return handle;
}
function unregisterNativeHookRelay(relayId: string): void {
function unregisterNativeHookRelay(
relayId: string,
expectedRegistration?: NativeHookRelayRegistration,
): void {
if (expectedRegistration && relays.get(relayId) !== expectedRegistration) {
return;
}
unregisterNativeHookRelayBridge(relayId);
relays.delete(relayId);
removeNativeHookRelayInvocations(relayId);
@@ -422,8 +428,7 @@ export async function invokeNativeHookRelay(
throw new Error("native hook relay not found");
}
if (Date.now() > registration.expiresAtMs) {
relays.delete(relayId);
removeNativeHookRelayInvocations(relayId);
unregisterNativeHookRelay(relayId, registration);
throw new Error("native hook relay expired");
}
if (registration.provider !== provider) {
@@ -551,9 +556,7 @@ function removeNativeHookRelayInvocations(relayId: string): void {
function pruneExpiredNativeHookRelays(now = Date.now()): void {
for (const [relayId, registration] of relays) {
if (now > registration.expiresAtMs) {
relays.delete(relayId);
unregisterNativeHookRelayBridge(relayId);
removeNativeHookRelayInvocations(relayId);
unregisterNativeHookRelay(relayId, registration);
}
}
}