fix(whatsapp): periodic delivery-queue drain so enqueued items don't wait for next reconnect (#79083)

Merged via squash.

Prepared head SHA: 9a619bb9d9
Co-authored-by: Oviemudiaga <49584793+Oviemudiaga@users.noreply.github.com>
Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com>
Reviewed-by: @mcaxtr
This commit is contained in:
Oviemudi.eth
2026-05-18 21:29:15 -05:00
committed by GitHub
parent ff4bf0c367
commit e9989f3a92
3 changed files with 121 additions and 1 deletions

View File

@@ -2,6 +2,12 @@
Docs: https://docs.openclaw.ai
## Unreleased
### Fixes
- WhatsApp: drain pending outbound deliveries on a 30s periodic timer in addition to the reconnect handler, so messages enqueued while the provider is already connected no longer wait for the next reconnect to send. (#79083) Thanks @Oviemudiaga.
## 2026.5.19
### Changes

View File

@@ -27,6 +27,25 @@ import {
startWebAutoReplyMonitor,
} from "./auto-reply.test-harness.js";
type DrainSelectionEntry = {
channel: string;
accountId?: string | null;
lastError?: string;
};
type DrainPendingDeliveriesCall = {
drainKey: string;
logLabel: string;
selectEntry: (entry: DrainSelectionEntry) => { match: boolean; bypassBackoff: boolean };
};
const deliveryQueueMocks = vi.hoisted(() => ({
drainPendingDeliveries: vi.fn(async (_opts: unknown) => undefined),
}));
vi.mock("openclaw/plugin-sdk/delivery-queue-runtime", () => ({
drainPendingDeliveries: deliveryQueueMocks.drainPendingDeliveries,
}));
installWebAutoReplyTestHomeHooks();
function requireOnMessage(
@@ -247,6 +266,78 @@ describe("web auto-reply connection", () => {
expect(sleep).toHaveBeenCalled();
});
it("drains pending deliveries while connected and stops after close", async () => {
vi.useFakeTimers();
try {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();
const { controller, run } = startWebAutoReplyMonitor({
monitorWebChannelFn: monitorWebChannel as never,
listenerFactory: scripted.listenerFactory,
sleep,
accountId: "work",
});
await vi.waitFor(
() => {
expect(scripted.getListenerCount()).toBe(1);
},
{ timeout: 250, interval: 2 },
);
expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledWith(
expect.objectContaining({
drainKey: "whatsapp:work",
logLabel: "WhatsApp reconnect drain",
}),
);
deliveryQueueMocks.drainPendingDeliveries.mockClear();
await vi.advanceTimersByTimeAsync(30_000);
await vi.waitFor(() => {
expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledTimes(1);
});
const periodicCall = deliveryQueueMocks.drainPendingDeliveries.mock.calls.at(-1)?.[0] as
| DrainPendingDeliveriesCall
| undefined;
expect(periodicCall).toBeDefined();
if (!periodicCall) {
throw new Error("Expected WhatsApp periodic drain call");
}
expect(periodicCall.drainKey).toBe("whatsapp:work");
expect(periodicCall.logLabel).toBe("WhatsApp periodic drain");
expect(
periodicCall.selectEntry({
channel: "whatsapp",
accountId: "work",
}),
).toEqual({ match: true, bypassBackoff: false });
expect(
periodicCall.selectEntry({
channel: "whatsapp",
accountId: "default",
}),
).toEqual({ match: false, bypassBackoff: false });
expect(
periodicCall.selectEntry({
channel: "telegram",
accountId: "work",
}),
).toEqual({ match: false, bypassBackoff: false });
controller.abort();
scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" });
await Promise.resolve();
await run;
deliveryQueueMocks.drainPendingDeliveries.mockClear();
await vi.advanceTimersByTimeAsync(30_000);
expect(deliveryQueueMocks.drainPendingDeliveries).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("treats status 440 as non-retryable and stops without retrying", async () => {
const sleep = vi.fn(async () => {});
const scripted = createScriptedWebListenerFactory();

View File

@@ -541,17 +541,40 @@ export async function monitorWebChannel(
);
});
const periodicDrainInterval = setInterval(() => {
void drainPendingDeliveries({
drainKey: `whatsapp:${normalizedAccountId}`,
logLabel: "WhatsApp periodic drain",
cfg,
log: reconnectLogger,
selectEntry: (entry) => ({
match:
entry.channel === "whatsapp" &&
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
bypassBackoff: false,
}),
}).catch((err) => {
reconnectLogger.warn(
{ connectionId: connection.connectionId, error: String(err) },
"periodic drain failed",
);
});
}, 30_000);
whatsappLog.info("Listening for personal WhatsApp inbound messages.");
if (process.stdout.isTTY || process.stderr.isTTY) {
whatsappLog.raw("Ctrl+C to stop.");
}
if (!keepAlive) {
clearInterval(periodicDrainInterval);
await controller.shutdown();
return;
}
const reason = await controller.waitForClose();
const reason = await controller
.waitForClose()
.finally(() => clearInterval(periodicDrainInterval));
if (stopRequested() || sigintStop || reason === "aborted") {
await controller.shutdown();
break;