fix(gateway): bound restart continuation recovery

This commit is contained in:
Ayaan Zaidi
2026-04-25 11:41:59 +05:30
parent 03addfe9ba
commit a903df02f5
5 changed files with 12 additions and 21 deletions

View File

@@ -91,8 +91,6 @@ async function deliverRestartSentinelNotice(params: {
session: ReturnType<typeof buildOutboundSessionContext>;
}) {
const payloads = [{ text: params.message }];
// Persist one recoverable notice across the whole retry loop so a transient
// failure does not leave behind a stale duplicate queue entry.
const queueId = await enqueueDelivery({
channel: params.channel,
to: params.to,
@@ -136,9 +134,7 @@ async function deliverRestartSentinelNotice(params: {
});
if (!retrying) {
if (queueId) {
await failDelivery(queueId, formatErrorMessage(err)).catch(() => {
// Best-effort queue bookkeeping.
});
await failDelivery(queueId, formatErrorMessage(err)).catch(() => undefined);
}
return;
}
@@ -483,7 +479,6 @@ async function loadRestartSentinelStartupTask(params: {
? String(replyTransport.threadId)
: undefined
: threadId;
// Keep the resolved route for the queued continuation and restart notice.
}
}

View File

@@ -85,6 +85,7 @@ describe("server-runtime-services", () => {
minimalTestGateway: false,
cfgAtStart: {} as never,
deps: {} as never,
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
cron,
logCron: { error: vi.fn() },
log,
@@ -104,7 +105,7 @@ describe("server-runtime-services", () => {
expect(hoisted.recoverPendingRestartContinuationDeliveries).toHaveBeenCalledWith(
expect.objectContaining({
deps: {},
maxEnqueuedAt: expect.any(Number),
maxEnqueuedAt: 123,
}),
);
});
@@ -116,6 +117,7 @@ describe("server-runtime-services", () => {
minimalTestGateway: true,
cfgAtStart: {} as never,
deps: {} as never,
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
cron,
logCron: { error: vi.fn() },
log: createLog(),

View File

@@ -71,8 +71,8 @@ function recoverPendingOutboundDeliveries(params: {
function recoverPendingSessionDeliveries(params: {
deps: import("../cli/deps.types.js").CliDeps;
log: GatewayRuntimeServiceLogger;
maxEnqueuedAt: number;
}): void {
const maxEnqueuedAt = Date.now();
const timer = setTimeout(() => {
void (async () => {
const { recoverPendingRestartContinuationDeliveries } =
@@ -81,7 +81,7 @@ function recoverPendingSessionDeliveries(params: {
await recoverPendingRestartContinuationDeliveries({
deps: params.deps,
log: logRecovery,
maxEnqueuedAt,
maxEnqueuedAt: params.maxEnqueuedAt,
});
})().catch((err) => params.log.error(`Session delivery recovery failed: ${String(err)}`));
}, 1_250);
@@ -98,7 +98,6 @@ export function startGatewayRuntimeServices(params: {
channelHealthMonitor: ChannelHealthMonitor | null;
stopModelPricingRefresh: () => void;
} {
// Keep scheduled work inert until post-attach sidecars finish.
const channelHealthMonitor = startGatewayChannelHealthMonitor({
cfg: params.cfgAtStart,
channelManager: params.channelManager,
@@ -114,14 +113,11 @@ export function startGatewayRuntimeServices(params: {
};
}
/**
* Activate cron scheduler, heartbeat runner, and pending delivery recovery
* after gateway sidecars are fully started and chat.history is available.
*/
export function activateGatewayScheduledServices(params: {
minimalTestGateway: boolean;
cfgAtStart: OpenClawConfig;
deps: import("../cli/deps.types.js").CliDeps;
sessionDeliveryRecoveryMaxEnqueuedAt: number;
cron: { start: () => Promise<void> };
logCron: { error: (message: string) => void };
log: GatewayRuntimeServiceLogger;
@@ -141,6 +137,7 @@ export function activateGatewayScheduledServices(params: {
recoverPendingSessionDeliveries({
deps: params.deps,
log: params.log,
maxEnqueuedAt: params.sessionDeliveryRecoveryMaxEnqueuedAt,
});
return { heartbeatRunner };
}

View File

@@ -811,6 +811,7 @@ export async function startGatewayServer(
});
await startListening();
startupTrace.mark("http.bound");
const sessionDeliveryRecoveryMaxEnqueuedAt = Date.now();
({
stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck,
tailscaleCleanup: runtimeState.tailscaleCleanup,
@@ -851,11 +852,11 @@ export async function startGatewayServer(
));
startupTrace.mark("ready");
// HTTP is live before sidecars finish; /readyz stays red until the startup sidecars settle.
const activated = activateGatewayScheduledServices({
minimalTestGateway,
cfgAtStart,
deps,
sessionDeliveryRecoveryMaxEnqueuedAt,
cron: runtimeState.cronState.cron,
logCron,
log,

View File

@@ -65,11 +65,7 @@ function buildEntryId(idempotencyKey?: string): string {
}
async function unlinkBestEffort(filePath: string): Promise<void> {
try {
await fs.promises.unlink(filePath);
} catch {
// Best-effort cleanup.
}
await fs.promises.unlink(filePath).catch(() => undefined);
}
async function unlinkStaleTmpBestEffort(filePath: string, now: number): Promise<void> {
@@ -245,7 +241,7 @@ export async function loadPendingSessionDeliveries(
}
entries.push(await readQueueEntry(filePath));
} catch {
// Skip malformed or inaccessible entries.
continue;
}
}
return entries;