diff --git a/CHANGELOG.md b/CHANGELOG.md index 05affa66a3b..73c0fe1c695 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Build/Gateway: route restart, shutdown, respawn, diagnostics, command-queue cleanup, and runtime cleanup through one stable gateway lifecycle runtime entry so rebuilt packages do not strand long-running gateways on stale hashed chunks. Carries forward #73964. Thanks @pashpashpash. - Memory/wiki: keep broad shared-source and generated related-link blocks from turning every page into a search hit, cap noisy backlinks, support all-term searches such as people-routing queries, and prefer readable page body snippets over generated metadata. Thanks @vincentkoc. +- Cron/Gateway: abort and bounded-clean up timed-out isolated agent turns before recording the timeout, so stale cron sessions cannot leave Discord or other chat lanes stuck in `processing` after a timeout. Thanks @vincentkoc. - Agents/errors: suppress malformed streaming tool-call JSON fragments before they reach chat surfaces while preserving provider request-validation diagnostics. Fixes #59076; keeps #59080 as duplicate coverage. (#59118) Thanks @singleGanghood. - CLI/models: restore provider-filtered `models list --all --provider <id>` rows for providers without manifest/static catalog coverage, including Anthropic and Amazon Bedrock, while keeping the compatibility fallback off expensive availability and resolver paths. Thanks @shakkernerd. - CLI/models: move the OpenAI listable catalog into the plugin manifest so `models list --all --provider openai` uses the manifest fast path instead of loading provider runtime normalization hooks. Thanks @shakkernerd. diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index d1dbb581b0f..eb1b4650c6a 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -51,6 +51,7 @@ Cron is the Gateway's built-in scheduler. 
It persists jobs, wakes the agent at t - Isolated cron runs also guard against stale acknowledgement replies. If the first result is just an interim status update (`on it`, `pulling everything together`, and similar hints) and no descendant subagent run is still responsible for the final answer, OpenClaw re-prompts once for the actual result before delivery. - Isolated cron runs prefer structured execution-denial metadata from the embedded run, then fall back to known final summary/output markers such as `SYSTEM_RUN_DENIED` and `INVALID_REQUEST`, so a blocked command is not reported as a green run. - Isolated cron runs also treat run-level agent failures as job errors even when no reply payload is produced, so model/provider failures increment error counters and trigger failure notifications instead of clearing the job as successful. +- When an isolated agent-turn job reaches `timeoutSeconds`, cron aborts the underlying agent run and gives it a short cleanup window. If the run does not drain, Gateway-owned cleanup force-clears that run's session ownership before cron records the timeout, so queued chat work is not left behind a stale processing session. diff --git a/docs/concepts/agent-loop.md b/docs/concepts/agent-loop.md index 0ad8e765211..464ea294656 100644 --- a/docs/concepts/agent-loop.md +++ b/docs/concepts/agent-loop.md @@ -162,6 +162,7 @@ surfaces, while Codex native hooks remain a separate lower-level Codex mechanism - `agent.wait` default: 30s (just the wait). `timeoutMs` param overrides. - Agent runtime: `agents.defaults.timeoutSeconds` default 172800s (48 hours); enforced in `runEmbeddedPiAgent` abort timer. +- Cron runtime: isolated agent-turn `timeoutSeconds` is owned by cron. The scheduler starts that timer when execution begins, aborts the underlying run at the configured deadline, then runs bounded cleanup before recording the timeout so a stale child session cannot keep the lane stuck. 
- Stuck-session recovery: with diagnostics enabled, `diagnostics.stuckSessionWarnMs` detects long `processing` sessions. Active embedded runs, active reply operations, and active session-lane tasks remain warning-only by default; if diagnostics show no active work for the session, the watchdog releases the affected session lane so queued startup work can drain. - Model idle timeout: OpenClaw aborts a model request when no response chunks arrive before the idle window. `models.providers.<id>.timeoutSeconds` extends this idle watchdog for slow local/self-hosted providers; otherwise OpenClaw uses `agents.defaults.timeoutSeconds` when configured, capped at 120s by default. Cron-triggered runs with no explicit model or agent timeout disable the idle watchdog and rely on the cron outer timeout. - Provider HTTP request timeout: `models.providers.<id>.timeoutSeconds` applies to that provider's model HTTP fetches, including connect, headers, body, SDK request timeout, total guarded-fetch abort handling, and model stream idle watchdog. Use this for slow local/self-hosted providers such as Ollama before raising the whole agent runtime timeout. 
diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 8d464b22917..ead720c0a52 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -21,6 +21,7 @@ export { runEmbeddedPiAgent as runEmbeddedAgent, } from "./pi-embedded-runner/run.js"; export { + abortAndDrainEmbeddedPiRun, abortEmbeddedPiRun, abortEmbeddedPiRun as abortEmbeddedAgentRun, isEmbeddedPiRunActive, diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 23d8d1fe3ab..c0d7a4f28f2 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -2,6 +2,7 @@ import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures"; import { afterEach, describe, expect, it, vi } from "vitest"; import { __testing, + abortAndDrainEmbeddedPiRun, abortEmbeddedPiRun, clearActiveEmbeddedRun, consumeEmbeddedRunModelSwitch, @@ -65,6 +66,32 @@ describe("pi-embedded runner run registry", () => { expect(abortB).toHaveBeenCalledTimes(1); }); + it("force-clears an aborted run that does not drain", async () => { + vi.useFakeTimers(); + try { + const abortRun = vi.fn(); + setActiveEmbeddedRun("session-stuck", createRunHandle({ abort: abortRun }), "agent:main"); + + const resultPromise = abortAndDrainEmbeddedPiRun({ + sessionId: "session-stuck", + sessionKey: "agent:main", + settleMs: 100, + forceClear: true, + reason: "test_timeout", + }); + await vi.advanceTimersByTimeAsync(100); + const result = await resultPromise; + + expect(result).toEqual({ aborted: true, drained: false, forceCleared: true }); + expect(abortRun).toHaveBeenCalledTimes(1); + expect(isEmbeddedPiRunHandleActive("session-stuck")).toBe(false); + expect(resolveActiveEmbeddedRunHandleSessionId("agent:main")).toBeUndefined(); + } finally { + await vi.runOnlyPendingTimersAsync(); + vi.useRealTimers(); + } + }); + it("waits for active runs to drain", async () => { vi.useFakeTimers(); try { diff --git 
a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index 58448b5abca..ca96f6c5354 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -310,6 +310,29 @@ export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): }); } +export type AbortAndDrainEmbeddedPiRunResult = { + aborted: boolean; + drained: boolean; + forceCleared: boolean; +}; + +export async function abortAndDrainEmbeddedPiRun(params: { + sessionId: string; + sessionKey?: string; + settleMs?: number; + forceClear?: boolean; + reason?: string; +}): Promise { + const settleMs = params.settleMs ?? 15_000; + const aborted = abortEmbeddedPiRun(params.sessionId); + const drained = aborted ? await waitForEmbeddedPiRunEnd(params.sessionId, settleMs) : false; + const forceCleared = + params.forceClear === true && (!aborted || !drained) + ? forceClearEmbeddedPiRun(params.sessionId, params.sessionKey, params.reason) + : false; + return { aborted, drained, forceCleared }; +} + function notifyEmbeddedRunEnded(sessionId: string) { const waiters = EMBEDDED_RUN_WAITERS.get(sessionId); if (!waiters || waiters.size === 0) { diff --git a/src/agents/pi-embedded.runtime.ts b/src/agents/pi-embedded.runtime.ts index c5eb796abac..24ec29fda98 100644 --- a/src/agents/pi-embedded.runtime.ts +++ b/src/agents/pi-embedded.runtime.ts @@ -1,4 +1,5 @@ export { + abortAndDrainEmbeddedPiRun, abortEmbeddedPiRun, isEmbeddedPiRunActive, isEmbeddedPiRunStreaming, diff --git a/src/agents/pi-embedded.ts b/src/agents/pi-embedded.ts index 0b548defe70..6d98cdcb860 100644 --- a/src/agents/pi-embedded.ts +++ b/src/agents/pi-embedded.ts @@ -9,6 +9,7 @@ export type { EmbeddedPiRunResult, } from "./pi-embedded-runner.js"; export { + abortAndDrainEmbeddedPiRun, abortEmbeddedAgentRun, abortEmbeddedPiRun, compactEmbeddedAgentSession, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index c5ba5bd8529..95ad674d388 100644 --- 
a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -10,6 +10,7 @@ import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { resolveCronDeliveryPlan, type CronDeliveryPlan } from "../delivery-plan.js"; import type { + CronAgentExecutionStarted, CronDeliveryTrace, CronDeliveryTraceMessageTarget, CronDeliveryTraceTarget, @@ -424,7 +425,7 @@ type RunCronAgentTurnParams = { message: string; abortSignal?: AbortSignal; signal?: AbortSignal; - onExecutionStarted?: () => void; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; sessionKey: string; agentId?: string; lane?: string; @@ -1008,7 +1009,7 @@ export async function runCronIsolatedAgentTurn(params: { message: string; abortSignal?: AbortSignal; signal?: AbortSignal; - onExecutionStarted?: () => void; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; sessionKey: string; agentId?: string; lane?: string; @@ -1026,6 +1027,13 @@ export async function runCronIsolatedAgentTurn(params: { if (!prepared.ok) { return prepared.result; } + const notifyExecutionStarted = () => + params.onExecutionStarted?.({ + jobId: params.job.id, + agentId: prepared.context.agentId, + sessionId: prepared.context.runSessionId, + sessionKey: prepared.context.runSessionKey, + }); try { const { executeCronRun } = await loadCronExecutorRuntime(); @@ -1054,7 +1062,7 @@ export async function runCronIsolatedAgentTurn(params: { commandBody: prepared.context.commandBody, persistSessionEntry: prepared.context.persistSessionEntry, abortSignal, - onExecutionStarted: params.onExecutionStarted, + onExecutionStarted: notifyExecutionStarted, abortReason, isAborted, thinkLevel: prepared.context.thinkLevel, diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 25b2c6482d9..5b7d9387037 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -7,6 +7,7 @@ import type { 
CronJobCreate, CronJobPatch, CronMessageChannel, + CronAgentExecutionStarted, CronRunOutcome, CronRunStatus, CronRunTelemetry, @@ -93,7 +94,7 @@ export type CronServiceDeps = { job: CronJob; message: string; abortSignal?: AbortSignal; - onExecutionStarted?: () => void; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; }) => Promise< { summary?: string; @@ -114,6 +115,11 @@ export type CronServiceDeps = { } & CronRunOutcome & CronRunTelemetry >; + cleanupTimedOutAgentRun?: (params: { + job: CronJob; + timeoutMs: number; + execution?: CronAgentExecutionStarted; + }) => Promise; sendCronFailureAlert?: (params: { job: CronJob; text: string; diff --git a/src/cron/service/timer.regression.test.ts b/src/cron/service/timer.regression.test.ts index 8a61d4d16a9..4a7f8dfe0e1 100644 --- a/src/cron/service/timer.regression.test.ts +++ b/src/cron/service/timer.regression.test.ts @@ -13,7 +13,7 @@ import { } from "../../../test/helpers/cron/service-regression-fixtures.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import * as schedule from "../schedule.js"; -import type { CronJob } from "../types.js"; +import type { CronAgentExecutionStarted, CronJob } from "../types.js"; import { computeJobNextRunAtMs } from "./jobs.js"; import { createCronServiceState, type CronEvent } from "./state.js"; import { @@ -1213,6 +1213,86 @@ describe("cron service timer regressions", () => { } }); + it("cleans up timed-out isolated runs even when the runner ignores abort", async () => { + vi.useFakeTimers(); + try { + const store = timerRegressionFixtures.makeStorePath(); + const scheduledAt = Date.parse("2026-02-15T14:00:00.000Z"); + const cronJob = createIsolatedRegressionJob({ + id: "timeout-cleanup-stuck-run", + name: "timeout cleanup stuck run", + scheduledAt, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 1 }, + state: { nextRunAtMs: scheduledAt }, + }); + await 
writeCronJobs(store.storePath, [cronJob]); + + vi.setSystemTime(scheduledAt); + let now = scheduledAt; + const started = createDeferred(); + let abortObserved = false; + const cleanupTimedOutAgentRun = vi.fn(async () => {}); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + cleanupTimedOutAgentRun, + runIsolatedAgentJob: vi.fn( + async ({ + abortSignal, + onExecutionStarted, + }: { + abortSignal?: AbortSignal; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; + }) => { + onExecutionStarted?.({ + jobId: "timeout-cleanup-stuck-run", + agentId: "main", + sessionId: "cron-run-session", + sessionKey: "agent:main:cron:timeout-cleanup-stuck-run:run:cron-run-session", + }); + started.resolve(); + abortSignal?.addEventListener( + "abort", + () => { + abortObserved = true; + }, + { once: true }, + ); + return await new Promise(() => {}); + }, + ), + }); + + const timerPromise = onTimer(state); + await started.promise; + await vi.advanceTimersByTimeAsync(1_100); + now += 1_100; + await timerPromise; + + expect(abortObserved).toBe(true); + expect(cleanupTimedOutAgentRun).toHaveBeenCalledWith({ + job: expect.objectContaining({ id: "timeout-cleanup-stuck-run" }), + timeoutMs: 1_000, + execution: { + jobId: "timeout-cleanup-stuck-run", + agentId: "main", + sessionId: "cron-run-session", + sessionKey: "agent:main:cron:timeout-cleanup-stuck-run:run:cron-run-session", + }, + }); + const job = state.store?.jobs.find((entry) => entry.id === "timeout-cleanup-stuck-run"); + expect(job?.state.lastStatus).toBe("error"); + expect(job?.state.lastError).toContain("timed out"); + } finally { + vi.useRealTimers(); + } + }); + it("keeps state updates when cron next-run computation throws after a successful run (#30905)", () => { const startedAt = Date.parse("2026-03-02T12:00:00.000Z"); const endedAt = startedAt + 50; diff --git 
a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 0d1fddafd88..324d4135aa6 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -17,6 +17,7 @@ import { resolveCronDeliveryPlan } from "../delivery-plan.js"; import { createCronExecutionId } from "../run-id.js"; import { sweepCronRunSessions } from "../session-reaper.js"; import type { + CronAgentExecutionStarted, CronDeliveryStatus, CronDeliveryTrace, CronJob, @@ -45,6 +46,7 @@ import { DEFAULT_JOB_TIMEOUT_MS, resolveCronJobTimeoutMs } from "./timeout-polic export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js"; const MAX_TIMER_DELAY_MS = 60_000; +const CRON_TIMEOUT_CLEANUP_GUARD_MS = 20_000; /** * Minimum gap between consecutive fires of the same cron job. This is a @@ -108,31 +110,48 @@ export async function executeJobCoreWithTimeout( const runAbortController = new AbortController(); let timeoutId: NodeJS.Timeout | undefined; - let rejectTimeout: ((reason?: unknown) => void) | undefined; - const timeoutPromise = new Promise((_, reject) => { - rejectTimeout = reject; + let activeExecution: CronAgentExecutionStarted | undefined; + const timeoutMarker = Symbol("cron-timeout"); + let resolveTimeout: ((value: typeof timeoutMarker) => void) | undefined; + const timeoutPromise = new Promise((resolve) => { + resolveTimeout = resolve; }); - const startTimeout = () => { - if (timeoutId) { - return; - } - timeoutId = setTimeout(() => { - runAbortController.abort(timeoutErrorMessage()); - rejectTimeout?.(new Error(timeoutErrorMessage())); - }, jobTimeoutMs); - }; + const deferTimeoutUntilExecutionStart = job.sessionTarget !== "main" && job.payload.kind === "agentTurn"; + const startTimeout = () => { + if (!timeoutId) { + timeoutId = setTimeout(() => { + runAbortController.abort(timeoutErrorMessage()); + resolveTimeout?.(timeoutMarker); + }, jobTimeoutMs); + } + }; + const onExecutionStarted = (info?: CronAgentExecutionStarted) => { + activeExecution = info ?? 
activeExecution; + startTimeout(); + }; + const corePromise = executeJobCore(state, job, runAbortController.signal, { + onExecutionStarted: deferTimeoutUntilExecutionStart ? onExecutionStarted : undefined, + }); if (!deferTimeoutUntilExecutionStart) { startTimeout(); } + void corePromise.catch((err) => { + if (runAbortController.signal.aborted) { + state.deps.log.warn( + { jobId: job.id, err: String(err) }, + "cron: job core rejected after timeout abort", + ); + } + }); try { - return await Promise.race([ - executeJobCore(state, job, runAbortController.signal, { - onExecutionStarted: deferTimeoutUntilExecutionStart ? startTimeout : undefined, - }), - timeoutPromise, - ]); + const first = await Promise.race([corePromise, timeoutPromise]); + if (first !== timeoutMarker) { + return first; + } + await cleanupTimedOutCronAgentRun(state, job, jobTimeoutMs, activeExecution); + return { status: "error", error: timeoutErrorMessage() }; } finally { if (timeoutId) { clearTimeout(timeoutId); @@ -140,6 +159,34 @@ export async function executeJobCoreWithTimeout( } } +async function cleanupTimedOutCronAgentRun( + state: CronServiceState, + job: CronJob, + timeoutMs: number, + execution?: CronAgentExecutionStarted, +): Promise { + if (!state.deps.cleanupTimedOutAgentRun) { + return; + } + let settleTimer: NodeJS.Timeout | undefined; + const cleanupPromise = state.deps.cleanupTimedOutAgentRun({ job, timeoutMs, execution }); + const settleTimeout = new Promise((resolve) => { + settleTimer = setTimeout(resolve, CRON_TIMEOUT_CLEANUP_GUARD_MS); + }); + try { + await Promise.race([cleanupPromise, settleTimeout]); + } catch (err) { + state.deps.log.warn( + { jobId: job.id, err: String(err) }, + "cron: timed-out agent cleanup failed", + ); + } finally { + if (settleTimer) { + clearTimeout(settleTimer); + } + } +} + function resolveRunConcurrency(state: CronServiceState): number { const raw = state.deps.cronConfig?.maxConcurrentRuns; if (typeof raw !== "number" || !Number.isFinite(raw)) { 
@@ -1260,7 +1307,7 @@ export async function executeJobCore( job: CronJob, abortSignal?: AbortSignal, options?: { - onExecutionStarted?: () => void; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; }, ): Promise< CronRunOutcome & @@ -1418,7 +1465,7 @@ async function executeDetachedCronJob( abortSignal: AbortSignal | undefined, resolveAbortError: () => { status: "error"; error: string }, options?: { - onExecutionStarted?: () => void; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; }, ): Promise< CronRunOutcome & diff --git a/src/cron/types.ts b/src/cron/types.ts index 57e26d1397d..44e40c66e3d 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -98,6 +98,13 @@ export type CronRunOutcome = { sessionKey?: string; }; +export type CronAgentExecutionStarted = { + jobId: string; + agentId?: string; + sessionId?: string; + sessionKey?: string; +}; + export type CronFailureAlert = { after?: number; channel?: CronMessageChannel; diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index bb65320da03..d0d6c12a538 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -1,4 +1,5 @@ import { resolveDefaultAgentId } from "../agents/agent-scope.js"; +import { abortAndDrainEmbeddedPiRun } from "../agents/pi-embedded.js"; import { cleanupBrowserSessionsForLifecycleEnd } from "../browser-lifecycle-cleanup.js"; import type { CliDeps } from "../cli/deps.types.js"; import { getRuntimeConfig } from "../config/io.js"; @@ -307,6 +308,29 @@ export function buildGatewayCronService(params: { }); } }, + cleanupTimedOutAgentRun: async ({ job, execution }) => { + if (!execution?.sessionId) { + return; + } + const result = await abortAndDrainEmbeddedPiRun({ + sessionId: execution.sessionId, + sessionKey: execution.sessionKey, + settleMs: 15_000, + forceClear: true, + reason: "cron_timeout", + }); + cronLogger.warn( + { + jobId: job.id, + sessionId: execution.sessionId, + sessionKey: execution.sessionKey, + aborted: 
result.aborted, + drained: result.drained, + forceCleared: result.forceCleared, + }, + "cron: cleaned up timed-out agent run", + ); + }, sendCronFailureAlert: async ({ job, text, channel, to, mode, accountId }) => await sendGatewayCronFailureAlert({ deps: params.deps, diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts index ea948c329e9..e03bfe7cd2e 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -18,6 +18,23 @@ const mocks = vi.hoisted(() => ({ })); vi.mock("../agents/pi-embedded-runner/runs.js", () => ({ + abortAndDrainEmbeddedPiRun: async (params: { + sessionId: string; + sessionKey?: string; + settleMs?: number; + forceClear?: boolean; + reason?: string; + }) => { + const aborted = mocks.abortEmbeddedPiRun(params.sessionId); + const drained = aborted + ? await mocks.waitForEmbeddedPiRunEnd(params.sessionId, params.settleMs) + : false; + const forceCleared = + params.forceClear === true && (!aborted || !drained) + ? 
mocks.forceClearEmbeddedPiRun(params.sessionId, params.sessionKey, params.reason) + : false; + return { aborted, drained, forceCleared }; + }, abortEmbeddedPiRun: mocks.abortEmbeddedPiRun, forceClearEmbeddedPiRun: mocks.forceClearEmbeddedPiRun, isEmbeddedPiRunActive: mocks.isEmbeddedPiRunActive, diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts index 9d7fd0d6ae1..00dd9bcd04f 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -1,12 +1,10 @@ import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js"; import { - abortEmbeddedPiRun, - forceClearEmbeddedPiRun, + abortAndDrainEmbeddedPiRun, isEmbeddedPiRunActive, isEmbeddedPiRunHandleActive, resolveActiveEmbeddedRunSessionId, resolveActiveEmbeddedRunHandleSessionId, - waitForEmbeddedPiRunEnd, } from "../agents/pi-embedded-runner/runs.js"; import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js"; import { diagnosticLogger as diag } from "./diagnostic-runtime.js"; @@ -62,13 +60,15 @@ export async function recoverStuckDiagnosticSession( ); return; } - aborted = abortEmbeddedPiRun(activeSessionId); - if (aborted) { - drained = await waitForEmbeddedPiRunEnd(activeSessionId, STUCK_SESSION_ABORT_SETTLE_MS); - } - if (!aborted || !drained) { - forceClearEmbeddedPiRun(activeSessionId, params.sessionKey, "stuck_recovery"); - } + const result = await abortAndDrainEmbeddedPiRun({ + sessionId: activeSessionId, + sessionKey: params.sessionKey, + settleMs: STUCK_SESSION_ABORT_SETTLE_MS, + forceClear: true, + reason: "stuck_recovery", + }); + aborted = result.aborted; + drained = result.drained; } if (!activeSessionId && activeWorkSessionId && isEmbeddedPiRunActive(activeWorkSessionId)) {