diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a18e5e908a..a750005573e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai - Control UI/WebChat: focus the composer when users click the visible input chrome and restore larger, labeled desktop composer controls while preserving compact mobile taps. Fixes #45656. Thanks @BunsDev. - Discord: suppress generated link embeds on outbound messages by default so agent-sent URLs stay as plain links unless `channels.discord.suppressEmbeds` is disabled. - System events: keep owner downgrades in structured metadata while rendering queued prompt text as plain `System:` lines, preserving least-privilege wakeups without prompt-visible trust labels. (#82067) +- Gateway/agents: abort active embedded runs when diagnostics detect a stale native tool call, preventing nested agent sessions from staying deadlocked through restart recovery. Fixes #81976. (#82369) Thanks @joshavant. - Slack: default outbound bot link unfurls off so agent-sent URLs no longer expand into inline previews unless `channels.slack.unfurlLinks` is enabled. (#82123) Thanks @kibi-bsp. - Slack: keep finalized draft-preview replies visible when a later same-turn tool warning is delivered normally instead of clearing the edited answer. Fixes #81903. (#81979) Thanks @neeravmakwana. - Providers/Xiaomi: preserve MiMo `reasoning_content` on multi-turn tool-call replay, including custom Xiaomi-compatible proxy routes, so follow-up turns no longer fail with `400 Param Incorrect`. Fixes #81419. (#81589) Thanks @lovelefeng-glitch and @jimdawdy-hub. diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 8273d9bf7a4..33d7b79c1a8 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -401,6 +401,38 @@ describe("runGatewayLoop", () => { }); }); + it("aborts active embedded runs after a short restart drain grace", async () => { + vi.clearAllMocks(); + consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({}); + getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0); + getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValue(0); + waitForActiveTasks.mockResolvedValueOnce({ drained: false }); + waitForActiveEmbeddedRuns.mockResolvedValueOnce({ drained: false }); + + await withIsolatedSignals(async ({ captureSignal }) => { + const { start, exited } = await createSignaledLoopHarness(); + const sigterm = captureSignal("SIGTERM"); + const sigint = captureSignal("SIGINT"); + + sigterm(); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(waitForActiveTasks).toHaveBeenCalledWith(90_000); + expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(30_000); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" }); + expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" }); + expect(gatewayLog.warn).toHaveBeenCalledWith( + "active embedded run drain grace reached; aborting active run(s) before restart", + ); + expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); + expect(start).toHaveBeenCalledTimes(2); + + sigint(); + await expect(exited).resolves.toBe(0); + }); + }); + it("forces SIGTERM restarts without waiting for active task drain", async () => { vi.clearAllMocks(); consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ force: true }); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index c70bbc5cc71..041a9ed0e9c 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -10,6 +10,7 @@ import { createLazyImportLoader } from "../../shared/lazy-promise.js"; const gatewayLog = createSubsystemLogger("gateway"); const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500; const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000; +const RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS = 30_000; const RESTART_DRAIN_STILL_PENDING_WARN_MS = 30_000; const UPDATE_RESPAWN_HEALTH_TIMEOUT_MS = 10_000; const UPDATE_RESPAWN_HEALTH_POLL_MS = 200; @@ -345,6 +346,15 @@ export async function runGatewayLoop(params: { restartDrainTimeoutMs === undefined ? "without a timeout" : `with timeout ${restartDrainTimeoutMs}ms`; + const resolveActiveRunDrainWaitMs = (activeRuns: number): RestartDrainTimeoutMs => { + if (activeRuns <= 0) { + return restartDrainTimeoutMs; + } + if (restartDrainTimeoutMs === undefined) { + return RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS; + } + return Math.min(restartDrainTimeoutMs, RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS); + }; const armCloseForceExitTimerForIndefiniteRestart = () => { if (isRestart && restartDrainTimeoutMs === undefined) { armForceExitTimer(SHUTDOWN_TIMEOUT_MS); @@ -417,22 +427,40 @@ export async function runGatewayLoop(params: { gatewayLog.warn("forced restart requested; skipping active work drain"); abortEmbeddedPiRun(undefined, { mode: "all" }); } else { + const activeRunDrainWaitMs = resolveActiveRunDrainWaitMs(activeRuns); const stillPendingDrainLogger = createStillPendingDrainLogger(); - const [tasksDrain, runsDrain] = await Promise.all([ - activeTasks > 0 - ? waitForActiveTasks(restartDrainTimeoutMs) - : Promise.resolve({ drained: true }), - activeRuns > 0 - ? waitForActiveEmbeddedRuns(restartDrainTimeoutMs) - : Promise.resolve({ drained: true }), - ]).finally(() => clearInterval(stillPendingDrainLogger)); + let abortedAfterRunGrace = false; + let tasksDrain: { drained: boolean } = { drained: true }; + let runsDrain: { drained: boolean } = { drained: true }; + try { + const tasksDrainPromise = + activeTasks > 0 + ? waitForActiveTasks(restartDrainTimeoutMs) + : Promise.resolve({ drained: true }); + runsDrain = + activeRuns > 0 + ? await waitForActiveEmbeddedRuns(activeRunDrainWaitMs) + : { drained: true }; + if (!runsDrain.drained && activeRuns > 0) { + gatewayLog.warn( + "active embedded run drain grace reached; aborting active run(s) before restart", + ); + abortEmbeddedPiRun(undefined, { mode: "all" }); + abortedAfterRunGrace = true; + } + tasksDrain = await tasksDrainPromise; + } finally { + clearInterval(stillPendingDrainLogger); + } if (tasksDrain.drained && runsDrain.drained) { gatewayLog.info("all active work drained"); } else { gatewayLog.warn("drain timeout reached; proceeding with restart"); // Final best-effort abort to avoid carrying active runs into the // next lifecycle when drain time budget is exhausted. - abortEmbeddedPiRun(undefined, { mode: "all" }); + if (!abortedAfterRunGrace) { + abortEmbeddedPiRun(undefined, { mode: "all" }); + } } } } diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts index 2db81e821ed..16e47f036e6 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -139,6 +139,36 @@ describe("stuck session recovery", () => { expect(mocks.resetCommandLane).not.toHaveBeenCalled(); }); + it("returns an abort outcome for a stale tool call on an active embedded run", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-tool"); + mocks.abortEmbeddedPiRun.mockReturnValue(true); + mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true); + + const outcome = await recoverStuckDiagnosticSession({ + sessionId: "session-tool", + sessionKey: "agent:main:telegram:group:-1003821464158:topic:4836", + ageMs: 147_000, + queueDepth: 1, + allowActiveAbort: true, + }); + + expect(outcome).toMatchObject({ + status: "aborted", + action: "abort_embedded_run", + sessionId: "session-tool", + sessionKey: "agent:main:telegram:group:-1003821464158:topic:4836", + activeSessionId: "session-tool", + activeWorkKind: "embedded_run", + aborted: true, + drained: true, + forceCleared: false, + released: 0, + }); + expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-tool"); + expect(mocks.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("session-tool", 15_000); + expect(mocks.resetCommandLane).not.toHaveBeenCalled(); + }); + it("logs stopped cron context when aborting an active embedded run", async () => { const previousStateDir = process.env.OPENCLAW_STATE_DIR; const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-recovery-context-")); diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 0c47e488262..b748265e2c0 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -560,9 +560,11 @@ describe("stuck session diagnostics threshold", () => { ); }); - it("does not abort embedded runs while a native tool call is active", async () => { + it("recovers stale native tool calls through the active-run abort path", async () => { const events: DiagnosticEventPayload[] = []; const recoverStuckSession = vi.fn(); + const stuckSessionWarnMs = 30_000; + const stuckSessionAbortMs = 60_000; const unsubscribe = onDiagnosticEvent((event) => { events.push(event); }); @@ -571,8 +573,8 @@ describe("stuck session diagnostics threshold", () => { { diagnostics: { enabled: true, - stuckSessionWarnMs: 30_000, - stuckSessionAbortMs: 60_000, + stuckSessionWarnMs, + stuckSessionAbortMs, }, }, { recoverStuckSession }, @@ -587,12 +589,14 @@ describe("stuck session diagnostics threshold", () => { toolCallId: "cmd-1", }); - vi.advanceTimersByTime(2 * 60_000); + vi.advanceTimersByTime(stuckSessionAbortMs - 30_000); + expect(recoverStuckSession).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(30_000); } finally { unsubscribe(); } - expect(recoverStuckSession).not.toHaveBeenCalled(); expectRecordFields( requireRecord( events.findLast((event) => event.type === "session.stalled"), @@ -606,6 +610,48 @@ describe("stuck session diagnostics threshold", () => { activeToolCallId: "cmd-1", }, ); + expectRecoveryCall( + recoverStuckSession, + { sessionId: "s1", sessionKey: "main", queueDepth: 0, allowActiveAbort: true }, + ["ageMs", "stateGeneration"], + ); + }); + + it("does not recover a recent native tool call just because the session is old", async () => { + const recoverStuckSession = vi.fn(); + const stuckSessionWarnMs = 30_000; + const stuckSessionAbortMs = 90_000; + + startDiagnosticHeartbeat( + { + diagnostics: { + enabled: true, + stuckSessionWarnMs, + stuckSessionAbortMs, + }, + }, + { recoverStuckSession }, + ); + logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" }); + getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" }).lastActivity = + Date.now() - 120_000; + markDiagnosticToolStartedForTest({ + sessionId: "s1", + sessionKey: "main", + runId: "run-1", + toolName: "bash", + toolCallId: "cmd-1", + }); + + vi.advanceTimersByTime(60_000); + expect(recoverStuckSession).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(30_000); + expectRecoveryCall( + recoverStuckSession, + { sessionId: "s1", sessionKey: "main", queueDepth: 0, allowActiveAbort: true }, + ["ageMs", "stateGeneration"], + ); }); it("uses diagnostics.stuckSessionAbortMs for stalled active-work recovery", () => { diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index b844b269888..c631ad24119 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -466,6 +466,33 @@ function isStalledEmbeddedRunRecoveryEligible(params: { ); } +function isBlockedToolCallRecoveryEligible(params: { + classification: SessionAttentionClassification | undefined; + activity?: DiagnosticSessionActivitySnapshot; + stuckSessionAbortMs: number; +}): boolean { + const toolAgeMs = params.activity?.activeToolAgeMs; + const lastProgressAgeMs = params.activity?.lastProgressAgeMs; + return ( + params.classification?.eventType === "session.stalled" && + params.classification.classification === "blocked_tool_call" && + params.classification.activeWorkKind === "tool_call" && + typeof toolAgeMs === "number" && + typeof lastProgressAgeMs === "number" && + toolAgeMs >= params.stuckSessionAbortMs && + lastProgressAgeMs >= params.stuckSessionAbortMs + ); +} + +function isActiveAbortRecoveryEligible(params: { + classification: SessionAttentionClassification | undefined; + activity?: DiagnosticSessionActivitySnapshot; + ageMs: number; + stuckSessionAbortMs: number; +}): boolean { + return isStalledEmbeddedRunRecoveryEligible(params) || isBlockedToolCallRecoveryEligible(params); +} + export function logWebhookReceived(params: { channel: string; updateType?: string; @@ -766,8 +793,9 @@ export function logSessionAttention( }); const recoveryEligible = classification.recoveryEligible || - isStalledEmbeddedRunRecoveryEligible({ + isActiveAbortRecoveryEligible({ classification, + activity, ageMs: params.ageMs, stuckSessionAbortMs: params.abortThresholdMs ?? resolveStalledEmbeddedRunAbortMs(params.thresholdMs), @@ -1007,6 +1035,10 @@ export function startDiagnosticHeartbeat( for (const [, state] of diagnosticSessionStates) { const ageMs = now - state.lastActivity; if (state.state === "processing" && ageMs > stuckSessionWarnMs) { + const activity = getDiagnosticSessionActivitySnapshot( + { sessionId: state.sessionId, sessionKey: state.sessionKey }, + now, + ); const classification = logSessionAttention({ sessionId: state.sessionId, sessionKey: state.sessionKey, @@ -1029,8 +1061,9 @@ export function startDiagnosticHeartbeat( }); } else if ( classification && - isStalledEmbeddedRunRecoveryEligible({ + isActiveAbortRecoveryEligible({ classification, + activity, ageMs, stuckSessionAbortMs, })