From 22615506333fa6f952e99e7e10db3745c521b8ac Mon Sep 17 00:00:00 2001 From: FullerStackDev <263060202+fuller-stack-dev@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:58:31 -0600 Subject: [PATCH] fix(gateway): address restart continuation review comments --- src/gateway/server-restart-sentinel.ts | 11 ++-- src/gateway/server-runtime-services.test.ts | 1 + src/gateway/server-runtime-services.ts | 2 + src/infra/session-delivery-queue-recovery.ts | 16 +++++- src/infra/session-delivery-queue-storage.ts | 2 +- .../session-delivery-queue.recovery.test.ts | 50 +++++++++++++++++++ .../session-delivery-queue.storage.test.ts | 22 ++++++++ 7 files changed, 97 insertions(+), 7 deletions(-) diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 19811f3c934..e5e34df19cd 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -213,14 +213,15 @@ async function deliverQueuedSessionDelivery(params: { entry: QueuedSessionDelivery; }) { const { cfg, storePath, canonicalKey } = loadSessionEntry(params.entry.sessionKey); + const queuedDeliveryContext = resolveQueuedSessionDeliveryContext(params.entry); if (params.entry.kind === "systemEvent") { enqueueSystemEvent(params.entry.text, { sessionKey: canonicalKey, - ...(resolveQueuedSessionDeliveryContext(params.entry) + ...(queuedDeliveryContext ? { deliveryContext: { - ...resolveQueuedSessionDeliveryContext(params.entry), + ...queuedDeliveryContext, }, } : {}), @@ -232,10 +233,10 @@ async function deliverQueuedSessionDelivery(params: { if (!params.entry.route) { enqueueSystemEvent(params.entry.message, { sessionKey: canonicalKey, - ...(resolveQueuedSessionDeliveryContext(params.entry) + ...(queuedDeliveryContext ? { deliveryContext: { - ...resolveQueuedSessionDeliveryContext(params.entry), + ...queuedDeliveryContext, }, } : {}), @@ -383,10 +384,12 @@ async function drainRestartContinuationQueue(params: { export async function recoverPendingRestartContinuationDeliveries(params: { deps: CliDeps; log?: SessionDeliveryRecoveryLogger; + maxEnqueuedAt?: number; }) { await recoverPendingSessionDeliveries({ deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }), log: params.log ?? log, + maxEnqueuedAt: params.maxEnqueuedAt, }); } diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index 6105891b670..20b20db4852 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -104,6 +104,7 @@ describe("server-runtime-services", () => { expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith( expect.objectContaining({ deps: {}, + maxEnqueuedAt: expect.any(Number), }), ); }); diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index 0f62c66f326..f021562ae37 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -72,6 +72,7 @@ function recoverPendingSessionDeliveries(params: { deps: import("../cli/deps.types.js").CliDeps; log: GatewayRuntimeServiceLogger; }): void { + const maxEnqueuedAt = Date.now(); const timer = setTimeout(() => { void (async () => { const { recoverPendingRestartContinuationDeliveries } = @@ -80,6 +81,7 @@ function recoverPendingSessionDeliveries(params: { await recoverPendingRestartContinuationDeliveries({ deps: params.deps, log: logRecovery, + maxEnqueuedAt, }); })().catch((err) => params.log.error(`Session delivery recovery failed: ${String(err)}`)); }, 1_250); diff --git a/src/infra/session-delivery-queue-recovery.ts b/src/infra/session-delivery-queue-recovery.ts index 803b6c90de2..86902617858 100644 --- a/src/infra/session-delivery-queue-recovery.ts +++ b/src/infra/session-delivery-queue-recovery.ts @@ -199,8 +199,11 @@ export async function recoverPendingSessionDeliveries(opts: { log: SessionDeliveryRecoveryLogger; stateDir?: string; maxRecoveryMs?: number; + maxEnqueuedAt?: number; }): Promise { - const pending = await loadPendingSessionDeliveries(opts.stateDir); + const pending = (await loadPendingSessionDeliveries(opts.stateDir)).filter( + (entry) => opts.maxEnqueuedAt == null || entry.enqueuedAt <= opts.maxEnqueuedAt, + ); if (pending.length === 0) { return createEmptyRecoverySummary(); } @@ -223,9 +226,18 @@ export async function recoverPendingSessionDeliveries(opts: { if (!currentEntry) { continue; } + if (opts.maxEnqueuedAt != null && currentEntry.enqueuedAt > opts.maxEnqueuedAt) { + continue; + } if (currentEntry.retryCount >= MAX_SESSION_DELIVERY_RETRIES) { summary.skippedMaxRetries += 1; - await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir).catch(() => {}); + try { + await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir); + } catch (err) { + if (getErrnoCode(err) !== "ENOENT") { + throw err; + } + } continue; } diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts index 59839625c23..09760c7d8fc 100644 --- a/src/infra/session-delivery-queue-storage.ts +++ b/src/infra/session-delivery-queue-storage.ts @@ -206,7 +206,7 @@ export async function loadPendingSessionDeliveries( } for (const file of files) { - if (file.endsWith(".delivered")) { + if (file.endsWith(".delivered") || file.endsWith(".tmp")) { await unlinkBestEffort(path.join(queueDir, file)); } } diff --git a/src/infra/session-delivery-queue.recovery.test.ts b/src/infra/session-delivery-queue.recovery.test.ts index 8aef5194f55..6d9662298a6 100644 --- a/src/infra/session-delivery-queue.recovery.test.ts +++ b/src/infra/session-delivery-queue.recovery.test.ts @@ -65,4 +65,54 @@ describe("session-delivery queue recovery", () => { expect(failedEntry?.lastError).toBe("transient failure"); }); }); + + it("skips entries queued after the startup recovery cutoff", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-04-23T00:00:00.000Z")); + + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "recover old entry", + }, + tempDir, + ); + const maxEnqueuedAt = Date.now(); + + vi.setSystemTime(new Date("2026-04-23T00:00:05.000Z")); + await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "leave fresh entry queued", + }, + tempDir, + ); + + const deliver = vi.fn(async () => undefined); + const summary = await recoverPendingSessionDeliveries({ + deliver, + stateDir: tempDir, + maxEnqueuedAt, + log: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, + }); + + expect(deliver).toHaveBeenCalledTimes(1); + expect(summary.recovered).toBe(1); + const pending = await loadPendingSessionDeliveries(tempDir); + expect(pending).toHaveLength(1); + expect(pending[0]?.kind).toBe("systemEvent"); + if (pending[0]?.kind === "systemEvent") { + expect(pending[0].text).toBe("leave fresh entry queued"); + } + }); + + vi.useRealTimers(); + }); }); diff --git a/src/infra/session-delivery-queue.storage.test.ts b/src/infra/session-delivery-queue.storage.test.ts index 59ecbf3a6bd..6bfdafa256a 100644 --- a/src/infra/session-delivery-queue.storage.test.ts +++ b/src/infra/session-delivery-queue.storage.test.ts @@ -1,3 +1,5 @@ +import fs from "node:fs"; +import path from "node:path"; import { describe, expect, it } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { @@ -5,6 +7,7 @@ import { enqueueSessionDelivery, failSessionDelivery, loadPendingSessionDeliveries, + resolveSessionDeliveryQueueDir, } from "./session-delivery-queue.js"; describe("session-delivery queue storage", () => { @@ -56,4 +59,23 @@ describe("session-delivery queue storage", () => { expect(await loadPendingSessionDeliveries(tempDir)).toEqual([]); }); }); + + it("cleans up orphaned temporary queue files during load", async () => { + await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => { + await enqueueSessionDelivery( + { + kind: "systemEvent", + sessionKey: "agent:main:main", + text: "restart complete", + }, + tempDir, + ); + const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp"); + fs.writeFileSync(tmpPath, "stale tmp"); + + await loadPendingSessionDeliveries(tempDir); + + expect(fs.existsSync(tmpPath)).toBe(false); + }); + }); });