From 0c8c620d6f2de31e4ee37127db71caab936ffd3d Mon Sep 17 00:00:00 2001 From: VACInc <3279061+VACInc@users.noreply.github.com> Date: Fri, 8 May 2026 07:39:48 -0400 Subject: [PATCH] fix: retry restart continuations during shutdown --- CHANGELOG.md | 2 +- src/auto-reply/reply/agent-runner.ts | 3 +- src/auto-reply/reply/get-reply-run-queue.ts | 5 +- src/gateway/server-restart-sentinel.test.ts | 74 +++++++++++++++++++++ src/gateway/server-restart-sentinel.ts | 71 ++++++++++++++++---- src/infra/session-delivery-queue.ts | 1 + 6 files changed, 139 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e5c9f1aa89..fec490737ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -212,7 +212,7 @@ Docs: https://docs.openclaw.ai - CLI/router: when `openclaw ` does not match a CLI subcommand, check plugin tool manifests first so names like `lcm_recent` get an agent-tool diagnostic instead of the misleading suggestion to add the tool name to `plugins.allow`. Fixes #77214. Thanks @100yenadmin. - QA-lab/parity: bump the live mock-openai parity baseline from `claude-opus-4-6`/`claude-sonnet-4-6` to `claude-opus-4-7`/`claude-sonnet-4-7` and the candidate alt from `gpt-5.4-alt` to `gpt-5.5-alt` in `openclaw-release-checks.yml` and `qa-live-transports-convex.yml`, matching the active Opus 4.7 / GPT-5.5 defaults already used elsewhere on main. Carries forward the surface-bump portion of #74290. Thanks @100yenadmin. - QA-lab/scenarios: raise the `approval-turn-tool-followthrough` per-turn fallback timeouts from 20s/30s to 60s so cold mock-gateway parity runs do not flake on the approval-turn chain. Carries forward the timeout-bump portion of #74290. Thanks @100yenadmin. -- Gateway/restart continuation: treat routed post-reboot agent turns as trusted internal continuations while preserving the original Telegram topic route, so owner-only tools remain available for chained restart workflows after reboot. +- Gateway/restart continuation: treat routed post-reboot agent turns as trusted internal continuations while preserving the original Telegram topic route, and retry briefly when the previous run is still shutting down, so owner-only tools remain available for chained restart workflows after reboot. - Agents/compaction: keep the recent tail after manual `/compact` when Pi returns an empty or no-op compaction summary, preventing blank checkpoints from replacing the live context. - Native commands: handle slash commands before workspace and agent-reply bootstrap so Telegram `/status` and other command-only native replies do not wait behind full agent turn setup. - Telegram/groups: include the recent local chat window and nearby reply-target window as generic inbound context so stale reply ancestry does not overshadow the live group conversation. diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index a099dbf3074..0c9253171c8 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -70,6 +70,7 @@ import { resolveQueuedReplyExecutionConfig } from "./agent-runner-utils.js"; import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js"; import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; +import { REPLY_RUN_STILL_SHUTTING_DOWN_TEXT } from "./get-reply-run-queue.js"; import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js"; import { drainPendingToolTasks } from "./pending-tool-task-drain.js"; import { readPostCompactionContext } from "./post-compaction-context.js"; @@ -1170,7 +1171,7 @@ export async function runReplyAgent(params: { if (error instanceof ReplyRunAlreadyActiveError) { typing.cleanup(); return markReplyPayloadForSourceSuppressionDelivery({ - text: "⚠️ Previous run is still shutting down. Please try again in a moment.", + text: REPLY_RUN_STILL_SHUTTING_DOWN_TEXT, }); } throw error; diff --git a/src/auto-reply/reply/get-reply-run-queue.ts b/src/auto-reply/reply/get-reply-run-queue.ts index a8262cc2e25..5179fbe2563 100644 --- a/src/auto-reply/reply/get-reply-run-queue.ts +++ b/src/auto-reply/reply/get-reply-run-queue.ts @@ -9,6 +9,9 @@ export type ReplyRunQueueBusyState = { isStreaming: boolean; }; +export const REPLY_RUN_STILL_SHUTTING_DOWN_TEXT = + "⚠️ Previous run is still shutting down. Please try again in a moment."; + export async function resolvePreparedReplyQueueState(params: { activeRunQueueAction: ActiveRunQueueAction; activeSessionId: string | undefined; @@ -40,7 +43,7 @@ export async function resolvePreparedReplyQueueState(params: { return { kind: "reply", reply: { - text: "⚠️ Previous run is still shutting down. Please try again in a moment.", + text: REPLY_RUN_STILL_SHUTTING_DOWN_TEXT, }, }; } diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 75ff4e204b3..271f1bfcd9d 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -78,6 +78,7 @@ const mocks = vi.hoisted(() => { state.queuedSessionDelivery = payload; return "session-delivery-1"; }), + loadPendingSessionDelivery: vi.fn(async () => state.queuedSessionDelivery), drainPendingSessionDeliveries: vi.fn( async (params: { logLabel: string; @@ -99,7 +100,13 @@ const mocks = vi.hoisted(() => { } try { await params.deliver(entry); + state.queuedSessionDelivery = null; } catch (err) { + state.queuedSessionDelivery = { + ...entry, + retryCount: entry.retryCount + 1, + lastError: err instanceof Error ? err.message : String(err), + }; params.log.warn(`${params.logLabel}: retry failed for entry ${entry.id}: ${String(err)}`); } }, @@ -157,6 +164,7 @@ vi.mock("../infra/restart-sentinel.js", () => ({ vi.mock("../infra/session-delivery-queue.js", () => ({ enqueueSessionDelivery: mocks.enqueueSessionDelivery, + loadPendingSessionDelivery: mocks.loadPendingSessionDelivery, drainPendingSessionDeliveries: mocks.drainPendingSessionDeliveries, recoverPendingSessionDeliveries: mocks.recoverPendingSessionDeliveries, })); @@ -320,6 +328,7 @@ describe("scheduleRestartSentinelWake", () => { mocks.enqueueSystemEvent.mockClear(); mocks.requestHeartbeat.mockClear(); mocks.enqueueSessionDelivery.mockClear(); + mocks.loadPendingSessionDelivery.mockClear(); mocks.drainPendingSessionDeliveries.mockClear(); mocks.recoverPendingSessionDeliveries.mockClear(); mocks.removeRestartSentinelFile.mockClear(); @@ -990,6 +999,71 @@ describe("scheduleRestartSentinelWake", () => { ); }); + it("retries restart continuations when the previous run is still shutting down", async () => { + const busyReply = "⚠️ Previous run is still shutting down. Please try again in a moment."; + mocks.readRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:main", + deliveryContext: { + channel: "whatsapp", + to: "+15550002", + accountId: "acct-2", + }, + ts: 123, + continuation: { + kind: "agentTurn", + message: "continue", + }, + }, + } as Awaited>); + mocks.recordInboundSessionAndDispatchReply + .mockImplementationOnce(async (params) => { + await params.deliver({ text: busyReply }); + }) + .mockImplementationOnce(async (params) => { + await params.deliver({ + text: "done", + replyToId: String(params.ctxPayload.MessageSid), + }); + }); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(2); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + ctxPayload: expect.objectContaining({ + MessageSid: "restart-sentinel:agent:main:main:agentTurn:123", + }), + }), + ); + expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + ctxPayload: expect.objectContaining({ + MessageSid: "restart-sentinel:agent:main:main:agentTurn:123:retry:1", + }), + }), + ); + const deliveredBusyReply = ( + mocks.deliverOutboundPayloads.mock.calls as unknown as Array< + [{ payloads?: Array<{ text?: string }> }] + > + ).some(([call]) => call.payloads?.some((payload) => payload.text === busyReply) === true); + expect(deliveredBusyReply).toBe(false); + expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith( + expect.objectContaining({ + payloads: [{ text: "done" }], + }), + ); + expect(mocks.logWarn).toHaveBeenCalledWith( + expect.stringContaining( + "retry failed for entry session-delivery-1: Error: restart continuation deferred because previous run is still shutting down", + ), + ); + }); + it("falls back to a session wake when restart routing cannot resolve a destination", async () => { mocks.readRestartSentinel.mockResolvedValue({ payload: { diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index e5fe3c47969..9b53fb173ac 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -1,4 +1,5 @@ import { resolveSessionAgentId } from "../agents/agent-scope.js"; +import { REPLY_RUN_STILL_SHUTTING_DOWN_TEXT } from "../auto-reply/reply/get-reply-run-queue.js"; import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import type { ChatType } from "../channels/chat-type.js"; @@ -27,6 +28,7 @@ import { import { drainPendingSessionDeliveries, enqueueSessionDelivery, + loadPendingSessionDelivery, recoverPendingSessionDeliveries, type QueuedSessionDelivery, type QueuedSessionDeliveryPayload, @@ -49,8 +51,14 @@ import { runStartupTasks, type StartupTask } from "./startup-tasks.js"; const log = createSubsystemLogger("gateway/restart-sentinel"); const OUTBOUND_RETRY_DELAY_MS = 1_000; const OUTBOUND_MAX_ATTEMPTS = 45; +const RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS = process.env.VITEST ? 1 : 6_000; +const RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS = 5; +const RESTART_CONTINUATION_BUSY_RETRY_ERROR = + "restart continuation deferred because previous run is still shutting down"; let latestUpdateRestartSentinel: RestartSentinelPayload | null = null; +type QueuedAgentTurnSessionDelivery = Extract; + function cloneRestartSentinelPayload( payload: RestartSentinelPayload | null, ): RestartSentinelPayload | null { @@ -203,6 +211,23 @@ function resolveRestartContinuationOutboundPayload(params: { return params.replyToId ? { ...payload, replyToId: params.replyToId } : payload; } +function isRestartContinuationBusyPayload(payload: OutboundReplyPayload): boolean { + return ( + typeof payload.text === "string" && payload.text.trim() === REPLY_RUN_STILL_SHUTTING_DOWN_TEXT + ); +} + +function isRestartContinuationBusyRetry(entry: QueuedSessionDelivery | null): boolean { + return entry?.lastError === RESTART_CONTINUATION_BUSY_RETRY_ERROR; +} + +function resolveQueuedRestartContinuationMessageId(entry: QueuedAgentTurnSessionDelivery): string { + if (isRestartContinuationBusyRetry(entry) && entry.retryCount > 0) { + return `${entry.messageId}:retry:${entry.retryCount}`; + } + return entry.messageId; +} + function resolveQueuedSessionDeliveryContext(entry: QueuedSessionDelivery): | { channel?: string; @@ -270,7 +295,7 @@ async function deliverQueuedSessionDelivery(params: { } const route = params.entry.route; - const messageId = params.entry.messageId; + const messageId = resolveQueuedRestartContinuationMessageId(params.entry); const userMessage = params.entry.message.trim(); const agentId = resolveSessionAgentId({ sessionKey: canonicalKey, @@ -320,12 +345,16 @@ async function deliverQueuedSessionDelivery(params: { recordInboundSession, dispatchReplyWithBufferedBlockDispatcher, delivery: { - preparePayload: (payload) => - resolveRestartContinuationOutboundPayload({ + preparePayload: (payload) => { + if (isRestartContinuationBusyPayload(payload)) { + throw new Error(RESTART_CONTINUATION_BUSY_RETRY_ERROR); + } + return resolveRestartContinuationOutboundPayload({ payload, messageId, replyToId: route.replyToId, - }), + }); + }, durable: (_payload, info) => info.kind === "final" ? { @@ -421,16 +450,30 @@ async function drainRestartContinuationQueue(params: { entryId: string; log: SessionDeliveryRecoveryLogger; }) { - await drainPendingSessionDeliveries({ - drainKey: `restart-continuation:${params.entryId}`, - logLabel: "restart continuation", - log: params.log, - deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }), - selectEntry: (entry) => ({ - match: entry.id === params.entryId, - bypassBackoff: true, - }), - }); + for (let attempt = 1; attempt <= RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS; attempt += 1) { + await drainPendingSessionDeliveries({ + drainKey: `restart-continuation:${params.entryId}`, + logLabel: "restart continuation", + log: params.log, + deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }), + selectEntry: (entry) => ({ + match: entry.id === params.entryId, + bypassBackoff: true, + }), + }); + + const queued = await loadPendingSessionDelivery(params.entryId); + if (!isRestartContinuationBusyRetry(queued)) { + return; + } + if (attempt >= RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS) { + return; + } + params.log.info( + `restart continuation: entry ${params.entryId} still waiting for the previous run to clear; retrying in ${RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS}ms`, + ); + await waitForOutboundRetry(RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS); + } } export async function recoverPendingRestartContinuationDeliveries(params: { diff --git a/src/infra/session-delivery-queue.ts b/src/infra/session-delivery-queue.ts index 8c0d590d1c6..c5e307fb6eb 100644 --- a/src/infra/session-delivery-queue.ts +++ b/src/infra/session-delivery-queue.ts @@ -2,6 +2,7 @@ export { ackSessionDelivery, enqueueSessionDelivery, failSessionDelivery, + loadPendingSessionDelivery, loadPendingSessionDeliveries, resolveSessionDeliveryQueueDir, } from "./session-delivery-queue-storage.js";