fix(gateway): address restart continuation review comments

This commit is contained in:
FullerStackDev
2026-04-23 15:58:31 -06:00
committed by Ayaan Zaidi
parent 0ac81d41b6
commit 2261550633
7 changed files with 97 additions and 7 deletions

View File

@@ -213,14 +213,15 @@ async function deliverQueuedSessionDelivery(params: {
entry: QueuedSessionDelivery;
}) {
const { cfg, storePath, canonicalKey } = loadSessionEntry(params.entry.sessionKey);
const queuedDeliveryContext = resolveQueuedSessionDeliveryContext(params.entry);
if (params.entry.kind === "systemEvent") {
enqueueSystemEvent(params.entry.text, {
sessionKey: canonicalKey,
...(resolveQueuedSessionDeliveryContext(params.entry)
...(queuedDeliveryContext
? {
deliveryContext: {
...resolveQueuedSessionDeliveryContext(params.entry),
...queuedDeliveryContext,
},
}
: {}),
@@ -232,10 +233,10 @@ async function deliverQueuedSessionDelivery(params: {
if (!params.entry.route) {
enqueueSystemEvent(params.entry.message, {
sessionKey: canonicalKey,
...(resolveQueuedSessionDeliveryContext(params.entry)
...(queuedDeliveryContext
? {
deliveryContext: {
...resolveQueuedSessionDeliveryContext(params.entry),
...queuedDeliveryContext,
},
}
: {}),
@@ -383,10 +384,12 @@ async function drainRestartContinuationQueue(params: {
export async function recoverPendingRestartContinuationDeliveries(params: {
deps: CliDeps;
log?: SessionDeliveryRecoveryLogger;
maxEnqueuedAt?: number;
}) {
await recoverPendingSessionDeliveries({
deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }),
log: params.log ?? log,
maxEnqueuedAt: params.maxEnqueuedAt,
});
}

View File

@@ -104,6 +104,7 @@ describe("server-runtime-services", () => {
expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith(
expect.objectContaining({
deps: {},
maxEnqueuedAt: expect.any(Number),
}),
);
});

View File

@@ -72,6 +72,7 @@ function recoverPendingSessionDeliveries(params: {
deps: import("../cli/deps.types.js").CliDeps;
log: GatewayRuntimeServiceLogger;
}): void {
const maxEnqueuedAt = Date.now();
const timer = setTimeout(() => {
void (async () => {
const { recoverPendingRestartContinuationDeliveries } =
@@ -80,6 +81,7 @@ function recoverPendingSessionDeliveries(params: {
await recoverPendingRestartContinuationDeliveries({
deps: params.deps,
log: logRecovery,
maxEnqueuedAt,
});
})().catch((err) => params.log.error(`Session delivery recovery failed: ${String(err)}`));
}, 1_250);

View File

@@ -199,8 +199,11 @@ export async function recoverPendingSessionDeliveries(opts: {
log: SessionDeliveryRecoveryLogger;
stateDir?: string;
maxRecoveryMs?: number;
maxEnqueuedAt?: number;
}): Promise<SessionDeliveryRecoverySummary> {
const pending = await loadPendingSessionDeliveries(opts.stateDir);
const pending = (await loadPendingSessionDeliveries(opts.stateDir)).filter(
(entry) => opts.maxEnqueuedAt == null || entry.enqueuedAt <= opts.maxEnqueuedAt,
);
if (pending.length === 0) {
return createEmptyRecoverySummary();
}
@@ -223,9 +226,18 @@ export async function recoverPendingSessionDeliveries(opts: {
if (!currentEntry) {
continue;
}
if (opts.maxEnqueuedAt != null && currentEntry.enqueuedAt > opts.maxEnqueuedAt) {
continue;
}
if (currentEntry.retryCount >= MAX_SESSION_DELIVERY_RETRIES) {
summary.skippedMaxRetries += 1;
await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir).catch(() => {});
try {
await moveSessionDeliveryToFailed(currentEntry.id, opts.stateDir);
} catch (err) {
if (getErrnoCode(err) !== "ENOENT") {
throw err;
}
}
continue;
}

View File

@@ -206,7 +206,7 @@ export async function loadPendingSessionDeliveries(
}
for (const file of files) {
if (file.endsWith(".delivered")) {
if (file.endsWith(".delivered") || file.endsWith(".tmp")) {
await unlinkBestEffort(path.join(queueDir, file));
}
}

View File

@@ -65,4 +65,54 @@ describe("session-delivery queue recovery", () => {
expect(failedEntry?.lastError).toBe("transient failure");
});
});
it("skips entries queued after the startup recovery cutoff", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-04-23T00:00:00.000Z"));
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "recover old entry",
},
tempDir,
);
const maxEnqueuedAt = Date.now();
vi.setSystemTime(new Date("2026-04-23T00:00:05.000Z"));
await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "leave fresh entry queued",
},
tempDir,
);
const deliver = vi.fn(async () => undefined);
const summary = await recoverPendingSessionDeliveries({
deliver,
stateDir: tempDir,
maxEnqueuedAt,
log: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
});
expect(deliver).toHaveBeenCalledTimes(1);
expect(summary.recovered).toBe(1);
const pending = await loadPendingSessionDeliveries(tempDir);
expect(pending).toHaveLength(1);
expect(pending[0]?.kind).toBe("systemEvent");
if (pending[0]?.kind === "systemEvent") {
expect(pending[0].text).toBe("leave fresh entry queued");
}
});
vi.useRealTimers();
});
});

View File

@@ -1,3 +1,5 @@
import fs from "node:fs";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
@@ -5,6 +7,7 @@ import {
enqueueSessionDelivery,
failSessionDelivery,
loadPendingSessionDeliveries,
resolveSessionDeliveryQueueDir,
} from "./session-delivery-queue.js";
describe("session-delivery queue storage", () => {
@@ -56,4 +59,23 @@ describe("session-delivery queue storage", () => {
expect(await loadPendingSessionDeliveries(tempDir)).toEqual([]);
});
});
it("cleans up orphaned temporary queue files during load", async () => {
await withTempDir({ prefix: "openclaw-session-delivery-" }, async (tempDir) => {
await enqueueSessionDelivery(
{
kind: "systemEvent",
sessionKey: "agent:main:main",
text: "restart complete",
},
tempDir,
);
const tmpPath = path.join(resolveSessionDeliveryQueueDir(tempDir), "orphan-entry.tmp");
fs.writeFileSync(tmpPath, "stale tmp");
await loadPendingSessionDeliveries(tempDir);
expect(fs.existsSync(tmpPath)).toBe(false);
});
});
});