From a903df02f5ad5b166127a700fa4f2a56a6351162 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 25 Apr 2026 11:41:59 +0530 Subject: [PATCH] fix(gateway): bound restart continuation recovery --- src/gateway/server-restart-sentinel.ts | 7 +------ src/gateway/server-runtime-services.test.ts | 4 +++- src/gateway/server-runtime-services.ts | 11 ++++------- src/gateway/server.impl.ts | 3 ++- src/infra/session-delivery-queue-storage.ts | 8 ++------ 5 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index e5e34df19cd..446b3eaa415 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -91,8 +91,6 @@ async function deliverRestartSentinelNotice(params: { session: ReturnType; }) { const payloads = [{ text: params.message }]; - // Persist one recoverable notice across the whole retry loop so a transient - // failure does not leave behind a stale duplicate queue entry. const queueId = await enqueueDelivery({ channel: params.channel, to: params.to, @@ -136,9 +134,7 @@ async function deliverRestartSentinelNotice(params: { }); if (!retrying) { if (queueId) { - await failDelivery(queueId, formatErrorMessage(err)).catch(() => { - // Best-effort queue bookkeeping. - }); + await failDelivery(queueId, formatErrorMessage(err)).catch(() => undefined); } return; } @@ -483,7 +479,6 @@ async function loadRestartSentinelStartupTask(params: { ? String(replyTransport.threadId) : undefined : threadId; - // Keep the resolved route for the queued continuation and restart notice. } } diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index 20b20db4852..1466cfbbb91 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -85,6 +85,7 @@ describe("server-runtime-services", () => { minimalTestGateway: false, cfgAtStart: {} as never, deps: {} as never, + sessionDeliveryRecoveryMaxEnqueuedAt: 123, cron, logCron: { error: vi.fn() }, log, @@ -104,7 +105,7 @@ describe("server-runtime-services", () => { expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith( expect.objectContaining({ deps: {}, - maxEnqueuedAt: expect.any(Number), + maxEnqueuedAt: 123, }), ); }); @@ -116,6 +117,7 @@ describe("server-runtime-services", () => { minimalTestGateway: true, cfgAtStart: {} as never, deps: {} as never, + sessionDeliveryRecoveryMaxEnqueuedAt: 123, cron, logCron: { error: vi.fn() }, log: createLog(), diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index f021562ae37..70cbe4c0773 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -71,8 +71,8 @@ function recoverPendingOutboundDeliveries(params: { function recoverPendingSessionDeliveries(params: { deps: import("../cli/deps.types.js").CliDeps; log: GatewayRuntimeServiceLogger; + maxEnqueuedAt: number; }): void { - const maxEnqueuedAt = Date.now(); const timer = setTimeout(() => { void (async () => { const { recoverPendingRestartContinuationDeliveries } = @@ -81,7 +81,7 @@ function recoverPendingSessionDeliveries(params: { await recoverPendingRestartContinuationDeliveries({ deps: params.deps, log: logRecovery, - maxEnqueuedAt, + maxEnqueuedAt: params.maxEnqueuedAt, }); })().catch((err) => params.log.error(`Session delivery recovery failed: ${String(err)}`)); }, 1_250); @@ -98,7 +98,6 @@ export function startGatewayRuntimeServices(params: { channelHealthMonitor: ChannelHealthMonitor | null; stopModelPricingRefresh: () => void; } { - // Keep scheduled work inert until post-attach sidecars finish. const channelHealthMonitor = startGatewayChannelHealthMonitor({ cfg: params.cfgAtStart, channelManager: params.channelManager, @@ -114,14 +113,11 @@ export function startGatewayRuntimeServices(params: { }; } -/** - * Activate cron scheduler, heartbeat runner, and pending delivery recovery - * after gateway sidecars are fully started and chat.history is available. - */ export function activateGatewayScheduledServices(params: { minimalTestGateway: boolean; cfgAtStart: OpenClawConfig; deps: import("../cli/deps.types.js").CliDeps; + sessionDeliveryRecoveryMaxEnqueuedAt: number; cron: { start: () => Promise }; logCron: { error: (message: string) => void }; log: GatewayRuntimeServiceLogger; @@ -141,6 +137,7 @@ export function activateGatewayScheduledServices(params: { recoverPendingSessionDeliveries({ deps: params.deps, log: params.log, + maxEnqueuedAt: params.sessionDeliveryRecoveryMaxEnqueuedAt, }); return { heartbeatRunner }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 81578434e7a..f335dafb680 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -811,6 +811,7 @@ export async function startGatewayServer( }); await startListening(); startupTrace.mark("http.bound"); + const sessionDeliveryRecoveryMaxEnqueuedAt = Date.now(); ({ stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck, tailscaleCleanup: runtimeState.tailscaleCleanup, @@ -851,11 +852,11 @@ export async function startGatewayServer( )); startupTrace.mark("ready"); - // HTTP is live before sidecars finish; /readyz stays red until the startup sidecars settle. const activated = activateGatewayScheduledServices({ minimalTestGateway, cfgAtStart, deps, + sessionDeliveryRecoveryMaxEnqueuedAt, cron: runtimeState.cronState.cron, logCron, log, diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts index ba4b928f75a..f6e7a6238f8 100644 --- a/src/infra/session-delivery-queue-storage.ts +++ b/src/infra/session-delivery-queue-storage.ts @@ -65,11 +65,7 @@ function buildEntryId(idempotencyKey?: string): string { } async function unlinkBestEffort(filePath: string): Promise { - try { - await fs.promises.unlink(filePath); - } catch { - // Best-effort cleanup. - } + await fs.promises.unlink(filePath).catch(() => undefined); } async function unlinkStaleTmpBestEffort(filePath: string, now: number): Promise { @@ -245,7 +241,7 @@ export async function loadPendingSessionDeliveries( } entries.push(await readQueueEntry(filePath)); } catch { - // Skip malformed or inaccessible entries. + continue; } } return entries;