Recover stale embedded tool calls during gateway diagnostics (#82369)

* fix(gateway): recover stale embedded tool calls

* chore(changelog): note stale embedded tool recovery
This commit is contained in:
Josh Avant
2026-05-15 19:36:59 -05:00
committed by GitHub
parent 9a1fa5f23f
commit a1e208ee26
6 changed files with 186 additions and 16 deletions

View File

@@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai
- Control UI/WebChat: focus the composer when users click the visible input chrome and restore larger, labeled desktop composer controls while preserving compact mobile taps. Fixes #45656. Thanks @BunsDev.
- Discord: suppress generated link embeds on outbound messages by default so agent-sent URLs stay as plain links unless `channels.discord.suppressEmbeds` is disabled.
- System events: keep owner downgrades in structured metadata while rendering queued prompt text as plain `System:` lines, preserving least-privilege wakeups without prompt-visible trust labels. (#82067)
- Gateway/agents: abort active embedded runs when diagnostics detect a stale native tool call, preventing nested agent sessions from staying deadlocked through restart recovery. Fixes #81976. (#82369) Thanks @joshavant.
- Slack: default outbound bot link unfurls off so agent-sent URLs no longer expand into inline previews unless `channels.slack.unfurlLinks` is enabled. (#82123) Thanks @kibi-bsp.
- Slack: keep finalized draft-preview replies visible when a later same-turn tool warning is delivered normally instead of clearing the edited answer. Fixes #81903. (#81979) Thanks @neeravmakwana.
- Providers/Xiaomi: preserve MiMo `reasoning_content` on multi-turn tool-call replay, including custom Xiaomi-compatible proxy routes, so follow-up turns no longer fail with `400 Param Incorrect`. Fixes #81419. (#81589) Thanks @lovelefeng-glitch and @jimdawdy-hub.

View File

@@ -401,6 +401,38 @@ describe("runGatewayLoop", () => {
});
});
it("aborts active embedded runs after a short restart drain grace", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({});
getActiveTaskCount.mockReturnValueOnce(1).mockReturnValue(0);
getActiveEmbeddedRunCount.mockReturnValueOnce(1).mockReturnValue(0);
waitForActiveTasks.mockResolvedValueOnce({ drained: false });
waitForActiveEmbeddedRuns.mockResolvedValueOnce({ drained: false });
await withIsolatedSignals(async ({ captureSignal }) => {
const { start, exited } = await createSignaledLoopHarness();
const sigterm = captureSignal("SIGTERM");
const sigint = captureSignal("SIGINT");
sigterm();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(waitForActiveTasks).toHaveBeenCalledWith(90_000);
expect(waitForActiveEmbeddedRuns).toHaveBeenCalledWith(30_000);
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "compacting" });
expect(abortEmbeddedPiRun).toHaveBeenCalledWith(undefined, { mode: "all" });
expect(gatewayLog.warn).toHaveBeenCalledWith(
"active embedded run drain grace reached; aborting active run(s) before restart",
);
expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG);
expect(start).toHaveBeenCalledTimes(2);
sigint();
await expect(exited).resolves.toBe(0);
});
});
it("forces SIGTERM restarts without waiting for active task drain", async () => {
vi.clearAllMocks();
consumeGatewayRestartIntentPayloadSync.mockReturnValueOnce({ force: true });

View File

@@ -10,6 +10,7 @@ import { createLazyImportLoader } from "../../shared/lazy-promise.js";
const gatewayLog = createSubsystemLogger("gateway");
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000;
const RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS = 30_000;
const RESTART_DRAIN_STILL_PENDING_WARN_MS = 30_000;
const UPDATE_RESPAWN_HEALTH_TIMEOUT_MS = 10_000;
const UPDATE_RESPAWN_HEALTH_POLL_MS = 200;
@@ -345,6 +346,15 @@ export async function runGatewayLoop(params: {
restartDrainTimeoutMs === undefined
? "without a timeout"
: `with timeout ${restartDrainTimeoutMs}ms`;
const resolveActiveRunDrainWaitMs = (activeRuns: number): RestartDrainTimeoutMs => {
if (activeRuns <= 0) {
return restartDrainTimeoutMs;
}
if (restartDrainTimeoutMs === undefined) {
return RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS;
}
return Math.min(restartDrainTimeoutMs, RESTART_ACTIVE_EMBEDDED_RUN_ABORT_GRACE_MS);
};
const armCloseForceExitTimerForIndefiniteRestart = () => {
if (isRestart && restartDrainTimeoutMs === undefined) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
@@ -417,22 +427,40 @@ export async function runGatewayLoop(params: {
gatewayLog.warn("forced restart requested; skipping active work drain");
abortEmbeddedPiRun(undefined, { mode: "all" });
} else {
const activeRunDrainWaitMs = resolveActiveRunDrainWaitMs(activeRuns);
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]).finally(() => clearInterval(stillPendingDrainLogger));
let abortedAfterRunGrace = false;
let tasksDrain: { drained: boolean } = { drained: true };
let runsDrain: { drained: boolean } = { drained: true };
try {
const tasksDrainPromise =
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true });
runsDrain =
activeRuns > 0
? await waitForActiveEmbeddedRuns(activeRunDrainWaitMs)
: { drained: true };
if (!runsDrain.drained && activeRuns > 0) {
gatewayLog.warn(
"active embedded run drain grace reached; aborting active run(s) before restart",
);
abortEmbeddedPiRun(undefined, { mode: "all" });
abortedAfterRunGrace = true;
}
tasksDrain = await tasksDrainPromise;
} finally {
clearInterval(stillPendingDrainLogger);
}
if (tasksDrain.drained && runsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");
// Final best-effort abort to avoid carrying active runs into the
// next lifecycle when drain time budget is exhausted.
abortEmbeddedPiRun(undefined, { mode: "all" });
if (!abortedAfterRunGrace) {
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
}
}

View File

@@ -139,6 +139,36 @@ describe("stuck session recovery", () => {
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("returns an abort outcome for a stale tool call on an active embedded run", async () => {
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue("session-tool");
mocks.abortEmbeddedPiRun.mockReturnValue(true);
mocks.waitForEmbeddedPiRunEnd.mockResolvedValue(true);
const outcome = await recoverStuckDiagnosticSession({
sessionId: "session-tool",
sessionKey: "agent:main:telegram:group:-1003821464158:topic:4836",
ageMs: 147_000,
queueDepth: 1,
allowActiveAbort: true,
});
expect(outcome).toMatchObject({
status: "aborted",
action: "abort_embedded_run",
sessionId: "session-tool",
sessionKey: "agent:main:telegram:group:-1003821464158:topic:4836",
activeSessionId: "session-tool",
activeWorkKind: "embedded_run",
aborted: true,
drained: true,
forceCleared: false,
released: 0,
});
expect(mocks.abortEmbeddedPiRun).toHaveBeenCalledWith("session-tool");
expect(mocks.waitForEmbeddedPiRunEnd).toHaveBeenCalledWith("session-tool", 15_000);
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
});
it("logs stopped cron context when aborting an active embedded run", async () => {
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-recovery-context-"));

View File

@@ -560,9 +560,11 @@ describe("stuck session diagnostics threshold", () => {
);
});
it("does not abort embedded runs while a native tool call is active", async () => {
it("recovers stale native tool calls through the active-run abort path", async () => {
const events: DiagnosticEventPayload[] = [];
const recoverStuckSession = vi.fn();
const stuckSessionWarnMs = 30_000;
const stuckSessionAbortMs = 60_000;
const unsubscribe = onDiagnosticEvent((event) => {
events.push(event);
});
@@ -571,8 +573,8 @@ describe("stuck session diagnostics threshold", () => {
{
diagnostics: {
enabled: true,
stuckSessionWarnMs: 30_000,
stuckSessionAbortMs: 60_000,
stuckSessionWarnMs,
stuckSessionAbortMs,
},
},
{ recoverStuckSession },
@@ -587,12 +589,14 @@ describe("stuck session diagnostics threshold", () => {
toolCallId: "cmd-1",
});
vi.advanceTimersByTime(2 * 60_000);
vi.advanceTimersByTime(stuckSessionAbortMs - 30_000);
expect(recoverStuckSession).not.toHaveBeenCalled();
vi.advanceTimersByTime(30_000);
} finally {
unsubscribe();
}
expect(recoverStuckSession).not.toHaveBeenCalled();
expectRecordFields(
requireRecord(
events.findLast((event) => event.type === "session.stalled"),
@@ -606,6 +610,48 @@ describe("stuck session diagnostics threshold", () => {
activeToolCallId: "cmd-1",
},
);
expectRecoveryCall(
recoverStuckSession,
{ sessionId: "s1", sessionKey: "main", queueDepth: 0, allowActiveAbort: true },
["ageMs", "stateGeneration"],
);
});
it("does not recover a recent native tool call just because the session is old", async () => {
const recoverStuckSession = vi.fn();
const stuckSessionWarnMs = 30_000;
const stuckSessionAbortMs = 90_000;
startDiagnosticHeartbeat(
{
diagnostics: {
enabled: true,
stuckSessionWarnMs,
stuckSessionAbortMs,
},
},
{ recoverStuckSession },
);
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
getDiagnosticSessionState({ sessionId: "s1", sessionKey: "main" }).lastActivity =
Date.now() - 120_000;
markDiagnosticToolStartedForTest({
sessionId: "s1",
sessionKey: "main",
runId: "run-1",
toolName: "bash",
toolCallId: "cmd-1",
});
vi.advanceTimersByTime(60_000);
expect(recoverStuckSession).not.toHaveBeenCalled();
vi.advanceTimersByTime(30_000);
expectRecoveryCall(
recoverStuckSession,
{ sessionId: "s1", sessionKey: "main", queueDepth: 0, allowActiveAbort: true },
["ageMs", "stateGeneration"],
);
});
it("uses diagnostics.stuckSessionAbortMs for stalled active-work recovery", () => {

View File

@@ -466,6 +466,33 @@ function isStalledEmbeddedRunRecoveryEligible(params: {
);
}
function isBlockedToolCallRecoveryEligible(params: {
classification: SessionAttentionClassification | undefined;
activity?: DiagnosticSessionActivitySnapshot;
stuckSessionAbortMs: number;
}): boolean {
const toolAgeMs = params.activity?.activeToolAgeMs;
const lastProgressAgeMs = params.activity?.lastProgressAgeMs;
return (
params.classification?.eventType === "session.stalled" &&
params.classification.classification === "blocked_tool_call" &&
params.classification.activeWorkKind === "tool_call" &&
typeof toolAgeMs === "number" &&
typeof lastProgressAgeMs === "number" &&
toolAgeMs >= params.stuckSessionAbortMs &&
lastProgressAgeMs >= params.stuckSessionAbortMs
);
}
function isActiveAbortRecoveryEligible(params: {
classification: SessionAttentionClassification | undefined;
activity?: DiagnosticSessionActivitySnapshot;
ageMs: number;
stuckSessionAbortMs: number;
}): boolean {
return isStalledEmbeddedRunRecoveryEligible(params) || isBlockedToolCallRecoveryEligible(params);
}
export function logWebhookReceived(params: {
channel: string;
updateType?: string;
@@ -766,8 +793,9 @@ export function logSessionAttention(
});
const recoveryEligible =
classification.recoveryEligible ||
isStalledEmbeddedRunRecoveryEligible({
isActiveAbortRecoveryEligible({
classification,
activity,
ageMs: params.ageMs,
stuckSessionAbortMs:
params.abortThresholdMs ?? resolveStalledEmbeddedRunAbortMs(params.thresholdMs),
@@ -1007,6 +1035,10 @@ export function startDiagnosticHeartbeat(
for (const [, state] of diagnosticSessionStates) {
const ageMs = now - state.lastActivity;
if (state.state === "processing" && ageMs > stuckSessionWarnMs) {
const activity = getDiagnosticSessionActivitySnapshot(
{ sessionId: state.sessionId, sessionKey: state.sessionKey },
now,
);
const classification = logSessionAttention({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
@@ -1029,8 +1061,9 @@ export function startDiagnosticHeartbeat(
});
} else if (
classification &&
isStalledEmbeddedRunRecoveryEligible({
isActiveAbortRecoveryEligible({
classification,
activity,
ageMs,
stuckSessionAbortMs,
})