mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
fix: stabilize live qa suite routing
This commit is contained in:
@@ -3,6 +3,7 @@ import { jsonResult, readStringParam } from "openclaw/plugin-sdk/channel-actions
|
||||
import { extractToolSend } from "openclaw/plugin-sdk/tool-send";
|
||||
import { resolveQaChannelAccount } from "./accounts.js";
|
||||
import {
|
||||
buildQaTarget,
|
||||
createQaBusThread,
|
||||
deleteQaBusMessage,
|
||||
editQaBusMessage,
|
||||
@@ -43,6 +44,33 @@ function listQaChannelActions(
|
||||
return Array.from(actions);
|
||||
}
|
||||
|
||||
function readQaSendText(params: Record<string, unknown>) {
|
||||
return (
|
||||
readStringParam(params, "message", { allowEmpty: true }) ??
|
||||
readStringParam(params, "text", { allowEmpty: true }) ??
|
||||
readStringParam(params, "content", { allowEmpty: true })
|
||||
);
|
||||
}
|
||||
|
||||
function readQaSendTarget(params: Record<string, unknown>) {
|
||||
const explicitTo = readStringParam(params, "to");
|
||||
if (explicitTo) {
|
||||
return explicitTo;
|
||||
}
|
||||
const channelId = readStringParam(params, "channelId");
|
||||
if (channelId) {
|
||||
return buildQaTarget({ chatType: "channel", conversationId: channelId });
|
||||
}
|
||||
const target = readStringParam(params, "target");
|
||||
if (!target) {
|
||||
return undefined;
|
||||
}
|
||||
if (/^(dm|channel):|^thread:[^/]+\/.+/i.test(target)) {
|
||||
return target;
|
||||
}
|
||||
return buildQaTarget({ chatType: "channel", conversationId: target });
|
||||
}
|
||||
|
||||
export const qaChannelMessageActions: ChannelMessageActionAdapter = {
|
||||
describeMessageTool: (context) => ({
|
||||
actions: listQaChannelActions(context.cfg as CoreConfig, context.accountId),
|
||||
@@ -60,8 +88,13 @@ export const qaChannelMessageActions: ChannelMessageActionAdapter = {
|
||||
}),
|
||||
extractToolSend: ({ args }: { args: Record<string, unknown> }) => {
|
||||
const action = typeof args.action === "string" ? args.action.trim() : "";
|
||||
if (action === "send") {
|
||||
const to = readQaSendTarget(args);
|
||||
const threadId = readStringParam(args, "threadId");
|
||||
return to ? { to, threadId } : null;
|
||||
}
|
||||
if (action === "sendMessage") {
|
||||
return extractToolSend(args, "sendMessage");
|
||||
return extractToolSend(args, "sendMessage") ?? null;
|
||||
}
|
||||
if (action === "threadReply") {
|
||||
const channelId = typeof args.channelId === "string" ? args.channelId.trim() : "";
|
||||
@@ -76,6 +109,30 @@ export const qaChannelMessageActions: ChannelMessageActionAdapter = {
|
||||
const baseUrl = account.baseUrl;
|
||||
|
||||
switch (action) {
|
||||
case "send": {
|
||||
const to = readQaSendTarget(params);
|
||||
const text = readQaSendText(params);
|
||||
if (!to || text === undefined) {
|
||||
throw new Error("qa-channel send requires to/target and message/text");
|
||||
}
|
||||
const parsed = parseQaTarget(to);
|
||||
const threadId = readStringParam(params, "threadId") ?? parsed.threadId;
|
||||
const { message } = await sendQaBusMessage({
|
||||
baseUrl,
|
||||
accountId: account.accountId,
|
||||
to: buildQaTarget({
|
||||
chatType: parsed.chatType,
|
||||
conversationId: parsed.conversationId,
|
||||
threadId,
|
||||
}),
|
||||
text,
|
||||
senderId: account.botUserId,
|
||||
senderName: account.botDisplayName,
|
||||
threadId,
|
||||
replyToId: readStringParam(params, "replyTo") ?? readStringParam(params, "replyToId"),
|
||||
});
|
||||
return jsonResult({ message });
|
||||
}
|
||||
case "thread-create": {
|
||||
const channelId =
|
||||
readStringParam(params, "channelId") ??
|
||||
|
||||
@@ -221,4 +221,57 @@ describe("qa-channel plugin", () => {
|
||||
await bus.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("routes the advertised send action to the qa bus", async () => {
|
||||
const state = createQaBusState();
|
||||
const bus = await startQaBusServer({ state });
|
||||
|
||||
try {
|
||||
const cfg = {
|
||||
channels: {
|
||||
"qa-channel": {
|
||||
baseUrl: bus.baseUrl,
|
||||
botUserId: "openclaw",
|
||||
botDisplayName: "OpenClaw QA",
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const sendTarget = qaChannelPlugin.actions?.extractToolSend?.({
|
||||
args: {
|
||||
action: "send",
|
||||
target: "qa-room",
|
||||
message: "hello",
|
||||
},
|
||||
});
|
||||
expect(sendTarget).toEqual({ to: "channel:qa-room", threadId: undefined });
|
||||
|
||||
const result = await qaChannelPlugin.actions?.handleAction?.({
|
||||
channel: "qa-channel",
|
||||
action: "send",
|
||||
cfg,
|
||||
accountId: "default",
|
||||
params: {
|
||||
target: "qa-room",
|
||||
message: "hello from action",
|
||||
},
|
||||
});
|
||||
const payload = extractToolPayload(result);
|
||||
expect(payload).toMatchObject({ message: { text: "hello from action" } });
|
||||
|
||||
const outbound = await state.waitFor({
|
||||
kind: "message-text",
|
||||
direction: "outbound",
|
||||
textIncludes: "hello from action",
|
||||
timeoutMs: 5_000,
|
||||
});
|
||||
expect("conversation" in outbound).toBe(true);
|
||||
if (!("conversation" in outbound)) {
|
||||
throw new Error("expected outbound message match");
|
||||
}
|
||||
expect(outbound.conversation).toMatchObject({ id: "qa-room", kind: "channel" });
|
||||
} finally {
|
||||
await bus.stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -947,9 +947,10 @@ export async function startQaGatewayChild(params: {
|
||||
async call(
|
||||
method: string,
|
||||
rpcParams?: unknown,
|
||||
opts?: { expectFinal?: boolean; timeoutMs?: number },
|
||||
opts?: { expectFinal?: boolean; timeoutMs?: number; retryOnRestart?: boolean },
|
||||
) {
|
||||
const timeoutMs = opts?.timeoutMs ?? 20_000;
|
||||
const retryOnRestart = opts?.retryOnRestart !== false;
|
||||
let lastDetails = "";
|
||||
for (let attempt = 1; attempt <= 3; attempt += 1) {
|
||||
try {
|
||||
@@ -960,7 +961,7 @@ export async function startQaGatewayChild(params: {
|
||||
} catch (error) {
|
||||
const details = formatErrorMessage(error);
|
||||
lastDetails = details;
|
||||
if (attempt >= 3 || !isRetryableGatewayCallError(details)) {
|
||||
if (attempt >= 3 || !retryOnRestart || !isRetryableGatewayCallError(details)) {
|
||||
throw new Error(`${details}\nGateway logs:\n${logs()}`, { cause: error });
|
||||
}
|
||||
await waitForGatewayReady({
|
||||
|
||||
@@ -6,6 +6,8 @@ export function hasModelSwitchContinuityEvidence(text: string) {
|
||||
lower.includes("handoff") || lower.includes("model switch") || lower.includes("switched");
|
||||
const mentionsKickoffTask =
|
||||
lower.includes("qa_kickoff_task") ||
|
||||
lower.includes("qa/scenarios/index.md") ||
|
||||
lower.includes("scenario pack") ||
|
||||
lower.includes("kickoff task") ||
|
||||
lower.includes("kickoff note") ||
|
||||
lower.includes("qa mission") ||
|
||||
|
||||
@@ -9,6 +9,7 @@ const FAILURE_REPLY_PREFIXES = [
|
||||
"⚠️ model login expired on the gateway",
|
||||
"⚠️ model login failed on the gateway",
|
||||
"⚠️ agent failed before reply:",
|
||||
"⚠️ ✉️ message failed",
|
||||
"⚠️ no api key found for provider ",
|
||||
"⚠️ missing api key for ",
|
||||
];
|
||||
|
||||
@@ -3,6 +3,24 @@ import { createQaBusState } from "./bus-state.js";
|
||||
import { qaSuiteTesting } from "./suite.js";
|
||||
|
||||
describe("qa suite failure reply handling", () => {
|
||||
const makeScenario = (
|
||||
id: string,
|
||||
config?: Record<string, unknown>,
|
||||
): Parameters<typeof qaSuiteTesting.selectQaSuiteScenarios>[0]["scenarios"][number] =>
|
||||
({
|
||||
id,
|
||||
title: id,
|
||||
surface: "test",
|
||||
objective: "test",
|
||||
successCriteria: ["test"],
|
||||
sourcePath: `qa/scenarios/${id}.md`,
|
||||
execution: {
|
||||
kind: "flow",
|
||||
config,
|
||||
flow: { steps: [{ name: "noop", actions: [{ assert: "true" }] }] },
|
||||
},
|
||||
}) as Parameters<typeof qaSuiteTesting.selectQaSuiteScenarios>[0]["scenarios"][number];
|
||||
|
||||
it("normalizes suite concurrency to a bounded integer", () => {
|
||||
const previous = process.env.OPENCLAW_QA_SUITE_CONCURRENCY;
|
||||
delete process.env.OPENCLAW_QA_SUITE_CONCURRENCY;
|
||||
@@ -36,6 +54,63 @@ describe("qa suite failure reply handling", () => {
|
||||
expect(result).toEqual([10, 20, 30, 40]);
|
||||
});
|
||||
|
||||
it("keeps explicitly requested provider-specific scenarios", () => {
|
||||
const scenarios = [
|
||||
makeScenario("generic"),
|
||||
makeScenario("anthropic-only", {
|
||||
requiredProvider: "anthropic",
|
||||
requiredModel: "claude-opus-4-6",
|
||||
}),
|
||||
];
|
||||
|
||||
expect(
|
||||
qaSuiteTesting
|
||||
.selectQaSuiteScenarios({
|
||||
scenarios,
|
||||
scenarioIds: ["anthropic-only"],
|
||||
providerMode: "live-frontier",
|
||||
primaryModel: "openai/gpt-5.4",
|
||||
})
|
||||
.map((scenario) => scenario.id),
|
||||
).toEqual(["anthropic-only"]);
|
||||
});
|
||||
|
||||
it("filters provider-specific scenarios from an implicit live lane", () => {
|
||||
const scenarios = [
|
||||
makeScenario("generic"),
|
||||
makeScenario("openai-only", { requiredProvider: "openai", requiredModel: "gpt-5.4" }),
|
||||
makeScenario("anthropic-only", {
|
||||
requiredProvider: "anthropic",
|
||||
requiredModel: "claude-opus-4-6",
|
||||
}),
|
||||
makeScenario("claude-subscription", {
|
||||
requiredProvider: "claude-cli",
|
||||
authMode: "subscription",
|
||||
}),
|
||||
];
|
||||
|
||||
expect(
|
||||
qaSuiteTesting
|
||||
.selectQaSuiteScenarios({
|
||||
scenarios,
|
||||
providerMode: "live-frontier",
|
||||
primaryModel: "openai/gpt-5.4",
|
||||
})
|
||||
.map((scenario) => scenario.id),
|
||||
).toEqual(["generic", "openai-only"]);
|
||||
|
||||
expect(
|
||||
qaSuiteTesting
|
||||
.selectQaSuiteScenarios({
|
||||
scenarios,
|
||||
providerMode: "live-frontier",
|
||||
primaryModel: "claude-cli/claude-sonnet-4-6",
|
||||
claudeCliAuthMode: "subscription",
|
||||
})
|
||||
.map((scenario) => scenario.id),
|
||||
).toEqual(["generic", "claude-subscription"]);
|
||||
});
|
||||
|
||||
it("reads retry-after from the primary gateway error before appended logs", () => {
|
||||
const error = new Error(
|
||||
"rate limit exceeded for config.patch; retry after 38s\nGateway logs:\nprevious config changed since last load",
|
||||
@@ -87,6 +162,24 @@ describe("qa suite failure reply handling", () => {
|
||||
await expect(pending).rejects.toThrow('No API key found for provider "openai".');
|
||||
});
|
||||
|
||||
it("treats QA channel message delivery failures as failure replies", async () => {
|
||||
const state = createQaBusState();
|
||||
const pending = qaSuiteTesting.waitForOutboundMessage(
|
||||
state,
|
||||
(candidate) => candidate.text.includes("QA-RESTART"),
|
||||
5_000,
|
||||
);
|
||||
|
||||
state.addOutboundMessage({
|
||||
to: "channel:qa-room",
|
||||
text: "⚠️ ✉️ Message failed",
|
||||
senderId: "openclaw",
|
||||
senderName: "OpenClaw QA",
|
||||
});
|
||||
|
||||
await expect(pending).rejects.toThrow("Message failed");
|
||||
});
|
||||
|
||||
it("fails raw scenario waitForCondition calls when a classified failure reply arrives", async () => {
|
||||
const state = createQaBusState();
|
||||
const waitForCondition = qaSuiteTesting.createScenarioWaitForCondition(state);
|
||||
|
||||
@@ -180,6 +180,68 @@ function splitModelRef(ref: string) {
|
||||
};
|
||||
}
|
||||
|
||||
function normalizeQaConfigString(value: unknown): string | undefined {
|
||||
return typeof value === "string" && value.trim() ? value.trim() : undefined;
|
||||
}
|
||||
|
||||
function scenarioMatchesLiveLane(params: {
|
||||
scenario: ReturnType<typeof readQaBootstrapScenarioCatalog>["scenarios"][number];
|
||||
primaryModel: string;
|
||||
providerMode: "mock-openai" | "live-frontier";
|
||||
claudeCliAuthMode?: QaCliBackendAuthMode;
|
||||
}) {
|
||||
if (params.providerMode !== "live-frontier") {
|
||||
return true;
|
||||
}
|
||||
const selected = splitModelRef(params.primaryModel);
|
||||
const config = params.scenario.execution.config ?? {};
|
||||
const requiredProvider = normalizeQaConfigString(config.requiredProvider);
|
||||
if (requiredProvider && selected?.provider !== requiredProvider) {
|
||||
return false;
|
||||
}
|
||||
const requiredModel = normalizeQaConfigString(config.requiredModel);
|
||||
if (requiredModel && selected?.model !== requiredModel) {
|
||||
return false;
|
||||
}
|
||||
const requiredAuthMode = normalizeQaConfigString(config.authMode);
|
||||
if (requiredAuthMode && params.claudeCliAuthMode !== requiredAuthMode) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function selectQaSuiteScenarios(params: {
|
||||
scenarios: ReturnType<typeof readQaBootstrapScenarioCatalog>["scenarios"];
|
||||
scenarioIds?: string[];
|
||||
providerMode: "mock-openai" | "live-frontier";
|
||||
primaryModel: string;
|
||||
claudeCliAuthMode?: QaCliBackendAuthMode;
|
||||
}) {
|
||||
const requestedScenarioIds =
|
||||
params.scenarioIds && params.scenarioIds.length > 0 ? new Set(params.scenarioIds) : null;
|
||||
const requestedScenarios = requestedScenarioIds
|
||||
? params.scenarios.filter((scenario) => requestedScenarioIds.has(scenario.id))
|
||||
: params.scenarios;
|
||||
if (requestedScenarioIds) {
|
||||
const foundScenarioIds = new Set(requestedScenarios.map((scenario) => scenario.id));
|
||||
const missingScenarioIds = [...requestedScenarioIds].filter(
|
||||
(scenarioId) => !foundScenarioIds.has(scenarioId),
|
||||
);
|
||||
if (missingScenarioIds.length > 0) {
|
||||
throw new Error(`unknown QA scenario id(s): ${missingScenarioIds.join(", ")}`);
|
||||
}
|
||||
return requestedScenarios;
|
||||
}
|
||||
return requestedScenarios.filter((scenario) =>
|
||||
scenarioMatchesLiveLane({
|
||||
scenario,
|
||||
providerMode: params.providerMode,
|
||||
primaryModel: params.primaryModel,
|
||||
claudeCliAuthMode: params.claudeCliAuthMode,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
function liveTurnTimeoutMs(env: QaSuiteEnvironment, fallbackMs: number) {
|
||||
return resolveQaLiveTurnTimeoutMs(env, fallbackMs);
|
||||
}
|
||||
@@ -525,6 +587,12 @@ async function runConfigMutation(params: {
|
||||
action: "config.patch" | "config.apply";
|
||||
raw: string;
|
||||
sessionKey?: string;
|
||||
deliveryContext?: {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
@@ -539,10 +607,11 @@ async function runConfigMutation(params: {
|
||||
raw: params.raw,
|
||||
baseHash: snapshot.hash,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
...(params.deliveryContext ? { deliveryContext: params.deliveryContext } : {}),
|
||||
...(params.note ? { note: params.note } : {}),
|
||||
restartDelayMs,
|
||||
},
|
||||
{ timeoutMs: 45_000 },
|
||||
{ timeoutMs: 45_000, retryOnRestart: false },
|
||||
);
|
||||
await waitForConfigRestartSettle(params.env, restartDelayMs);
|
||||
return result;
|
||||
@@ -576,6 +645,12 @@ async function patchConfig(params: {
|
||||
env: QaSuiteEnvironment;
|
||||
patch: Record<string, unknown>;
|
||||
sessionKey?: string;
|
||||
deliveryContext?: {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
@@ -584,6 +659,7 @@ async function patchConfig(params: {
|
||||
action: "config.patch",
|
||||
raw: JSON.stringify(params.patch, null, 2),
|
||||
sessionKey: params.sessionKey,
|
||||
deliveryContext: params.deliveryContext,
|
||||
note: params.note,
|
||||
restartDelayMs: params.restartDelayMs,
|
||||
});
|
||||
@@ -593,6 +669,12 @@ async function applyConfig(params: {
|
||||
env: QaSuiteEnvironment;
|
||||
nextConfig: Record<string, unknown>;
|
||||
sessionKey?: string;
|
||||
deliveryContext?: {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
threadId?: string | number;
|
||||
};
|
||||
note?: string;
|
||||
restartDelayMs?: number;
|
||||
}) {
|
||||
@@ -601,6 +683,7 @@ async function applyConfig(params: {
|
||||
action: "config.apply",
|
||||
raw: JSON.stringify(params.nextConfig, null, 2),
|
||||
sessionKey: params.sessionKey,
|
||||
deliveryContext: params.deliveryContext,
|
||||
note: params.note,
|
||||
restartDelayMs: params.restartDelayMs,
|
||||
});
|
||||
@@ -1205,6 +1288,8 @@ export const qaSuiteTesting = {
|
||||
isConfigHashConflict,
|
||||
mapQaSuiteWithConcurrency,
|
||||
normalizeQaSuiteConcurrency,
|
||||
scenarioMatchesLiveLane,
|
||||
selectQaSuiteScenarios,
|
||||
waitForOutboundMessage,
|
||||
};
|
||||
|
||||
@@ -1303,20 +1388,13 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
|
||||
path.join(repoRoot, ".artifacts", "qa-e2e", `suite-${Date.now().toString(36)}`);
|
||||
await fs.mkdir(outputDir, { recursive: true });
|
||||
const catalog = readQaBootstrapScenarioCatalog();
|
||||
const requestedScenarioIds =
|
||||
params?.scenarioIds && params.scenarioIds.length > 0 ? new Set(params.scenarioIds) : null;
|
||||
const selectedCatalogScenarios = requestedScenarioIds
|
||||
? catalog.scenarios.filter((scenario) => requestedScenarioIds.has(scenario.id))
|
||||
: catalog.scenarios;
|
||||
if (requestedScenarioIds) {
|
||||
const foundScenarioIds = new Set(selectedCatalogScenarios.map((scenario) => scenario.id));
|
||||
const missingScenarioIds = [...requestedScenarioIds].filter(
|
||||
(scenarioId) => !foundScenarioIds.has(scenarioId),
|
||||
);
|
||||
if (missingScenarioIds.length > 0) {
|
||||
throw new Error(`unknown QA scenario id(s): ${missingScenarioIds.join(", ")}`);
|
||||
}
|
||||
}
|
||||
const selectedCatalogScenarios = selectQaSuiteScenarios({
|
||||
scenarios: catalog.scenarios,
|
||||
scenarioIds: params?.scenarioIds,
|
||||
providerMode,
|
||||
primaryModel,
|
||||
claudeCliAuthMode: params?.claudeCliAuthMode,
|
||||
});
|
||||
const concurrency = normalizeQaSuiteConcurrency(
|
||||
params?.concurrency,
|
||||
selectedCatalogScenarios.length,
|
||||
|
||||
@@ -22,6 +22,17 @@ const ConfigApplyLikeParamsSchema = Type.Object(
|
||||
raw: NonEmptyString,
|
||||
baseHash: Type.Optional(NonEmptyString),
|
||||
sessionKey: Type.Optional(Type.String()),
|
||||
deliveryContext: Type.Optional(
|
||||
Type.Object(
|
||||
{
|
||||
channel: Type.Optional(Type.String()),
|
||||
to: Type.Optional(Type.String()),
|
||||
accountId: Type.Optional(Type.String()),
|
||||
threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
),
|
||||
),
|
||||
note: Type.Optional(Type.String()),
|
||||
restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
},
|
||||
@@ -43,6 +54,17 @@ export const ConfigSchemaLookupParamsSchema = Type.Object(
|
||||
export const UpdateRunParamsSchema = Type.Object(
|
||||
{
|
||||
sessionKey: Type.Optional(Type.String()),
|
||||
deliveryContext: Type.Optional(
|
||||
Type.Object(
|
||||
{
|
||||
channel: Type.Optional(Type.String()),
|
||||
to: Type.Optional(Type.String()),
|
||||
accountId: Type.Optional(Type.String()),
|
||||
threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
),
|
||||
),
|
||||
note: Type.Optional(Type.String()),
|
||||
restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })),
|
||||
timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })),
|
||||
|
||||
@@ -328,18 +328,25 @@ function resolveConfigRestartRequest(params: unknown): {
|
||||
deliveryContext: ReturnType<typeof extractDeliveryInfo>["deliveryContext"];
|
||||
threadId: ReturnType<typeof extractDeliveryInfo>["threadId"];
|
||||
} {
|
||||
const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params);
|
||||
const {
|
||||
sessionKey,
|
||||
deliveryContext: requestedDeliveryContext,
|
||||
threadId: requestedThreadId,
|
||||
note,
|
||||
restartDelayMs,
|
||||
} = parseRestartRequestParams(params);
|
||||
|
||||
// Extract deliveryContext + threadId for routing after restart.
|
||||
// Uses generic :thread: parsing plus plugin-owned session grammars.
|
||||
const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
|
||||
const { deliveryContext: sessionDeliveryContext, threadId: sessionThreadId } =
|
||||
extractDeliveryInfo(sessionKey);
|
||||
|
||||
return {
|
||||
sessionKey,
|
||||
note,
|
||||
restartDelayMs,
|
||||
deliveryContext,
|
||||
threadId,
|
||||
deliveryContext: requestedDeliveryContext ?? sessionDeliveryContext,
|
||||
threadId: requestedThreadId ?? sessionThreadId,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -1,16 +1,55 @@
|
||||
import { normalizeOptionalString } from "../../shared/string-coerce.js";
|
||||
|
||||
type RestartDeliveryContext = {
|
||||
channel?: string;
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
};
|
||||
|
||||
function parseRestartDeliveryContext(params: unknown): {
|
||||
deliveryContext: RestartDeliveryContext | undefined;
|
||||
threadId: string | undefined;
|
||||
} {
|
||||
const raw = (params as { deliveryContext?: unknown }).deliveryContext;
|
||||
if (!raw || typeof raw !== "object" || Array.isArray(raw)) {
|
||||
return { deliveryContext: undefined, threadId: undefined };
|
||||
}
|
||||
const context = raw as {
|
||||
channel?: unknown;
|
||||
to?: unknown;
|
||||
accountId?: unknown;
|
||||
threadId?: unknown;
|
||||
};
|
||||
const deliveryContext: RestartDeliveryContext = {
|
||||
channel: normalizeOptionalString(context.channel),
|
||||
to: normalizeOptionalString(context.to),
|
||||
accountId: normalizeOptionalString(context.accountId),
|
||||
};
|
||||
const normalizedContext =
|
||||
deliveryContext.channel || deliveryContext.to || deliveryContext.accountId
|
||||
? deliveryContext
|
||||
: undefined;
|
||||
const threadId =
|
||||
typeof context.threadId === "number" && Number.isFinite(context.threadId)
|
||||
? String(Math.trunc(context.threadId))
|
||||
: normalizeOptionalString(context.threadId);
|
||||
return { deliveryContext: normalizedContext, threadId };
|
||||
}
|
||||
|
||||
export function parseRestartRequestParams(params: unknown): {
|
||||
sessionKey: string | undefined;
|
||||
deliveryContext: RestartDeliveryContext | undefined;
|
||||
threadId: string | undefined;
|
||||
note: string | undefined;
|
||||
restartDelayMs: number | undefined;
|
||||
} {
|
||||
const sessionKey = normalizeOptionalString((params as { sessionKey?: unknown }).sessionKey);
|
||||
const { deliveryContext, threadId } = parseRestartDeliveryContext(params);
|
||||
const note = normalizeOptionalString((params as { note?: unknown }).note);
|
||||
const restartDelayMsRaw = (params as { restartDelayMs?: unknown }).restartDelayMs;
|
||||
const restartDelayMs =
|
||||
typeof restartDelayMsRaw === "number" && Number.isFinite(restartDelayMsRaw)
|
||||
? Math.max(0, Math.floor(restartDelayMsRaw))
|
||||
: undefined;
|
||||
return { sessionKey, note, restartDelayMs };
|
||||
return { sessionKey, deliveryContext, threadId, note, restartDelayMs };
|
||||
}
|
||||
|
||||
@@ -21,8 +21,17 @@ export const updateHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
const actor = resolveControlPlaneActor(client);
|
||||
const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params);
|
||||
const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
|
||||
const {
|
||||
sessionKey,
|
||||
deliveryContext: requestedDeliveryContext,
|
||||
threadId: requestedThreadId,
|
||||
note,
|
||||
restartDelayMs,
|
||||
} = parseRestartRequestParams(params);
|
||||
const { deliveryContext: sessionDeliveryContext, threadId: sessionThreadId } =
|
||||
extractDeliveryInfo(sessionKey);
|
||||
const deliveryContext = requestedDeliveryContext ?? sessionDeliveryContext;
|
||||
const threadId = requestedThreadId ?? sessionThreadId;
|
||||
const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs;
|
||||
const timeoutMs =
|
||||
typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw)
|
||||
|
||||
@@ -34,7 +34,10 @@ const mocks = vi.hoisted(() => ({
|
||||
})),
|
||||
getChannelPlugin: vi.fn(() => undefined),
|
||||
normalizeChannelId: vi.fn((channel: string) => channel),
|
||||
resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })),
|
||||
resolveOutboundTarget: vi.fn((_params?: { to?: string }) => ({
|
||||
ok: true as const,
|
||||
to: "+15550002",
|
||||
})),
|
||||
deliverOutboundPayloads: vi.fn(async () => [{ channel: "whatsapp", messageId: "msg-1" }]),
|
||||
enqueueDelivery: vi.fn(async () => "queue-1"),
|
||||
ackDelivery: vi.fn(async () => {}),
|
||||
@@ -196,7 +199,7 @@ describe("scheduleRestartSentinelWake", () => {
|
||||
const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await vi.advanceTimersByTimeAsync(750);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
await wakePromise;
|
||||
|
||||
expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1);
|
||||
@@ -218,31 +221,31 @@ describe("scheduleRestartSentinelWake", () => {
|
||||
expect(mocks.enqueueSystemEvent).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.logWarn).toHaveBeenCalledWith(
|
||||
expect.stringContaining("retrying in 750ms"),
|
||||
expect.stringContaining("retrying in 1000ms"),
|
||||
expect.objectContaining({
|
||||
channel: "whatsapp",
|
||||
to: "+15550002",
|
||||
sessionKey: "agent:main:main",
|
||||
attempt: 1,
|
||||
maxAttempts: 2,
|
||||
maxAttempts: 45,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps one queued restart notice when outbound retries are exhausted", async () => {
|
||||
vi.useFakeTimers();
|
||||
mocks.deliverOutboundPayloads
|
||||
.mockRejectedValueOnce(new Error("transport not ready"))
|
||||
.mockRejectedValueOnce(new Error("transport still not ready"));
|
||||
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready"));
|
||||
|
||||
const wakePromise = scheduleRestartSentinelWake({ deps: {} as never });
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await vi.advanceTimersByTimeAsync(750);
|
||||
for (let attempt = 1; attempt < 45; attempt += 1) {
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
}
|
||||
await wakePromise;
|
||||
|
||||
expect(mocks.enqueueDelivery).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(2);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(45);
|
||||
expect(mocks.ackDelivery).not.toHaveBeenCalled();
|
||||
expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
|
||||
});
|
||||
@@ -316,6 +319,48 @@ describe("scheduleRestartSentinelWake", () => {
|
||||
expect(mocks.resolveOutboundTarget).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("resolves session routing before queueing the heartbeat wake", async () => {
|
||||
mocks.consumeRestartSentinel.mockResolvedValue({
|
||||
payload: {
|
||||
sessionKey: "agent:main:qa-channel:channel:qa-room",
|
||||
},
|
||||
} as Awaited<ReturnType<typeof mocks.consumeRestartSentinel>>);
|
||||
mocks.parseSessionThreadInfo.mockReturnValue({
|
||||
baseSessionKey: "agent:main:qa-channel:channel:qa-room",
|
||||
threadId: undefined,
|
||||
});
|
||||
mocks.deliveryContextFromSession.mockReturnValue({
|
||||
channel: "qa-channel",
|
||||
to: "channel:qa-room",
|
||||
});
|
||||
mocks.requestHeartbeatNow.mockImplementation(() => {
|
||||
mocks.deliveryContextFromSession.mockReturnValue({
|
||||
channel: "qa-channel",
|
||||
to: "heartbeat",
|
||||
});
|
||||
});
|
||||
mocks.resolveOutboundTarget.mockImplementation((params?: { to?: string }) => ({
|
||||
ok: true as const,
|
||||
to: params?.to ?? "missing",
|
||||
}));
|
||||
|
||||
await scheduleRestartSentinelWake({ deps: {} as never });
|
||||
|
||||
expect(mocks.requestHeartbeatNow).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.resolveOutboundTarget).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "qa-channel",
|
||||
to: "channel:qa-room",
|
||||
}),
|
||||
);
|
||||
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
channel: "qa-channel",
|
||||
to: "channel:qa-room",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("merges base session routing into partial thread metadata", async () => {
|
||||
mocks.consumeRestartSentinel.mockResolvedValue({
|
||||
payload: {
|
||||
|
||||
@@ -19,8 +19,8 @@ import { deliveryContextFromSession, mergeDeliveryContext } from "../utils/deliv
|
||||
import { loadSessionEntry } from "./session-utils.js";
|
||||
|
||||
const log = createSubsystemLogger("gateway/restart-sentinel");
|
||||
const OUTBOUND_RETRY_DELAY_MS = 750;
|
||||
const OUTBOUND_MAX_ATTEMPTS = 2;
|
||||
const OUTBOUND_RETRY_DELAY_MS = 1_000;
|
||||
const OUTBOUND_MAX_ATTEMPTS = 45;
|
||||
|
||||
function hasRoutableDeliveryContext(context?: {
|
||||
channel?: string;
|
||||
@@ -145,8 +145,6 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
||||
return;
|
||||
}
|
||||
|
||||
enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext);
|
||||
|
||||
const { baseSessionKey, threadId: sessionThreadId } = parseSessionThreadInfo(sessionKey);
|
||||
|
||||
const { cfg, entry } = loadSessionEntry(sessionKey);
|
||||
@@ -169,6 +167,8 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
|
||||
|
||||
const origin = mergeDeliveryContext(sentinelContext, sessionDeliveryContext);
|
||||
|
||||
enqueueRestartSentinelWake(message, sessionKey, wakeDeliveryContext);
|
||||
|
||||
const channelRaw = origin?.channel;
|
||||
const channel = channelRaw ? normalizeChannelId(channelRaw) : null;
|
||||
const to = origin?.to;
|
||||
|
||||
Reference in New Issue
Block a user