diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 20169650a85..322a49086a4 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -1604,6 +1604,8 @@ describe("runCodexAppServerAttempt", () => { path.join(tempDir, "workspace"), ); params.timeoutMs = 100; + const onRunProgress = vi.fn(); + params.onRunProgress = onRunProgress; const run = runCodexAppServerAttempt(params, { turnCompletionIdleTimeoutMs: 300, @@ -1649,6 +1651,11 @@ describe("runCodexAppServerAttempt", () => { expect(result.timedOut).toBe(false); expect(result.promptError).toBeNull(); expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(false); + const progressReasons = onRunProgress.mock.calls.map(([info]) => info.reason); + expect(progressReasons).toContain("turn:start"); + expect( + progressReasons.filter((reason) => reason === "notification:rawResponseItem/completed"), + ).toHaveLength(2); }); it("does not count non-turn app-server requests as turn attempt progress", async () => { @@ -1659,6 +1666,8 @@ describe("runCodexAppServerAttempt", () => { path.join(tempDir, "workspace"), ); params.timeoutMs = 100; + const onRunProgress = vi.fn(); + params.onRunProgress = onRunProgress; const run = runCodexAppServerAttempt(params, { turnCompletionIdleTimeoutMs: 500, @@ -1689,6 +1698,7 @@ describe("runCodexAppServerAttempt", () => { expect(warnData?.timeoutMs).toBe(100); expect(warnData?.lastActivityReason).toBe("turn:start"); expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(true); + expect(onRunProgress.mock.calls.map(([info]) => info.reason)).toEqual(["turn:start"]); }); it("keeps the turn attempt timeout armed while non-turn requests are pending", async () => { diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 8147daa340e..5335153dc47 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -1353,6 +1353,12 @@ export async function runCodexAppServerAttempt( turnAttemptLastProgressReason = reason; turnAttemptLastProgressDetails = options.details; renewNativeHookRelayForTurnProgress(); + params.onRunProgress?.({ + reason, + provider: params.provider, + model: params.modelId, + backend: "codex-app-server", + }); } emitTrustedDiagnosticEvent({ type: "run.progress", diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index d34c0ae6374..8675842fb63 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -379,8 +379,18 @@ export async function runEmbeddedPiAgent( const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); const globalLane = resolveGlobalLane(params.lane); const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs); + let laneTaskProgressAtMs = Date.now(); + const noteLaneTaskProgress = () => { + laneTaskProgressAtMs = Date.now(); + }; const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) => - withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs); + withEmbeddedRunLaneTimeout( + { + ...opts, + taskTimeoutProgressAtMs: () => laneTaskProgressAtMs, + }, + laneTaskTimeoutMs, + ); const enqueueGlobal = (task: () => Promise, opts?: CommandQueueEnqueueOptions) => params.enqueue ? params.enqueue(task, withLaneTimeout(opts)) @@ -429,8 +439,15 @@ export async function runEmbeddedPiAgent( "phase" >, ) => { + noteLaneTaskProgress(); params.onExecutionPhase?.({ phase, ...extra }); }; + const notifyRunProgress = ( + info: Parameters>[0], + ) => { + noteLaneTaskProgress(); + params.onRunProgress?.(info); + }; const emitStartupStageSummary = (phase: string) => { const summary = startupStages.snapshot(); const shouldWarn = shouldWarnEmbeddedRunStageSummary(summary); @@ -1370,6 +1387,7 @@ export async function runEmbeddedPiAgent( legacyBeforeAgentStartResult, thinkLevel, onToolOutcome: observePostCompactionToolOutcome, + onRunProgress: notifyRunProgress, fastMode: params.fastMode, verboseLevel: params.verboseLevel, reasoningLevel: params.reasoningLevel, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 024e0abf7f2..26642839b07 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -177,6 +177,12 @@ export type RunEmbeddedPiAgentParams = { itemId?: string; firstModelCallStarted?: boolean; }) => void; + onRunProgress?: (info: { + reason: string; + provider?: string; + model?: string; + backend?: string; + }) => void; replyOperation?: ReplyOperation; shouldEmitToolResult?: () => boolean; shouldEmitToolOutput?: () => boolean; diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 93f1203c8ff..756f8ec8eaa 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -391,6 +391,72 @@ describe("command queue", () => { } }); + it("task timeout renews from progress timestamps", async () => { + const lane = `timeout-progress-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + vi.useFakeTimers(); + try { + let progressAtMs = Date.now(); + const blocker = createDeferred(); + const first = enqueueCommandInLane( + lane, + async () => { + await blocker.promise; + return "first"; + }, + { + taskTimeoutMs: 25, + taskTimeoutProgressAtMs: () => progressAtMs, + }, + ); + let secondRan = false; + const second = enqueueCommandInLane(lane, async () => { + secondRan = true; + return "second"; + }); + + await vi.advanceTimersByTimeAsync(20); + progressAtMs = Date.now(); + await vi.advanceTimersByTimeAsync(20); + expect(secondRan).toBe(false); + + blocker.resolve(); + await expect(first).resolves.toBe("first"); + await expect(second).resolves.toBe("second"); + expect(secondRan).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + + it("task timeout falls back when progress timestamp callback throws", async () => { + const lane = `timeout-progress-throw-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + vi.useFakeTimers(); + try { + const first = enqueueCommandInLane(lane, async () => new Promise(() => {}), { + taskTimeoutMs: 25, + taskTimeoutProgressAtMs: () => { + throw new Error("progress failed"); + }, + }); + const firstRejected = expect(first).rejects.toBeInstanceOf(CommandLaneTaskTimeoutError); + + await vi.advanceTimersByTimeAsync(25); + await firstRejected; + + expect( + diagnosticMocks.diag.warn.mock.calls.some(([message]) => + String(message).includes("lane task timeout progress callback failed"), + ), + ).toBe(true); + } finally { + vi.useRealTimers(); + } + }); + it("keeps work queued while a lane has zero concurrency and drains after resume", async () => { const lane = `suspended-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`; setCommandLaneConcurrency(lane, 0); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 1e3255e706a..41a86340bf8 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -63,6 +63,7 @@ type QueueEntry = { enqueuedAt: number; warnAfterMs: number; taskTimeoutMs?: number; + taskTimeoutProgressAtMs?: () => number | undefined; onWait?: (waitMs: number, queuedAhead: number) => void; }; @@ -210,14 +211,33 @@ async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise { + let value: number | undefined; + try { + value = entry.taskTimeoutProgressAtMs?.(); + } catch (err) { + diag.warn(`lane task timeout progress callback failed: lane=${lane} error="${String(err)}"`); + } + return typeof value === "number" && Number.isFinite(value) && value > 0 + ? Math.max(startedAtMs, Math.floor(value)) + : startedAtMs; + }; let timeoutHandle: ReturnType | undefined; let timedOut = false; const timeoutPromise = new Promise((_, reject) => { - timeoutHandle = setTimeout(() => { - timedOut = true; - reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs)); - }, taskTimeoutMs); - timeoutHandle.unref?.(); + const armTimeout = () => { + const elapsedMs = Math.max(0, Date.now() - readLastProgressAtMs()); + const remainingMs = taskTimeoutMs - elapsedMs; + if (remainingMs <= 0) { + timedOut = true; + reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs)); + return; + } + timeoutHandle = setTimeout(armTimeout, remainingMs); + timeoutHandle.unref?.(); + }; + armTimeout(); }); try { @@ -349,6 +369,7 @@ export function enqueueCommandInLane( enqueuedAt: Date.now(), warnAfterMs, taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs), + taskTimeoutProgressAtMs: opts?.taskTimeoutProgressAtMs, onWait: opts?.onWait, }); logLaneEnqueue(cleaned, getLaneDepth(state)); diff --git a/src/process/command-queue.types.ts b/src/process/command-queue.types.ts index 2e6e6d450c9..4faa427aa2d 100644 --- a/src/process/command-queue.types.ts +++ b/src/process/command-queue.types.ts @@ -2,6 +2,7 @@ export type CommandQueueEnqueueOptions = { warnAfterMs?: number; onWait?: (waitMs: number, queuedAhead: number) => void; taskTimeoutMs?: number; + taskTimeoutProgressAtMs?: () => number | undefined; }; export type CommandQueueEnqueueFn = (