From 85ee6f296767eca452825988e2d3484e574dc63e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 11 Apr 2026 00:55:42 +0100 Subject: [PATCH] fix: stabilize live qa suite routing --- extensions/qa-channel/src/channel-actions.ts | 59 +++++++++- extensions/qa-channel/src/channel.test.ts | 53 +++++++++ extensions/qa-lab/src/gateway-child.ts | 5 +- extensions/qa-lab/src/model-switch-eval.ts | 2 + extensions/qa-lab/src/reply-failure.ts | 1 + extensions/qa-lab/src/suite.test.ts | 93 +++++++++++++++ extensions/qa-lab/src/suite.ts | 108 +++++++++++++++--- src/gateway/protocol/schema/config.ts | 22 ++++ src/gateway/server-methods/config.ts | 15 ++- src/gateway/server-methods/restart-request.ts | 41 ++++++- src/gateway/server-methods/update.ts | 13 ++- src/gateway/server-restart-sentinel.test.ts | 63 ++++++++-- src/gateway/server-restart-sentinel.ts | 8 +- 13 files changed, 445 insertions(+), 38 deletions(-) diff --git a/extensions/qa-channel/src/channel-actions.ts b/extensions/qa-channel/src/channel-actions.ts index aa67071e698..f518d1036b3 100644 --- a/extensions/qa-channel/src/channel-actions.ts +++ b/extensions/qa-channel/src/channel-actions.ts @@ -3,6 +3,7 @@ import { jsonResult, readStringParam } from "openclaw/plugin-sdk/channel-actions import { extractToolSend } from "openclaw/plugin-sdk/tool-send"; import { resolveQaChannelAccount } from "./accounts.js"; import { + buildQaTarget, createQaBusThread, deleteQaBusMessage, editQaBusMessage, @@ -43,6 +44,33 @@ function listQaChannelActions( return Array.from(actions); } +function readQaSendText(params: Record) { + return ( + readStringParam(params, "message", { allowEmpty: true }) ?? + readStringParam(params, "text", { allowEmpty: true }) ?? + readStringParam(params, "content", { allowEmpty: true }) + ); +} + +function readQaSendTarget(params: Record) { + const explicitTo = readStringParam(params, "to"); + if (explicitTo) { + return explicitTo; + } + const channelId = readStringParam(params, "channelId"); + if (channelId) { + return buildQaTarget({ chatType: "channel", conversationId: channelId }); + } + const target = readStringParam(params, "target"); + if (!target) { + return undefined; + } + if (/^(dm|channel):|^thread:[^/]+\/.+/i.test(target)) { + return target; + } + return buildQaTarget({ chatType: "channel", conversationId: target }); +} + export const qaChannelMessageActions: ChannelMessageActionAdapter = { describeMessageTool: (context) => ({ actions: listQaChannelActions(context.cfg as CoreConfig, context.accountId), @@ -60,8 +88,13 @@ export const qaChannelMessageActions: ChannelMessageActionAdapter = { }), extractToolSend: ({ args }: { args: Record }) => { const action = typeof args.action === "string" ? args.action.trim() : ""; + if (action === "send") { + const to = readQaSendTarget(args); + const threadId = readStringParam(args, "threadId"); + return to ? { to, threadId } : null; + } if (action === "sendMessage") { - return extractToolSend(args, "sendMessage"); + return extractToolSend(args, "sendMessage") ?? null; } if (action === "threadReply") { const channelId = typeof args.channelId === "string" ? args.channelId.trim() : ""; @@ -76,6 +109,30 @@ export const qaChannelMessageActions: ChannelMessageActionAdapter = { const baseUrl = account.baseUrl; switch (action) { + case "send": { + const to = readQaSendTarget(params); + const text = readQaSendText(params); + if (!to || text === undefined) { + throw new Error("qa-channel send requires to/target and message/text"); + } + const parsed = parseQaTarget(to); + const threadId = readStringParam(params, "threadId") ?? parsed.threadId; + const { message } = await sendQaBusMessage({ + baseUrl, + accountId: account.accountId, + to: buildQaTarget({ + chatType: parsed.chatType, + conversationId: parsed.conversationId, + threadId, + }), + text, + senderId: account.botUserId, + senderName: account.botDisplayName, + threadId, + replyToId: readStringParam(params, "replyTo") ?? readStringParam(params, "replyToId"), + }); + return jsonResult({ message }); + } case "thread-create": { const channelId = readStringParam(params, "channelId") ?? diff --git a/extensions/qa-channel/src/channel.test.ts b/extensions/qa-channel/src/channel.test.ts index 163f94b5780..8b399143936 100644 --- a/extensions/qa-channel/src/channel.test.ts +++ b/extensions/qa-channel/src/channel.test.ts @@ -221,4 +221,57 @@ describe("qa-channel plugin", () => { await bus.stop(); } }); + + it("routes the advertised send action to the qa bus", async () => { + const state = createQaBusState(); + const bus = await startQaBusServer({ state }); + + try { + const cfg = { + channels: { + "qa-channel": { + baseUrl: bus.baseUrl, + botUserId: "openclaw", + botDisplayName: "OpenClaw QA", + }, + }, + }; + + const sendTarget = qaChannelPlugin.actions?.extractToolSend?.({ + args: { + action: "send", + target: "qa-room", + message: "hello", + }, + }); + expect(sendTarget).toEqual({ to: "channel:qa-room", threadId: undefined }); + + const result = await qaChannelPlugin.actions?.handleAction?.({ + channel: "qa-channel", + action: "send", + cfg, + accountId: "default", + params: { + target: "qa-room", + message: "hello from action", + }, + }); + const payload = extractToolPayload(result); + expect(payload).toMatchObject({ message: { text: "hello from action" } }); + + const outbound = await state.waitFor({ + kind: "message-text", + direction: "outbound", + textIncludes: "hello from action", + timeoutMs: 5_000, + }); + expect("conversation" in outbound).toBe(true); + if (!("conversation" in outbound)) { + throw new Error("expected outbound message match"); + } + expect(outbound.conversation).toMatchObject({ id: "qa-room", kind: "channel" }); + } finally { + await bus.stop(); + } + }); }); diff --git a/extensions/qa-lab/src/gateway-child.ts b/extensions/qa-lab/src/gateway-child.ts index 004f286ac8e..947b7227e42 100644 --- a/extensions/qa-lab/src/gateway-child.ts +++ b/extensions/qa-lab/src/gateway-child.ts @@ -947,9 +947,10 @@ export async function startQaGatewayChild(params: { async call( method: string, rpcParams?: unknown, - opts?: { expectFinal?: boolean; timeoutMs?: number }, + opts?: { expectFinal?: boolean; timeoutMs?: number; retryOnRestart?: boolean }, ) { const timeoutMs = opts?.timeoutMs ?? 20_000; + const retryOnRestart = opts?.retryOnRestart !== false; let lastDetails = ""; for (let attempt = 1; attempt <= 3; attempt += 1) { try { @@ -960,7 +961,7 @@ export async function startQaGatewayChild(params: { } catch (error) { const details = formatErrorMessage(error); lastDetails = details; - if (attempt >= 3 || !isRetryableGatewayCallError(details)) { + if (attempt >= 3 || !retryOnRestart || !isRetryableGatewayCallError(details)) { throw new Error(`${details}\nGateway logs:\n${logs()}`, { cause: error }); } await waitForGatewayReady({ diff --git a/extensions/qa-lab/src/model-switch-eval.ts b/extensions/qa-lab/src/model-switch-eval.ts index f696122af48..4b6f0f35b63 100644 --- a/extensions/qa-lab/src/model-switch-eval.ts +++ b/extensions/qa-lab/src/model-switch-eval.ts @@ -6,6 +6,8 @@ export function hasModelSwitchContinuityEvidence(text: string) { lower.includes("handoff") || lower.includes("model switch") || lower.includes("switched"); const mentionsKickoffTask = lower.includes("qa_kickoff_task") || + lower.includes("qa/scenarios/index.md") || + lower.includes("scenario pack") || lower.includes("kickoff task") || lower.includes("kickoff note") || lower.includes("qa mission") || diff --git a/extensions/qa-lab/src/reply-failure.ts b/extensions/qa-lab/src/reply-failure.ts index 4c8c65a038c..60e9e5f56a4 100644 --- a/extensions/qa-lab/src/reply-failure.ts +++ b/extensions/qa-lab/src/reply-failure.ts @@ -9,6 +9,7 @@ const FAILURE_REPLY_PREFIXES = [ "⚠️ model login expired on the gateway", "⚠️ model login failed on the gateway", "⚠️ agent failed before reply:", + "⚠️ ✉️ message failed", "⚠️ no api key found for provider ", "⚠️ missing api key for ", ]; diff --git a/extensions/qa-lab/src/suite.test.ts b/extensions/qa-lab/src/suite.test.ts index 1300d4c1c92..d9a28647c05 100644 --- a/extensions/qa-lab/src/suite.test.ts +++ b/extensions/qa-lab/src/suite.test.ts @@ -3,6 +3,24 @@ import { createQaBusState } from "./bus-state.js"; import { qaSuiteTesting } from "./suite.js"; describe("qa suite failure reply handling", () => { + const makeScenario = ( + id: string, + config?: Record, + ): Parameters[0]["scenarios"][number] => + ({ + id, + title: id, + surface: "test", + objective: "test", + successCriteria: ["test"], + sourcePath: `qa/scenarios/${id}.md`, + execution: { + kind: "flow", + config, + flow: { steps: [{ name: "noop", actions: [{ assert: "true" }] }] }, + }, + }) as Parameters[0]["scenarios"][number]; + it("normalizes suite concurrency to a bounded integer", () => { const previous = process.env.OPENCLAW_QA_SUITE_CONCURRENCY; delete process.env.OPENCLAW_QA_SUITE_CONCURRENCY; @@ -36,6 +54,63 @@ describe("qa suite failure reply handling", () => { expect(result).toEqual([10, 20, 30, 40]); }); + it("keeps explicitly requested provider-specific scenarios", () => { + const scenarios = [ + makeScenario("generic"), + makeScenario("anthropic-only", { + requiredProvider: "anthropic", + requiredModel: "claude-opus-4-6", + }), + ]; + + expect( + qaSuiteTesting + .selectQaSuiteScenarios({ + scenarios, + scenarioIds: ["anthropic-only"], + providerMode: "live-frontier", + primaryModel: "openai/gpt-5.4", + }) + .map((scenario) => scenario.id), + ).toEqual(["anthropic-only"]); + }); + + it("filters provider-specific scenarios from an implicit live lane", () => { + const scenarios = [ + makeScenario("generic"), + makeScenario("openai-only", { requiredProvider: "openai", requiredModel: "gpt-5.4" }), + makeScenario("anthropic-only", { + requiredProvider: "anthropic", + requiredModel: "claude-opus-4-6", + }), + makeScenario("claude-subscription", { + requiredProvider: "claude-cli", + authMode: "subscription", + }), + ]; + + expect( + qaSuiteTesting + .selectQaSuiteScenarios({ + scenarios, + providerMode: "live-frontier", + primaryModel: "openai/gpt-5.4", + }) + .map((scenario) => scenario.id), + ).toEqual(["generic", "openai-only"]); + + expect( + qaSuiteTesting + .selectQaSuiteScenarios({ + scenarios, + providerMode: "live-frontier", + primaryModel: "claude-cli/claude-sonnet-4-6", + claudeCliAuthMode: "subscription", + }) + .map((scenario) => scenario.id), + ).toEqual(["generic", "claude-subscription"]); + }); + it("reads retry-after from the primary gateway error before appended logs", () => { const error = new Error( "rate limit exceeded for config.patch; retry after 38s\nGateway logs:\nprevious config changed since last load", @@ -87,6 +162,24 @@ describe("qa suite failure reply handling", () => { await expect(pending).rejects.toThrow('No API key found for provider "openai".'); }); + it("treats QA channel message delivery failures as failure replies", async () => { + const state = createQaBusState(); + const pending = qaSuiteTesting.waitForOutboundMessage( + state, + (candidate) => candidate.text.includes("QA-RESTART"), + 5_000, + ); + + state.addOutboundMessage({ + to: "channel:qa-room", + text: "⚠️ ✉️ Message failed", + senderId: "openclaw", + senderName: "OpenClaw QA", + }); + + await expect(pending).rejects.toThrow("Message failed"); + }); + it("fails raw scenario waitForCondition calls when a classified failure reply arrives", async () => { const state = createQaBusState(); const waitForCondition = qaSuiteTesting.createScenarioWaitForCondition(state); diff --git a/extensions/qa-lab/src/suite.ts b/extensions/qa-lab/src/suite.ts index 9cfb3e2be46..d077cbe1055 100644 --- a/extensions/qa-lab/src/suite.ts +++ b/extensions/qa-lab/src/suite.ts @@ -180,6 +180,68 @@ function splitModelRef(ref: string) { }; } +function normalizeQaConfigString(value: unknown): string | undefined { + return typeof value === "string" && value.trim() ? value.trim() : undefined; +} + +function scenarioMatchesLiveLane(params: { + scenario: ReturnType["scenarios"][number]; + primaryModel: string; + providerMode: "mock-openai" | "live-frontier"; + claudeCliAuthMode?: QaCliBackendAuthMode; +}) { + if (params.providerMode !== "live-frontier") { + return true; + } + const selected = splitModelRef(params.primaryModel); + const config = params.scenario.execution.config ?? {}; + const requiredProvider = normalizeQaConfigString(config.requiredProvider); + if (requiredProvider && selected?.provider !== requiredProvider) { + return false; + } + const requiredModel = normalizeQaConfigString(config.requiredModel); + if (requiredModel && selected?.model !== requiredModel) { + return false; + } + const requiredAuthMode = normalizeQaConfigString(config.authMode); + if (requiredAuthMode && params.claudeCliAuthMode !== requiredAuthMode) { + return false; + } + return true; +} + +function selectQaSuiteScenarios(params: { + scenarios: ReturnType["scenarios"]; + scenarioIds?: string[]; + providerMode: "mock-openai" | "live-frontier"; + primaryModel: string; + claudeCliAuthMode?: QaCliBackendAuthMode; +}) { + const requestedScenarioIds = + params.scenarioIds && params.scenarioIds.length > 0 ? new Set(params.scenarioIds) : null; + const requestedScenarios = requestedScenarioIds + ? params.scenarios.filter((scenario) => requestedScenarioIds.has(scenario.id)) + : params.scenarios; + if (requestedScenarioIds) { + const foundScenarioIds = new Set(requestedScenarios.map((scenario) => scenario.id)); + const missingScenarioIds = [...requestedScenarioIds].filter( + (scenarioId) => !foundScenarioIds.has(scenarioId), + ); + if (missingScenarioIds.length > 0) { + throw new Error(`unknown QA scenario id(s): ${missingScenarioIds.join(", ")}`); + } + return requestedScenarios; + } + return requestedScenarios.filter((scenario) => + scenarioMatchesLiveLane({ + scenario, + providerMode: params.providerMode, + primaryModel: params.primaryModel, + claudeCliAuthMode: params.claudeCliAuthMode, + }), + ); +} + function liveTurnTimeoutMs(env: QaSuiteEnvironment, fallbackMs: number) { return resolveQaLiveTurnTimeoutMs(env, fallbackMs); } @@ -525,6 +587,12 @@ async function runConfigMutation(params: { action: "config.patch" | "config.apply"; raw: string; sessionKey?: string; + deliveryContext?: { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + }; note?: string; restartDelayMs?: number; }) { @@ -539,10 +607,11 @@ async function runConfigMutation(params: { raw: params.raw, baseHash: snapshot.hash, ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + ...(params.deliveryContext ? { deliveryContext: params.deliveryContext } : {}), ...(params.note ? { note: params.note } : {}), restartDelayMs, }, - { timeoutMs: 45_000 }, + { timeoutMs: 45_000, retryOnRestart: false }, ); await waitForConfigRestartSettle(params.env, restartDelayMs); return result; @@ -576,6 +645,12 @@ async function patchConfig(params: { env: QaSuiteEnvironment; patch: Record; sessionKey?: string; + deliveryContext?: { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + }; note?: string; restartDelayMs?: number; }) { @@ -584,6 +659,7 @@ async function patchConfig(params: { action: "config.patch", raw: JSON.stringify(params.patch, null, 2), sessionKey: params.sessionKey, + deliveryContext: params.deliveryContext, note: params.note, restartDelayMs: params.restartDelayMs, }); @@ -593,6 +669,12 @@ async function applyConfig(params: { env: QaSuiteEnvironment; nextConfig: Record; sessionKey?: string; + deliveryContext?: { + channel?: string; + to?: string; + accountId?: string; + threadId?: string | number; + }; note?: string; restartDelayMs?: number; }) { @@ -601,6 +683,7 @@ async function applyConfig(params: { action: "config.apply", raw: JSON.stringify(params.nextConfig, null, 2), sessionKey: params.sessionKey, + deliveryContext: params.deliveryContext, note: params.note, restartDelayMs: params.restartDelayMs, }); @@ -1205,6 +1288,8 @@ export const qaSuiteTesting = { isConfigHashConflict, mapQaSuiteWithConcurrency, normalizeQaSuiteConcurrency, + scenarioMatchesLiveLane, + selectQaSuiteScenarios, waitForOutboundMessage, }; @@ -1303,20 +1388,13 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise 0 ? new Set(params.scenarioIds) : null; - const selectedCatalogScenarios = requestedScenarioIds - ? catalog.scenarios.filter((scenario) => requestedScenarioIds.has(scenario.id)) - : catalog.scenarios; - if (requestedScenarioIds) { - const foundScenarioIds = new Set(selectedCatalogScenarios.map((scenario) => scenario.id)); - const missingScenarioIds = [...requestedScenarioIds].filter( - (scenarioId) => !foundScenarioIds.has(scenarioId), - ); - if (missingScenarioIds.length > 0) { - throw new Error(`unknown QA scenario id(s): ${missingScenarioIds.join(", ")}`); - } - } + const selectedCatalogScenarios = selectQaSuiteScenarios({ + scenarios: catalog.scenarios, + scenarioIds: params?.scenarioIds, + providerMode, + primaryModel, + claudeCliAuthMode: params?.claudeCliAuthMode, + }); const concurrency = normalizeQaSuiteConcurrency( params?.concurrency, selectedCatalogScenarios.length, diff --git a/src/gateway/protocol/schema/config.ts b/src/gateway/protocol/schema/config.ts index 9d0ec876668..47d63a83da8 100644 --- a/src/gateway/protocol/schema/config.ts +++ b/src/gateway/protocol/schema/config.ts @@ -22,6 +22,17 @@ const ConfigApplyLikeParamsSchema = Type.Object( raw: NonEmptyString, baseHash: Type.Optional(NonEmptyString), sessionKey: Type.Optional(Type.String()), + deliveryContext: Type.Optional( + Type.Object( + { + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + accountId: Type.Optional(Type.String()), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])), + }, + { additionalProperties: false }, + ), + ), note: Type.Optional(Type.String()), restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })), }, @@ -43,6 +54,17 @@ export const ConfigSchemaLookupParamsSchema = Type.Object( export const UpdateRunParamsSchema = Type.Object( { sessionKey: Type.Optional(Type.String()), + deliveryContext: Type.Optional( + Type.Object( + { + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + accountId: Type.Optional(Type.String()), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])), + }, + { additionalProperties: false }, + ), + ), note: Type.Optional(Type.String()), restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })), timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })), diff --git a/src/gateway/server-methods/config.ts b/src/gateway/server-methods/config.ts index 581f64e6509..009b9c8d380 100644 --- a/src/gateway/server-methods/config.ts +++ b/src/gateway/server-methods/config.ts @@ -328,18 +328,25 @@ function resolveConfigRestartRequest(params: unknown): { deliveryContext: ReturnType["deliveryContext"]; threadId: ReturnType["threadId"]; } { - const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params); + const { + sessionKey, + deliveryContext: requestedDeliveryContext, + threadId: requestedThreadId, + note, + restartDelayMs, + } = parseRestartRequestParams(params); // Extract deliveryContext + threadId for routing after restart. // Uses generic :thread: parsing plus plugin-owned session grammars. - const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + const { deliveryContext: sessionDeliveryContext, threadId: sessionThreadId } = + extractDeliveryInfo(sessionKey); return { sessionKey, note, restartDelayMs, - deliveryContext, - threadId, + deliveryContext: requestedDeliveryContext ?? sessionDeliveryContext, + threadId: requestedThreadId ?? sessionThreadId, }; } diff --git a/src/gateway/server-methods/restart-request.ts b/src/gateway/server-methods/restart-request.ts index 1b011e5efe2..8dd47633166 100644 --- a/src/gateway/server-methods/restart-request.ts +++ b/src/gateway/server-methods/restart-request.ts @@ -1,16 +1,55 @@ import { normalizeOptionalString } from "../../shared/string-coerce.js"; +type RestartDeliveryContext = { + channel?: string; + to?: string; + accountId?: string; +}; + +function parseRestartDeliveryContext(params: unknown): { + deliveryContext: RestartDeliveryContext | undefined; + threadId: string | undefined; +} { + const raw = (params as { deliveryContext?: unknown }).deliveryContext; + if (!raw || typeof raw !== "object" || Array.isArray(raw)) { + return { deliveryContext: undefined, threadId: undefined }; + } + const context = raw as { + channel?: unknown; + to?: unknown; + accountId?: unknown; + threadId?: unknown; + }; + const deliveryContext: RestartDeliveryContext = { + channel: normalizeOptionalString(context.channel), + to: normalizeOptionalString(context.to), + accountId: normalizeOptionalString(context.accountId), + }; + const normalizedContext = + deliveryContext.channel || deliveryContext.to || deliveryContext.accountId + ? deliveryContext + : undefined; + const threadId = + typeof context.threadId === "number" && Number.isFinite(context.threadId) + ? String(Math.trunc(context.threadId)) + : normalizeOptionalString(context.threadId); + return { deliveryContext: normalizedContext, threadId }; +} + export function parseRestartRequestParams(params: unknown): { sessionKey: string | undefined; + deliveryContext: RestartDeliveryContext | undefined; + threadId: string | undefined; note: string | undefined; restartDelayMs: number | undefined; } { const sessionKey = normalizeOptionalString((params as { sessionKey?: unknown }).sessionKey); + const { deliveryContext, threadId } = parseRestartDeliveryContext(params); const note = normalizeOptionalString((params as { note?: unknown }).note); const restartDelayMsRaw = (params as { restartDelayMs?: unknown }).restartDelayMs; const restartDelayMs = typeof restartDelayMsRaw === "number" && Number.isFinite(restartDelayMsRaw) ? Math.max(0, Math.floor(restartDelayMsRaw)) : undefined; - return { sessionKey, note, restartDelayMs }; + return { sessionKey, deliveryContext, threadId, note, restartDelayMs }; } diff --git a/src/gateway/server-methods/update.ts b/src/gateway/server-methods/update.ts index bf53e2a0227..47f4f0fe8d9 100644 --- a/src/gateway/server-methods/update.ts +++ b/src/gateway/server-methods/update.ts @@ -21,8 +21,17 @@ export const updateHandlers: GatewayRequestHandlers = { return; } const actor = resolveControlPlaneActor(client); - const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params); - const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + const { + sessionKey, + deliveryContext: requestedDeliveryContext, + threadId: requestedThreadId, + note, + restartDelayMs, + } = parseRestartRequestParams(params); + const { deliveryContext: sessionDeliveryContext, threadId: sessionThreadId } = + extractDeliveryInfo(sessionKey); + const deliveryContext = requestedDeliveryContext ?? sessionDeliveryContext; + const threadId = requestedThreadId ?? sessionThreadId; const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs; const timeoutMs = typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw) diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 91a1853e8e1..467b5ae0723 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -34,7 +34,10 @@ const mocks = vi.hoisted(() => ({ })), getChannelPlugin: vi.fn(() => undefined), normalizeChannelId: vi.fn((channel: string) => channel), - resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })), + resolveOutboundTarget: vi.fn((_params?: { to?: string }) => ({ + ok: true as const, + to: "+15550002", + })), deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]), enqueueDelivery: vi.fn(async () => "queue-1"), ackDelivery: vi.fn(async () => {}), @@ -196,7 +199,7 @@ describe("scheduleRestartSentinelWake", () => { const wakePromise = scheduleRestartSentinelWake({ deps: {} as never }); await Promise.resolve(); await Promise.resolve(); - await vi.advanceTimersByTimeAsync(750); + await vi.advanceTimersByTimeAsync(1_000); await wakePromise; expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1); @@ -218,31 +221,31 @@ describe("scheduleRestartSentinelWake", () => { expect(mocks.enqueueSystemEvent).toHaveBeenCalledTimes(1); expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1); expect(mocks.logWarn).toHaveBeenCalledWith( - expect.stringContaining("retrying in 750ms"), + expect.stringContaining("retrying in 1000ms"), expect.objectContaining({ channel: "whatsapp", to: "+15550002", sessionKey: "agent:main:main", attempt: 1, - maxAttempts: 2, + maxAttempts: 45, }), ); }); it("keeps one queued restart notice when outbound retries are exhausted", async () => { vi.useFakeTimers(); - mocks.deliverOutboundPayloads - .mockRejectedValueOnce(new Error("transport not ready")) - .mockRejectedValueOnce(new Error("transport still not ready")); + mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready")); const wakePromise = scheduleRestartSentinelWake({ deps: {} as never }); await Promise.resolve(); await Promise.resolve(); - await vi.advanceTimersByTimeAsync(750); + for (let attempt = 1; attempt < 45; attempt += 1) { + await vi.advanceTimersByTimeAsync(1_000); + } await wakePromise; expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1); - expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(2); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(45); expect(mocks.ackDelivery).not.toHaveBeenCalled(); expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready"); }); @@ -316,6 +319,48 @@ describe("scheduleRestartSentinelWake", () => { expect(mocks.resolveOutboundTarget).not.toHaveBeenCalled(); }); + it("resolves session routing before queueing the heartbeat wake", async () => { + mocks.consumeRestartSentinel.mockResolvedValue({ + payload: { + sessionKey: "agent:main:qa-channel:channel:qa-room", + }, + } as Awaited>); + mocks.parseSessionThreadInfo.mockReturnValue({ + baseSessionKey: "agent:main:qa-channel:channel:qa-room", + threadId: undefined, + }); + mocks.deliveryContextFromSession.mockReturnValue({ + channel: "qa-channel", + to: "channel:qa-room", + }); + mocks.requestHeartbeatNow.mockImplementation(() => { + mocks.deliveryContextFromSession.mockReturnValue({ + channel: "qa-channel", + to: "heartbeat", + }); + }); + mocks.resolveOutboundTarget.mockImplementation((params?: { to?: string }) => ({ + ok: true as const, + to: params?.to ?? "missing", + })); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1); + expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "qa-channel", + to: "channel:qa-room", + }), + ); + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "qa-channel", + to: "channel:qa-room", + }), + ); + }); + it("merges base session routing into partial thread metadata", async () => { mocks.consumeRestartSentinel.mockResolvedValue({ payload: { diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index 01497823cab..c766833b2ec 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -19,8 +19,8 @@ import { deliveryContextFromSession, mergeDeliveryContext } from "../utils/deliv import { loadSessionEntry } from "./session-utils.js"; const log = createSubsystemLogger("gateway/restart-sentinel"); -const OUTBOUND_RETRY_DELAY_MS = 750; -const OUTBOUND_MAX_ATTEMPTS = 2; +const OUTBOUND_RETRY_DELAY_MS = 1_000; +const OUTBOUND_MAX_ATTEMPTS = 45; function hasRoutableDeliveryContext(context?: { channel?: string; @@ -145,8 +145,6 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { return; } - enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); - const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey); const { cfg, entry } = loadSessionEntry(sessionKey); @@ -169,6 +167,8 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext); + enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext); + const channelRaw = origin?.channel; const channel = channelRaw ? normalizeChannelId(channelRaw) : null; const to = origin?.to;