fix(codex): keep run lane timeout progress-aware

This commit is contained in:
Peter Steinberger
2026-05-16 14:12:35 +01:00
parent a641a27bd4
commit 21c5f8dc6d
7 changed files with 134 additions and 6 deletions

View File

@@ -1604,6 +1604,8 @@ describe("runCodexAppServerAttempt", () => {
path.join(tempDir, "workspace"),
);
params.timeoutMs = 100;
const onRunProgress = vi.fn();
params.onRunProgress = onRunProgress;
const run = runCodexAppServerAttempt(params, {
turnCompletionIdleTimeoutMs: 300,
@@ -1649,6 +1651,11 @@ describe("runCodexAppServerAttempt", () => {
expect(result.timedOut).toBe(false);
expect(result.promptError).toBeNull();
expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(false);
const progressReasons = onRunProgress.mock.calls.map(([info]) => info.reason);
expect(progressReasons).toContain("turn:start");
expect(
progressReasons.filter((reason) => reason === "notification:rawResponseItem/completed"),
).toHaveLength(2);
});
it("does not count non-turn app-server requests as turn attempt progress", async () => {
@@ -1659,6 +1666,8 @@ describe("runCodexAppServerAttempt", () => {
path.join(tempDir, "workspace"),
);
params.timeoutMs = 100;
const onRunProgress = vi.fn();
params.onRunProgress = onRunProgress;
const run = runCodexAppServerAttempt(params, {
turnCompletionIdleTimeoutMs: 500,
@@ -1689,6 +1698,7 @@ describe("runCodexAppServerAttempt", () => {
expect(warnData?.timeoutMs).toBe(100);
expect(warnData?.lastActivityReason).toBe("turn:start");
expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(true);
expect(onRunProgress.mock.calls.map(([info]) => info.reason)).toEqual(["turn:start"]);
});
it("keeps the turn attempt timeout armed while non-turn requests are pending", async () => {

View File

@@ -1353,6 +1353,12 @@ export async function runCodexAppServerAttempt(
turnAttemptLastProgressReason = reason;
turnAttemptLastProgressDetails = options.details;
renewNativeHookRelayForTurnProgress();
params.onRunProgress?.({
reason,
provider: params.provider,
model: params.modelId,
backend: "codex-app-server",
});
}
emitTrustedDiagnosticEvent({
type: "run.progress",

View File

@@ -379,8 +379,18 @@ export async function runEmbeddedPiAgent(
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
const globalLane = resolveGlobalLane(params.lane);
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
let laneTaskProgressAtMs = Date.now();
const noteLaneTaskProgress = () => {
laneTaskProgressAtMs = Date.now();
};
const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) =>
withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs);
withEmbeddedRunLaneTimeout(
{
...opts,
taskTimeoutProgressAtMs: () => laneTaskProgressAtMs,
},
laneTaskTimeoutMs,
);
const enqueueGlobal = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
params.enqueue
? params.enqueue(task, withLaneTimeout(opts))
@@ -429,8 +439,15 @@ export async function runEmbeddedPiAgent(
"phase"
>,
) => {
noteLaneTaskProgress();
params.onExecutionPhase?.({ phase, ...extra });
};
const notifyRunProgress = (
info: Parameters<NonNullable<RunEmbeddedPiAgentParams["onRunProgress"]>>[0],
) => {
noteLaneTaskProgress();
params.onRunProgress?.(info);
};
const emitStartupStageSummary = (phase: string) => {
const summary = startupStages.snapshot();
const shouldWarn = shouldWarnEmbeddedRunStageSummary(summary);
@@ -1370,6 +1387,7 @@ export async function runEmbeddedPiAgent(
legacyBeforeAgentStartResult,
thinkLevel,
onToolOutcome: observePostCompactionToolOutcome,
onRunProgress: notifyRunProgress,
fastMode: params.fastMode,
verboseLevel: params.verboseLevel,
reasoningLevel: params.reasoningLevel,

View File

@@ -177,6 +177,12 @@ export type RunEmbeddedPiAgentParams = {
itemId?: string;
firstModelCallStarted?: boolean;
}) => void;
onRunProgress?: (info: {
reason: string;
provider?: string;
model?: string;
backend?: string;
}) => void;
replyOperation?: ReplyOperation;
shouldEmitToolResult?: () => boolean;
shouldEmitToolOutput?: () => boolean;

View File

@@ -391,6 +391,72 @@ describe("command queue", () => {
}
});
it("task timeout renews from progress timestamps", async () => {
const lane = `timeout-progress-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);
vi.useFakeTimers();
try {
let progressAtMs = Date.now();
const blocker = createDeferred();
const first = enqueueCommandInLane(
lane,
async () => {
await blocker.promise;
return "first";
},
{
taskTimeoutMs: 25,
taskTimeoutProgressAtMs: () => progressAtMs,
},
);
let secondRan = false;
const second = enqueueCommandInLane(lane, async () => {
secondRan = true;
return "second";
});
await vi.advanceTimersByTimeAsync(20);
progressAtMs = Date.now();
await vi.advanceTimersByTimeAsync(20);
expect(secondRan).toBe(false);
blocker.resolve();
await expect(first).resolves.toBe("first");
await expect(second).resolves.toBe("second");
expect(secondRan).toBe(true);
} finally {
vi.useRealTimers();
}
});
it("task timeout falls back when progress timestamp callback throws", async () => {
const lane = `timeout-progress-throw-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 1);
vi.useFakeTimers();
try {
const first = enqueueCommandInLane(lane, async () => new Promise<never>(() => {}), {
taskTimeoutMs: 25,
taskTimeoutProgressAtMs: () => {
throw new Error("progress failed");
},
});
const firstRejected = expect(first).rejects.toBeInstanceOf(CommandLaneTaskTimeoutError);
await vi.advanceTimersByTimeAsync(25);
await firstRejected;
expect(
diagnosticMocks.diag.warn.mock.calls.some(([message]) =>
String(message).includes("lane task timeout progress callback failed"),
),
).toBe(true);
} finally {
vi.useRealTimers();
}
});
it("keeps work queued while a lane has zero concurrency and drains after resume", async () => {
const lane = `suspended-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
setCommandLaneConcurrency(lane, 0);

View File

@@ -63,6 +63,7 @@ type QueueEntry = {
enqueuedAt: number;
warnAfterMs: number;
taskTimeoutMs?: number;
taskTimeoutProgressAtMs?: () => number | undefined;
onWait?: (waitMs: number, queuedAhead: number) => void;
};
@@ -210,14 +211,33 @@ async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unkno
return await taskPromise;
}
const startedAtMs = Date.now();
const readLastProgressAtMs = () => {
let value: number | undefined;
try {
value = entry.taskTimeoutProgressAtMs?.();
} catch (err) {
diag.warn(`lane task timeout progress callback failed: lane=${lane} error="${String(err)}"`);
}
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.max(startedAtMs, Math.floor(value))
: startedAtMs;
};
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
let timedOut = false;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutHandle = setTimeout(() => {
timedOut = true;
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
}, taskTimeoutMs);
timeoutHandle.unref?.();
const armTimeout = () => {
const elapsedMs = Math.max(0, Date.now() - readLastProgressAtMs());
const remainingMs = taskTimeoutMs - elapsedMs;
if (remainingMs <= 0) {
timedOut = true;
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
return;
}
timeoutHandle = setTimeout(armTimeout, remainingMs);
timeoutHandle.unref?.();
};
armTimeout();
});
try {
@@ -349,6 +369,7 @@ export function enqueueCommandInLane<T>(
enqueuedAt: Date.now(),
warnAfterMs,
taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs),
taskTimeoutProgressAtMs: opts?.taskTimeoutProgressAtMs,
onWait: opts?.onWait,
});
logLaneEnqueue(cleaned, getLaneDepth(state));

View File

@@ -2,6 +2,7 @@ export type CommandQueueEnqueueOptions = {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
taskTimeoutMs?: number;
taskTimeoutProgressAtMs?: () => number | undefined;
};
export type CommandQueueEnqueueFn = <T>(