fix: retry restart continuations during shutdown

This commit is contained in:
VACInc
2026-05-08 07:39:48 -04:00
committed by Ayaan Zaidi
parent 09be7de2b2
commit 0c8c620d6f
6 changed files with 139 additions and 17 deletions

View File

@@ -212,7 +212,7 @@ Docs: https://docs.openclaw.ai
- CLI/router: when `openclaw <name>` does not match a CLI subcommand, check plugin tool manifests first so names like `lcm_recent` get an agent-tool diagnostic instead of the misleading suggestion to add the tool name to `plugins.allow`. Fixes #77214. Thanks @100yenadmin.
- QA-lab/parity: bump the live mock-openai parity baseline from `claude-opus-4-6`/`claude-sonnet-4-6` to `claude-opus-4-7`/`claude-sonnet-4-7` and the candidate alt from `gpt-5.4-alt` to `gpt-5.5-alt` in `openclaw-release-checks.yml` and `qa-live-transports-convex.yml`, matching the active Opus 4.7 / GPT-5.5 defaults already used elsewhere on main. Carries forward the surface-bump portion of #74290. Thanks @100yenadmin.
- QA-lab/scenarios: raise the `approval-turn-tool-followthrough` per-turn fallback timeouts from 20s/30s to 60s so cold mock-gateway parity runs do not flake on the approval-turn chain. Carries forward the timeout-bump portion of #74290. Thanks @100yenadmin.
- Gateway/restart continuation: treat routed post-reboot agent turns as trusted internal continuations while preserving the original Telegram topic route, so owner-only tools remain available for chained restart workflows after reboot.
- Gateway/restart continuation: treat routed post-reboot agent turns as trusted internal continuations while preserving the original Telegram topic route, and retry briefly when the previous run is still shutting down, so owner-only tools remain available for chained restart workflows after reboot.
- Agents/compaction: keep the recent tail after manual `/compact` when Pi returns an empty or no-op compaction summary, preventing blank checkpoints from replacing the live context.
- Native commands: handle slash commands before workspace and agent-reply bootstrap so Telegram `/status` and other command-only native replies do not wait behind full agent turn setup.
- Telegram/groups: include the recent local chat window and nearby reply-target window as generic inbound context so stale reply ancestry does not overshadow the live group conversation.

View File

@@ -70,6 +70,7 @@ import { resolveQueuedReplyExecutionConfig } from "./agent-runner-utils.js";
import { createAudioAsVoiceBuffer, createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
import { createFollowupRunner } from "./followup-runner.js";
import { REPLY_RUN_STILL_SHUTTING_DOWN_TEXT } from "./get-reply-run-queue.js";
import { resolveOriginMessageProvider, resolveOriginMessageTo } from "./origin-routing.js";
import { drainPendingToolTasks } from "./pending-tool-task-drain.js";
import { readPostCompactionContext } from "./post-compaction-context.js";
@@ -1170,7 +1171,7 @@ export async function runReplyAgent(params: {
if (error instanceof ReplyRunAlreadyActiveError) {
typing.cleanup();
return markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ Previous run is still shutting down. Please try again in a moment.",
text: REPLY_RUN_STILL_SHUTTING_DOWN_TEXT,
});
}
throw error;

View File

@@ -9,6 +9,9 @@ export type ReplyRunQueueBusyState = {
isStreaming: boolean;
};
// User-facing busy notice emitted when a new reply run starts while the
// previous run is still tearing down. Exported as a single constant so the
// restart-continuation path can recognize this exact text and retry instead
// of delivering it to the user.
export const REPLY_RUN_STILL_SHUTTING_DOWN_TEXT =
  "⚠️ Previous run is still shutting down. Please try again in a moment.";
export async function resolvePreparedReplyQueueState(params: {
activeRunQueueAction: ActiveRunQueueAction;
activeSessionId: string | undefined;
@@ -40,7 +43,7 @@ export async function resolvePreparedReplyQueueState(params: {
return {
kind: "reply",
reply: {
text: "⚠️ Previous run is still shutting down. Please try again in a moment.",
text: REPLY_RUN_STILL_SHUTTING_DOWN_TEXT,
},
};
}

View File

@@ -78,6 +78,7 @@ const mocks = vi.hoisted(() => {
state.queuedSessionDelivery = payload;
return "session-delivery-1";
}),
loadPendingSessionDelivery: vi.fn(async () => state.queuedSessionDelivery),
drainPendingSessionDeliveries: vi.fn(
async (params: {
logLabel: string;
@@ -99,7 +100,13 @@ const mocks = vi.hoisted(() => {
}
try {
await params.deliver(entry);
state.queuedSessionDelivery = null;
} catch (err) {
state.queuedSessionDelivery = {
...entry,
retryCount: entry.retryCount + 1,
lastError: err instanceof Error ? err.message : String(err),
};
params.log.warn(`${params.logLabel}: retry failed for entry ${entry.id}: ${String(err)}`);
}
},
@@ -157,6 +164,7 @@ vi.mock("../infra/restart-sentinel.js", () => ({
vi.mock("../infra/session-delivery-queue.js", () => ({
enqueueSessionDelivery: mocks.enqueueSessionDelivery,
loadPendingSessionDelivery: mocks.loadPendingSessionDelivery,
drainPendingSessionDeliveries: mocks.drainPendingSessionDeliveries,
recoverPendingSessionDeliveries: mocks.recoverPendingSessionDeliveries,
}));
@@ -320,6 +328,7 @@ describe("scheduleRestartSentinelWake", () => {
mocks.enqueueSystemEvent.mockClear();
mocks.requestHeartbeat.mockClear();
mocks.enqueueSessionDelivery.mockClear();
mocks.loadPendingSessionDelivery.mockClear();
mocks.drainPendingSessionDeliveries.mockClear();
mocks.recoverPendingSessionDeliveries.mockClear();
mocks.removeRestartSentinelFile.mockClear();
@@ -990,6 +999,71 @@ describe("scheduleRestartSentinelWake", () => {
);
});
// Regression test: when the first dispatch of a restart continuation hits the
// "previous run is still shutting down" busy reply, the continuation must be
// retried (second attempt carries a ":retry:1" message-id suffix), the busy
// text must never reach the outbound channel, and the retry's real payload is
// what ultimately gets delivered.
it("retries restart continuations when the previous run is still shutting down", async () => {
  // Must match REPLY_RUN_STILL_SHUTTING_DOWN_TEXT byte-for-byte.
  const busyReply = "⚠️ Previous run is still shutting down. Please try again in a moment.";
  // Sentinel payload describing the agent-turn continuation to resume.
  mocks.readRestartSentinel.mockResolvedValue({
    payload: {
      sessionKey: "agent:main:main",
      deliveryContext: {
        channel: "whatsapp",
        to: "+15550002",
        accountId: "acct-2",
      },
      ts: 123,
      continuation: {
        kind: "agentTurn",
        message: "continue",
      },
    },
  } as Awaited<ReturnType<typeof mocks.readRestartSentinel>>);
  // First dispatch: the run queue is still busy, so only the busy reply is
  // produced. Second dispatch: the retry succeeds with a real payload.
  mocks.recordInboundSessionAndDispatchReply
    .mockImplementationOnce(async (params) => {
      await params.deliver({ text: busyReply });
    })
    .mockImplementationOnce(async (params) => {
      await params.deliver({
        text: "done",
        replyToId: String(params.ctxPayload.MessageSid),
      });
    });
  await scheduleRestartSentinelWake({ deps: {} as never });
  // Exactly one retry happened on top of the initial attempt.
  expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenCalledTimes(2);
  // Initial attempt uses the bare sentinel-derived message id.
  expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith(
    1,
    expect.objectContaining({
      ctxPayload: expect.objectContaining({
        MessageSid: "restart-sentinel:agent:main:main:agentTurn:123",
      }),
    }),
  );
  // Retry attempt appends ":retry:<count>" so it dispatches under a fresh id.
  expect(mocks.recordInboundSessionAndDispatchReply).toHaveBeenNthCalledWith(
    2,
    expect.objectContaining({
      ctxPayload: expect.objectContaining({
        MessageSid: "restart-sentinel:agent:main:main:agentTurn:123:retry:1",
      }),
    }),
  );
  // The busy text must have been intercepted, not delivered outbound.
  const deliveredBusyReply = (
    mocks.deliverOutboundPayloads.mock.calls as unknown as Array<
      [{ payloads?: Array<{ text?: string }> }]
    >
  ).some(([call]) => call.payloads?.some((payload) => payload.text === busyReply) === true);
  expect(deliveredBusyReply).toBe(false);
  // The retry's successful payload is the final outbound delivery.
  expect(mocks.deliverOutboundPayloads).toHaveBeenLastCalledWith(
    expect.objectContaining({
      payloads: [{ text: "done" }],
    }),
  );
  // The deferred first attempt is logged as a retryable failure with the
  // dedicated busy-retry error message.
  expect(mocks.logWarn).toHaveBeenCalledWith(
    expect.stringContaining(
      "retry failed for entry session-delivery-1: Error: restart continuation deferred because previous run is still shutting down",
    ),
  );
});
it("falls back to a session wake when restart routing cannot resolve a destination", async () => {
mocks.readRestartSentinel.mockResolvedValue({
payload: {

View File

@@ -1,4 +1,5 @@
import { resolveSessionAgentId } from "../agents/agent-scope.js";
import { REPLY_RUN_STILL_SHUTTING_DOWN_TEXT } from "../auto-reply/reply/get-reply-run-queue.js";
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import type { ChatType } from "../channels/chat-type.js";
@@ -27,6 +28,7 @@ import {
import {
drainPendingSessionDeliveries,
enqueueSessionDelivery,
loadPendingSessionDelivery,
recoverPendingSessionDeliveries,
type QueuedSessionDelivery,
type QueuedSessionDeliveryPayload,
@@ -49,8 +51,14 @@ import { runStartupTasks, type StartupTask } from "./startup-tasks.js";
const log = createSubsystemLogger("gateway/restart-sentinel");
// Cadence and attempt budget for re-sending outbound payloads after restart.
const OUTBOUND_RETRY_DELAY_MS = 1_000;
const OUTBOUND_MAX_ATTEMPTS = 45;
// Retry cadence while the previous run is still shutting down; collapsed to
// 1ms under Vitest so the retry loop does not slow the test suite.
const RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS = process.env.VITEST ? 1 : 6_000;
const RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS = 5;
// Sentinel error string recorded on a queued delivery's lastError to mark it
// as "deferred because busy"; the busy-retry predicate matches on it exactly.
const RESTART_CONTINUATION_BUSY_RETRY_ERROR =
  "restart continuation deferred because previous run is still shutting down";
// NOTE(review): presumably caches the most recently written sentinel payload
// for reuse during shutdown — confirm against the writers below.
let latestUpdateRestartSentinel: RestartSentinelPayload | null = null;
// Narrows the queued session-delivery union to its agent-turn variant.
type QueuedAgentTurnSessionDelivery = Extract<QueuedSessionDelivery, { kind: "agentTurn" }>;
function cloneRestartSentinelPayload(
payload: RestartSentinelPayload | null,
): RestartSentinelPayload | null {
@@ -203,6 +211,23 @@ function resolveRestartContinuationOutboundPayload(params: {
return params.replyToId ? { ...payload, replyToId: params.replyToId } : payload;
}
/**
 * Detects the "previous run is still shutting down" busy reply so the
 * restart-continuation path can defer and retry rather than deliver it.
 */
function isRestartContinuationBusyPayload(payload: OutboundReplyPayload): boolean {
  const text = payload.text;
  if (typeof text !== "string") {
    return false;
  }
  return text.trim() === REPLY_RUN_STILL_SHUTTING_DOWN_TEXT;
}
/**
 * Reports whether a queued delivery was last deferred because the previous
 * run was still shutting down (identified by the exact busy-retry error).
 */
function isRestartContinuationBusyRetry(entry: QueuedSessionDelivery | null): boolean {
  if (entry == null) {
    return false;
  }
  return entry.lastError === RESTART_CONTINUATION_BUSY_RETRY_ERROR;
}
/**
 * Derives the inbound message id for a queued restart continuation. Attempts
 * that were deferred because the previous run was busy get a ":retry:<n>"
 * suffix so each redispatch runs under a distinct message id.
 */
function resolveQueuedRestartContinuationMessageId(entry: QueuedAgentTurnSessionDelivery): string {
  const redispatchAfterBusy = isRestartContinuationBusyRetry(entry) && entry.retryCount > 0;
  if (!redispatchAfterBusy) {
    return entry.messageId;
  }
  return `${entry.messageId}:retry:${entry.retryCount}`;
}
function resolveQueuedSessionDeliveryContext(entry: QueuedSessionDelivery):
| {
channel?: string;
@@ -270,7 +295,7 @@ async function deliverQueuedSessionDelivery(params: {
}
const route = params.entry.route;
const messageId = params.entry.messageId;
const messageId = resolveQueuedRestartContinuationMessageId(params.entry);
const userMessage = params.entry.message.trim();
const agentId = resolveSessionAgentId({
sessionKey: canonicalKey,
@@ -320,12 +345,16 @@ async function deliverQueuedSessionDelivery(params: {
recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher,
delivery: {
preparePayload: (payload) =>
resolveRestartContinuationOutboundPayload({
preparePayload: (payload) => {
if (isRestartContinuationBusyPayload(payload)) {
throw new Error(RESTART_CONTINUATION_BUSY_RETRY_ERROR);
}
return resolveRestartContinuationOutboundPayload({
payload,
messageId,
replyToId: route.replyToId,
}),
});
},
durable: (_payload, info) =>
info.kind === "final"
? {
@@ -421,16 +450,30 @@ async function drainRestartContinuationQueue(params: {
entryId: string;
log: SessionDeliveryRecoveryLogger;
}) {
await drainPendingSessionDeliveries({
drainKey: `restart-continuation:${params.entryId}`,
logLabel: "restart continuation",
log: params.log,
deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }),
selectEntry: (entry) => ({
match: entry.id === params.entryId,
bypassBackoff: true,
}),
});
for (let attempt = 1; attempt <= RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS; attempt += 1) {
await drainPendingSessionDeliveries({
drainKey: `restart-continuation:${params.entryId}`,
logLabel: "restart continuation",
log: params.log,
deliver: (entry) => deliverQueuedSessionDelivery({ deps: params.deps, entry }),
selectEntry: (entry) => ({
match: entry.id === params.entryId,
bypassBackoff: true,
}),
});
const queued = await loadPendingSessionDelivery(params.entryId);
if (!isRestartContinuationBusyRetry(queued)) {
return;
}
if (attempt >= RESTART_CONTINUATION_BUSY_MAX_ATTEMPTS) {
return;
}
params.log.info(
`restart continuation: entry ${params.entryId} still waiting for the previous run to clear; retrying in ${RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS}ms`,
);
await waitForOutboundRetry(RESTART_CONTINUATION_BUSY_RETRY_DELAY_MS);
}
}
export async function recoverPendingRestartContinuationDeliveries(params: {

View File

@@ -2,6 +2,7 @@ export {
ackSessionDelivery,
enqueueSessionDelivery,
failSessionDelivery,
loadPendingSessionDelivery,
loadPendingSessionDeliveries,
resolveSessionDeliveryQueueDir,
} from "./session-delivery-queue-storage.js";