refactor: centralize Discord gateway supervision

This commit is contained in:
Peter Steinberger
2026-03-24 22:44:00 -07:00
parent efafbece17
commit 6c3e767289
11 changed files with 511 additions and 232 deletions

View File

@@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai
- TUI/chat: preserve pending user messages when a slow local run emits an empty final event, but still defer and flush the needed history reload after the newer active run finishes so silent/tool-only runs do not stay incomplete. (#53130) Thanks @joelnishanth.
- Marketplace/agents: correct the ClawHub skill URL in agent docs and stream marketplace archive downloads to disk so installs avoid excess memory use and fail cleanly on empty responses. (#54160) Thanks @QuinnH496.
- Gateway/channels: keep channel startup sequential while isolating per-channel boot failures, so one broken channel no longer blocks later channels from starting. (#54215) Thanks @JonathanJing.
- Discord/gateway supervision: centralize gateway error handling behind a lifetime-owned supervisor so early, active, and late-teardown Carbon gateway errors stay classified consistently and stop surfacing as process-killing teardown crashes.
- Docs/IRC: fix five `json55` code-fence typos in the IRC channel examples so Mintlify applies JSON5 syntax highlighting correctly. (#50842) Thanks @Hollychou924.
- Telegram/forum topics: recover `#General` topic `1` routing when Telegram omits forum metadata, including native commands, interactive callbacks, inbound message context, and fallback error replies. (#53699) Thanks @huntharo.
- Discord/config types: add missing `autoArchiveDuration` to `DiscordGuildChannelConfig` so TypeScript config definitions match the existing schema and runtime support. (#43427) Thanks @davidguttman.

View File

@@ -1,87 +1,114 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { waitForDiscordGatewayStop } from "./monitor.gateway.js";
import type { DiscordGatewayEvent } from "./monitor/gateway-supervisor.js";
/**
 * Builds a synthetic gateway event for these tests.
 *
 * The given text is wrapped in a fresh `Error`; the event message is that
 * error's string form, and every type except "other" is flagged as
 * lifecycle-stopping.
 */
function createGatewayEvent(
  type: DiscordGatewayEvent["type"],
  message: string,
): DiscordGatewayEvent {
  const failure = new Error(message);
  const event: DiscordGatewayEvent = {
    type,
    err: failure,
    message: failure.toString(),
    shouldStopLifecycle: type !== "other",
  };
  return event;
}
function createGatewayWaitHarness() {
const emitter = new EventEmitter();
let lifecycleHandler: ((event: DiscordGatewayEvent) => void) | undefined;
const disconnect = vi.fn();
const abort = new AbortController();
return { emitter, disconnect, abort };
const attachLifecycle = vi.fn((handler: (event: DiscordGatewayEvent) => void) => {
lifecycleHandler = handler;
});
const detachLifecycle = vi.fn(() => {
lifecycleHandler = undefined;
});
return {
abort,
attachLifecycle,
detachLifecycle,
disconnect,
emitGatewayEvent: (event: DiscordGatewayEvent) => {
lifecycleHandler?.(event);
},
gatewaySupervisor: {
attachLifecycle,
detachLifecycle,
},
};
}
function startGatewayWait(params?: {
onGatewayError?: (error: unknown) => void;
shouldStopOnError?: (error: unknown) => boolean;
disconnect?: () => void;
onGatewayEvent?: (event: DiscordGatewayEvent) => "continue" | "stop";
registerForceStop?: (fn: (error: unknown) => void) => void;
}) {
const harness = createGatewayWaitHarness();
if (params?.disconnect) {
harness.disconnect.mockImplementation(params.disconnect);
}
const promise = waitForDiscordGatewayStop({
gateway: { emitter: harness.emitter, disconnect: harness.disconnect },
gateway: { disconnect: harness.disconnect },
abortSignal: harness.abort.signal,
...(params?.onGatewayError ? { onGatewayError: params.onGatewayError } : {}),
...(params?.shouldStopOnError ? { shouldStopOnError: params.shouldStopOnError } : {}),
gatewaySupervisor: harness.gatewaySupervisor,
...(params?.onGatewayEvent ? { onGatewayEvent: params.onGatewayEvent } : {}),
...(params?.registerForceStop ? { registerForceStop: params.registerForceStop } : {}),
});
return { ...harness, promise };
}
async function expectAbortToResolve(params: {
emitter: EventEmitter;
disconnect: ReturnType<typeof vi.fn>;
abort: AbortController;
attachLifecycle: ReturnType<typeof vi.fn>;
detachLifecycle: ReturnType<typeof vi.fn>;
disconnect: ReturnType<typeof vi.fn>;
promise: Promise<void>;
expectedDisconnectBeforeAbort?: number;
}) {
if (params.expectedDisconnectBeforeAbort !== undefined) {
expect(params.disconnect).toHaveBeenCalledTimes(params.expectedDisconnectBeforeAbort);
}
expect(params.emitter.listenerCount("error")).toBe(1);
expect(params.attachLifecycle).toHaveBeenCalledTimes(1);
params.abort.abort();
await expect(params.promise).resolves.toBeUndefined();
expect(params.disconnect).toHaveBeenCalledTimes(1);
expect(params.emitter.listenerCount("error")).toBe(0);
expect(params.detachLifecycle).toHaveBeenCalledTimes(1);
}
describe("waitForDiscordGatewayStop", () => {
it("resolves on abort and disconnects gateway", async () => {
const { emitter, disconnect, abort, promise } = startGatewayWait();
await expectAbortToResolve({ emitter, disconnect, abort, promise });
const { abort, attachLifecycle, detachLifecycle, disconnect, promise } = startGatewayWait();
await expectAbortToResolve({ abort, attachLifecycle, detachLifecycle, disconnect, promise });
});
it("rejects on gateway error and disconnects", async () => {
const onGatewayError = vi.fn();
const err = new Error("boom");
it("rejects on lifecycle stop events and disconnects", async () => {
const fatalEvent = createGatewayEvent("fatal", "boom");
const { detachLifecycle, disconnect, emitGatewayEvent, promise } = startGatewayWait();
const { emitter, disconnect, abort, promise } = startGatewayWait({
onGatewayError,
});
emitter.emit("error", err);
emitGatewayEvent(fatalEvent);
await expect(promise).rejects.toThrow("boom");
expect(onGatewayError).toHaveBeenCalledWith(err);
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
abort.abort();
expect(disconnect).toHaveBeenCalledTimes(1);
expect(detachLifecycle).toHaveBeenCalledTimes(1);
});
it("ignores gateway errors when instructed", async () => {
const onGatewayError = vi.fn();
const err = new Error("transient");
it("ignores transient gateway events when instructed", async () => {
const transientEvent = createGatewayEvent("other", "transient");
const onGatewayEvent = vi.fn(() => "continue" as const);
const { abort, attachLifecycle, detachLifecycle, disconnect, emitGatewayEvent, promise } =
startGatewayWait({
onGatewayEvent,
});
const { emitter, disconnect, abort, promise } = startGatewayWait({
onGatewayError,
shouldStopOnError: () => false,
});
emitter.emit("error", err);
expect(onGatewayError).toHaveBeenCalledWith(err);
emitGatewayEvent(transientEvent);
expect(onGatewayEvent).toHaveBeenCalledWith(transientEvent);
await expectAbortToResolve({
emitter,
disconnect,
abort,
attachLifecycle,
detachLifecycle,
disconnect,
promise,
expectedDisconnectBeforeAbort: 0,
});
@@ -89,7 +116,6 @@ describe("waitForDiscordGatewayStop", () => {
it("resolves on abort without a gateway", async () => {
const abort = new AbortController();
const promise = waitForDiscordGatewayStop({
abortSignal: abort.signal,
});
@@ -102,7 +128,7 @@ describe("waitForDiscordGatewayStop", () => {
it("rejects via registerForceStop and disconnects gateway", async () => {
let forceStop: ((err: unknown) => void) | undefined;
const { emitter, disconnect, promise } = startGatewayWait({
const { detachLifecycle, disconnect, promise } = startGatewayWait({
registerForceStop: (fn) => {
forceStop = fn;
},
@@ -115,7 +141,7 @@ describe("waitForDiscordGatewayStop", () => {
await expect(promise).rejects.toThrow("reconnect watchdog timeout");
expect(disconnect).toHaveBeenCalledTimes(1);
expect(emitter.listenerCount("error")).toBe(0);
expect(detachLifecycle).toHaveBeenCalledTimes(1);
});
it("ignores forceStop after promise already settled", async () => {
@@ -133,4 +159,49 @@ describe("waitForDiscordGatewayStop", () => {
forceStop?.(new Error("too late"));
expect(disconnect).toHaveBeenCalledTimes(1);
});
it("keeps the lifecycle handler active until disconnect returns on abort", async () => {
const onGatewayEvent = vi.fn(() => "stop" as const);
const fatalEvent = createGatewayEvent("fatal", "disconnect emitted error");
let emitFromDisconnect: ((event: DiscordGatewayEvent) => void) | undefined;
const { abort, detachLifecycle, disconnect, emitGatewayEvent, promise } = startGatewayWait({
onGatewayEvent,
disconnect: () => {
emitFromDisconnect?.(fatalEvent);
},
});
emitFromDisconnect = emitGatewayEvent;
abort.abort();
await expect(promise).resolves.toBeUndefined();
expect(onGatewayEvent).toHaveBeenCalledWith(fatalEvent);
expect(disconnect).toHaveBeenCalledTimes(1);
expect(detachLifecycle).toHaveBeenCalledTimes(1);
});
it("keeps the original rejection when disconnect emits another stop event", async () => {
const firstEvent = createGatewayEvent("fatal", "first failure");
const secondEvent = createGatewayEvent("fatal", "second failure");
const seenEvents: DiscordGatewayEvent[] = [];
let emitFromDisconnect: ((event: DiscordGatewayEvent) => void) | undefined;
const { emitGatewayEvent, promise } = startGatewayWait({
onGatewayEvent: (event) => {
seenEvents.push(event);
return "stop";
},
disconnect: () => {
emitFromDisconnect?.(secondEvent);
},
});
emitFromDisconnect = emitGatewayEvent;
emitGatewayEvent(firstEvent);
await expect(promise).rejects.toThrow("first failure");
expect(seenEvents.map((event) => event.message)).toEqual([
firstEvent.message,
secondEvent.message,
]);
});
});

View File

@@ -1,15 +1,18 @@
import type { EventEmitter } from "node:events";
import type {
DiscordGatewayEvent,
DiscordGatewaySupervisor,
} from "./monitor/gateway-supervisor.js";
export type DiscordGatewayHandle = {
emitter?: Pick<EventEmitter, "on" | "removeListener">;
disconnect?: () => void;
};
export type WaitForDiscordGatewayStopParams = {
gateway?: DiscordGatewayHandle;
abortSignal?: AbortSignal;
onGatewayError?: (err: unknown) => void;
shouldStopOnError?: (err: unknown) => boolean;
gatewaySupervisor?: Pick<DiscordGatewaySupervisor, "attachLifecycle" | "detachLifecycle">;
onGatewayEvent?: (event: DiscordGatewayEvent) => "continue" | "stop";
registerForceStop?: (forceStop: (err: unknown) => void) => void;
};
@@ -20,13 +23,12 @@ export function getDiscordGatewayEmitter(gateway?: unknown): EventEmitter | unde
export async function waitForDiscordGatewayStop(
params: WaitForDiscordGatewayStopParams,
): Promise<void> {
const { gateway, abortSignal, onGatewayError, shouldStopOnError } = params;
const emitter = gateway?.emitter;
const { gateway, abortSignal } = params;
return await new Promise<void>((resolve, reject) => {
let settled = false;
const cleanup = () => {
abortSignal?.removeEventListener("abort", onAbort);
emitter?.removeListener("error", onGatewayErrorEvent);
params.gatewaySupervisor?.detachLifecycle();
};
const finishResolve = () => {
if (settled) {
@@ -57,11 +59,10 @@ export async function waitForDiscordGatewayStop(
const onAbort = () => {
finishResolve();
};
const onGatewayErrorEvent = (err: unknown) => {
onGatewayError?.(err);
const shouldStop = shouldStopOnError?.(err) ?? true;
const onGatewayEvent = (event: DiscordGatewayEvent) => {
const shouldStop = (params.onGatewayEvent?.(event) ?? "stop") === "stop";
if (shouldStop) {
finishReject(err);
finishReject(event.err);
}
};
const onForceStop = (err: unknown) => {
@@ -74,7 +75,7 @@ export async function waitForDiscordGatewayStop(
}
abortSignal?.addEventListener("abort", onAbort, { once: true });
emitter?.on("error", onGatewayErrorEvent);
params.gatewaySupervisor?.attachLifecycle(onGatewayEvent);
params.registerForceStop?.(onForceStop);
});
}

View File

@@ -1,33 +0,0 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js";
describe("attachEarlyGatewayErrorGuard", () => {
  // Attaches a guard to a stub client whose plugin lookup yields `plugin`.
  const guardFor = (plugin: { emitter: EventEmitter } | undefined) =>
    attachEarlyGatewayErrorGuard({ getPlugin: vi.fn(() => plugin) } as never);

  it("captures gateway errors until released", () => {
    const gatewayEmitter = new EventEmitter();
    // A second listener keeps "error" emissions handled after the guard is released.
    const fallbackErrorListener = vi.fn();
    gatewayEmitter.on("error", fallbackErrorListener);

    const guard = guardFor({ emitter: gatewayEmitter });

    gatewayEmitter.emit("error", new Error("Fatal Gateway error: 4014"));
    expect(guard.pendingErrors).toHaveLength(1);

    guard.release();
    gatewayEmitter.emit("error", new Error("Fatal Gateway error: 4000"));

    // The post-release error reaches only the fallback listener.
    expect(guard.pendingErrors).toHaveLength(1);
    expect(fallbackErrorListener).toHaveBeenCalledTimes(2);
  });

  it("returns noop guard when gateway emitter is unavailable", () => {
    const guard = guardFor(undefined);

    expect(guard.pendingErrors).toEqual([]);
    expect(() => guard.release()).not.toThrow();
  });
});

View File

@@ -1,36 +0,0 @@
import type { Client } from "@buape/carbon";
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
/** Handle returned by `attachEarlyGatewayErrorGuard`. */
export type EarlyGatewayErrorGuard = {
/** Gateway "error" payloads captured while the guard's listener is attached. */
pendingErrors: unknown[];
/** Removes the guard's "error" listener; repeated calls are no-ops. */
release: () => void;
};
/**
 * Starts collecting "error" events from the client's gateway emitter.
 *
 * @param client Carbon client whose "gateway" plugin is inspected for an emitter.
 * @returns A guard exposing the captured errors plus a `release` callback that
 * removes the listener once. When no emitter is found the guard is inert:
 * its list stays empty and `release` does nothing.
 */
export function attachEarlyGatewayErrorGuard(client: Client): EarlyGatewayErrorGuard {
  const pendingErrors: unknown[] = [];
  const emitter = getDiscordGatewayEmitter(client.getPlugin("gateway"));
  if (!emitter) {
    return { pendingErrors, release: () => {} };
  }

  const capture = (err: unknown) => {
    pendingErrors.push(err);
  };
  emitter.on("error", capture);

  // Tracks whether the listener is still registered so release stays idempotent.
  let attached = true;
  const release = () => {
    if (!attached) {
      return;
    }
    attached = false;
    emitter.removeListener("error", capture);
  };

  return { pendingErrors, release };
}

View File

@@ -0,0 +1,88 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import {
classifyDiscordGatewayEvent,
createDiscordGatewaySupervisor,
} from "./gateway-supervisor.js";
describe("classifyDiscordGatewayEvent", () => {
  it("maps raw gateway errors onto domain events", () => {
    const neverDisallowed = () => false;
    // Classifies an Error carrying `text`, optionally with a custom intents predicate.
    const classify = (
      text: string,
      isDisallowedIntentsError: (err: unknown) => boolean = neverDisallowed,
    ) => classifyDiscordGatewayEvent({ err: new Error(text), isDisallowedIntentsError });

    const reconnectEvent = classify("Max reconnect attempts (0) reached after code 1006");
    const fatalEvent = classify("Fatal Gateway error: 4000");
    const disallowedEvent = classify("Fatal Gateway error: 4014", (err) =>
      String(err).includes("4014"),
    );
    const transientEvent = classify("transient");

    expect(reconnectEvent.type).toBe("reconnect-exhausted");
    expect(reconnectEvent.shouldStopLifecycle).toBe(true);
    expect(fatalEvent.type).toBe("fatal");
    expect(disallowedEvent.type).toBe("disallowed-intents");
    expect(transientEvent.type).toBe("other");
    expect(transientEvent.shouldStopLifecycle).toBe(false);
  });
});
describe("createDiscordGatewaySupervisor", () => {
  it("buffers early errors, routes active ones, and logs late teardown errors", () => {
    const gatewayEmitter = new EventEmitter();
    const runtimeError = vi.fn();
    const observedTypes: string[] = [];
    const emitGatewayError = (text: string) => {
      gatewayEmitter.emit("error", new Error(text));
    };

    const supervisor = createDiscordGatewaySupervisor({
      client: {
        getPlugin: vi.fn(() => ({ emitter: gatewayEmitter })),
      } as never,
      isDisallowedIntentsError: (err) => String(err).includes("4014"),
      runtime: { error: runtimeError } as never,
    });

    // Before any lifecycle handler is attached the error is queued and only
    // surfaces through drainPending.
    emitGatewayError("Fatal Gateway error: 4014");
    const drainDecision = supervisor.drainPending((event) => {
      observedTypes.push(event.type);
      return "continue";
    });
    expect(drainDecision).toBe("continue");

    // With a handler attached the next error is delivered to it directly.
    supervisor.attachLifecycle((event) => {
      observedTypes.push(event.type);
    });
    emitGatewayError("Fatal Gateway error: 4000");

    // After detaching, the error is reported via runtime.error instead.
    supervisor.detachLifecycle();
    emitGatewayError("Max reconnect attempts (0) reached after code 1006");

    expect(observedTypes).toEqual(["disallowed-intents", "fatal"]);
    expect(runtimeError).toHaveBeenCalledWith(
      expect.stringContaining("suppressed late gateway reconnect-exhausted error during teardown"),
    );
  });

  it("is idempotent on dispose and noops without an emitter", () => {
    const supervisor = createDiscordGatewaySupervisor({
      client: {
        getPlugin: vi.fn(() => undefined),
      } as never,
      isDisallowedIntentsError: () => false,
      runtime: { error: vi.fn() } as never,
    });

    expect(supervisor.drainPending(() => "continue")).toBe("continue");

    const attach = () => supervisor.attachLifecycle(() => {});
    const detach = () => supervisor.detachLifecycle();
    const dispose = () => supervisor.dispose();

    expect(attach).not.toThrow();
    expect(detach).not.toThrow();
    expect(dispose).not.toThrow();
    expect(dispose).not.toThrow();
  });
});

View File

@@ -0,0 +1,151 @@
import type { EventEmitter } from "node:events";
import type { Client } from "@buape/carbon";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
/** Classification labels that `classifyDiscordGatewayEvent` assigns to a raw gateway error. */
export type DiscordGatewayEventType =
| "disallowed-intents"
| "fatal"
| "other"
| "reconnect-exhausted";
/** A raw gateway error paired with its classification. */
export type DiscordGatewayEvent = {
/** Which category the error was classified into. */
type: DiscordGatewayEventType;
/** The original value that was classified, kept untouched. */
err: unknown;
/** String form of `err`; the classifier computes it with `String(err)`. */
message: string;
/** The classifier sets this to `true` for every type except "other". */
shouldStopLifecycle: boolean;
};
/**
 * Owner of the gateway "error" listener.
 * Member notes describe the behavior implemented by `createDiscordGatewaySupervisor`.
 */
export type DiscordGatewaySupervisor = {
/** The gateway emitter being observed; `undefined` when none could be resolved. */
emitter?: EventEmitter;
/** Registers the handler that receives classified errors as they are emitted. */
attachLifecycle: (handler: (event: DiscordGatewayEvent) => void) => void;
/** Drops the handler; errors emitted afterwards are logged instead of delivered. */
detachLifecycle: () => void;
/**
 * Empties the buffer of errors queued before a handler was attached and passes
 * them to `handler` in arrival order. Returns "stop" as soon as the handler
 * does (later queued events are not replayed), otherwise "continue".
 */
drainPending: (
handler: (event: DiscordGatewayEvent) => "continue" | "stop",
) => "continue" | "stop";
/** Removes the "error" listener and clears buffered events; repeated calls are no-ops. */
dispose: () => void;
};
/**
 * Supervisor state: "buffering" (initial, errors are queued), "active" (errors go
 * to the attached handler), "teardown" (errors are only logged), and "disposed"
 * (listener removed).
 */
type GatewaySupervisorPhase = "active" | "buffering" | "disposed" | "teardown";
/**
 * Turns a raw gateway error into a classified event.
 *
 * Checks run in priority order: the caller-supplied disallowed-intents
 * predicate first, then the "Max reconnect attempts" marker, then the
 * "Fatal Gateway error" marker; anything else is classified as "other".
 *
 * @param params.err Raw value emitted by the gateway.
 * @param params.isDisallowedIntentsError Predicate identifying disallowed-intents errors.
 * @returns The event with `message` set to `String(err)` and
 * `shouldStopLifecycle` true for every type except "other".
 */
export function classifyDiscordGatewayEvent(params: {
  err: unknown;
  isDisallowedIntentsError: (err: unknown) => boolean;
}): DiscordGatewayEvent {
  const message = String(params.err);

  let type: DiscordGatewayEvent["type"] = "other";
  if (params.isDisallowedIntentsError(params.err)) {
    type = "disallowed-intents";
  } else if (message.includes("Max reconnect attempts")) {
    type = "reconnect-exhausted";
  } else if (message.includes("Fatal Gateway error")) {
    type = "fatal";
  }

  return {
    type,
    err: params.err,
    message,
    shouldStopLifecycle: type !== "other",
  };
}
/**
 * Creates the supervisor that owns the gateway "error" listener for a client.
 *
 * Each emitted error is classified and then handled according to the current
 * phase: queued while no lifecycle handler has been attached, passed to the
 * handler while one is attached, and logged through `runtime.error` after the
 * handler has been detached. Once disposed, the listener is removed.
 *
 * @param params.client Client whose "gateway" plugin supplies the emitter.
 * @param params.isDisallowedIntentsError Predicate forwarded to the classifier.
 * @param params.runtime Runtime whose optional `error` sink receives late-teardown logs.
 * @returns The supervisor; every operation is a no-op (and `drainPending`
 * returns "continue") when no emitter could be resolved.
 */
export function createDiscordGatewaySupervisor(params: {
  client: Client;
  isDisallowedIntentsError: (err: unknown) => boolean;
  runtime: RuntimeEnv;
}): DiscordGatewaySupervisor {
  const emitter = getDiscordGatewayEmitter(params.client.getPlugin("gateway"));
  if (!emitter) {
    // Nothing to observe: hand back an inert supervisor.
    return {
      attachLifecycle: () => {},
      detachLifecycle: () => {},
      drainPending: () => "continue",
      dispose: () => {},
      emitter,
    };
  }

  const buffered: DiscordGatewayEvent[] = [];
  let phase: GatewaySupervisorPhase = "buffering";
  let activeHandler: ((event: DiscordGatewayEvent) => void) | undefined;
  let isDisposed = false;

  const handleGatewayError = (err: unknown): void => {
    if (isDisposed) {
      return;
    }
    const event = classifyDiscordGatewayEvent({
      err,
      isDisallowedIntentsError: params.isDisallowedIntentsError,
    });
    if (phase === "active" && activeHandler) {
      activeHandler(event);
    } else if (phase === "teardown") {
      // The handler is gone; report the error instead of delivering it.
      params.runtime.error?.(
        danger(
          `discord: suppressed late gateway ${event.type} error during teardown: ${event.message}`,
        ),
      );
    } else {
      buffered.push(event);
    }
  };
  emitter.on("error", handleGatewayError);

  return {
    emitter,
    attachLifecycle(handler) {
      activeHandler = handler;
      phase = "active";
    },
    detachLifecycle() {
      activeHandler = undefined;
      phase = "teardown";
    },
    drainPending(handler) {
      // Empty the buffer first so the replay works on a private snapshot.
      const queued = buffered.splice(0, buffered.length);
      const stopped = queued.some((event) => handler(event) === "stop");
      return stopped ? "stop" : "continue";
    },
    dispose() {
      if (isDisposed) {
        return;
      }
      isDisposed = true;
      phase = "disposed";
      activeHandler = undefined;
      buffered.length = 0;
      emitter.removeListener("error", handleGatewayError);
    },
  };
}

View File

@@ -3,6 +3,7 @@ import type { Client } from "@buape/carbon";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { RuntimeEnv } from "../../../../src/runtime.js";
import type { WaitForDiscordGatewayStopParams } from "../monitor.gateway.js";
import type { DiscordGatewayEvent } from "./gateway-supervisor.js";
const {
attachDiscordGatewayLoggingMock,
@@ -55,7 +56,7 @@ describe("runDiscordGatewayLifecycle", () => {
start?: () => Promise<void>;
stop?: () => Promise<void>;
isDisallowedIntentsError?: (err: unknown) => boolean;
pendingGatewayErrors?: unknown[];
pendingGatewayEvents?: DiscordGatewayEvent[];
gateway?: {
isConnected?: boolean;
options?: Record<string, unknown>;
@@ -76,7 +77,26 @@ describe("runDiscordGatewayLifecycle", () => {
const runtimeLog = vi.fn();
const runtimeError = vi.fn();
const runtimeExit = vi.fn();
const releaseEarlyGatewayErrorGuard = vi.fn();
const pendingGatewayEvents = params?.pendingGatewayEvents ?? [];
const gatewaySupervisor = {
attachLifecycle: vi.fn(),
detachLifecycle: vi.fn(),
drainPending: vi.fn((handler: (event: DiscordGatewayEvent) => "continue" | "stop") => {
if (pendingGatewayEvents.length === 0) {
return "continue";
}
const queued = [...pendingGatewayEvents];
pendingGatewayEvents.length = 0;
for (const event of queued) {
if (handler(event) === "stop") {
return "stop";
}
}
return "continue";
}),
dispose: vi.fn(),
emitter: params?.gateway?.emitter,
};
const statusSink = vi.fn();
const runtime: RuntimeEnv = {
log: runtimeLog,
@@ -89,7 +109,7 @@ describe("runDiscordGatewayLifecycle", () => {
threadStop,
runtimeLog,
runtimeError,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
statusSink,
lifecycleParams: {
accountId: params?.accountId ?? "default",
@@ -102,8 +122,7 @@ describe("runDiscordGatewayLifecycle", () => {
voiceManagerRef: { current: null },
execApprovalsHandler: { start, stop },
threadBindings: { stop: threadStop },
pendingGatewayErrors: params?.pendingGatewayErrors,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
statusSink,
abortSignal: undefined as AbortSignal | undefined,
},
@@ -115,7 +134,7 @@ describe("runDiscordGatewayLifecycle", () => {
stop: ReturnType<typeof vi.fn>;
threadStop: ReturnType<typeof vi.fn>;
waitCalls: number;
releaseEarlyGatewayErrorGuard: ReturnType<typeof vi.fn>;
gatewaySupervisor: { detachLifecycle: ReturnType<typeof vi.fn> };
}) {
expect(params.start).toHaveBeenCalledTimes(1);
expect(params.stop).toHaveBeenCalledTimes(1);
@@ -123,7 +142,7 @@ describe("runDiscordGatewayLifecycle", () => {
expect(unregisterGatewayMock).toHaveBeenCalledWith("default");
expect(stopGatewayLoggingMock).toHaveBeenCalledTimes(1);
expect(params.threadStop).toHaveBeenCalledTimes(1);
expect(params.releaseEarlyGatewayErrorGuard).toHaveBeenCalledTimes(1);
expect(params.gatewaySupervisor.detachLifecycle).toHaveBeenCalledTimes(1);
}
function createGatewayHarness(params?: {
@@ -152,14 +171,26 @@ describe("runDiscordGatewayLifecycle", () => {
await vi.advanceTimersByTimeAsync(delayMs);
}
/**
 * Builds a synthetic gateway event for the lifecycle tests: the text is
 * wrapped in a fresh `Error`, the event message is that error's string form,
 * and only the "other" type is marked as not stopping the lifecycle.
 */
function createGatewayEvent(
  type: DiscordGatewayEvent["type"],
  message: string,
): DiscordGatewayEvent {
  const failure = new Error(message);
  const stopsLifecycle = type !== "other";
  return {
    type,
    err: failure,
    message: failure.toString(),
    shouldStopLifecycle: stopsLifecycle,
  };
}
it("cleans up thread bindings when exec approvals startup fails", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const { lifecycleParams, start, stop, threadStop, releaseEarlyGatewayErrorGuard } =
createLifecycleHarness({
start: async () => {
throw new Error("startup failed");
},
});
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({
start: async () => {
throw new Error("startup failed");
},
});
await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow("startup failed");
@@ -168,14 +199,14 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 0,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
});
it("cleans up when gateway wait fails after startup", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
waitForDiscordGatewayStopMock.mockRejectedValueOnce(new Error("gateway wait failed"));
const { lifecycleParams, start, stop, threadStop, releaseEarlyGatewayErrorGuard } =
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } =
createLifecycleHarness();
await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow(
@@ -187,13 +218,13 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 1,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
});
it("cleans up after successful gateway wait", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const { lifecycleParams, start, stop, threadStop, releaseEarlyGatewayErrorGuard } =
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } =
createLifecycleHarness();
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
@@ -203,7 +234,7 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 1,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
});
@@ -264,7 +295,7 @@ describe("runDiscordGatewayLifecycle", () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const { emitter, gateway } = createGatewayHarness();
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const { lifecycleParams, start, stop, threadStop, releaseEarlyGatewayErrorGuard } =
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } =
createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
@@ -282,7 +313,7 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 0,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
} finally {
vi.useRealTimers();
@@ -291,17 +322,13 @@ describe("runDiscordGatewayLifecycle", () => {
it("handles queued disallowed intents errors without waiting for gateway events", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const {
lifecycleParams,
start,
stop,
threadStop,
runtimeError,
releaseEarlyGatewayErrorGuard,
} = createLifecycleHarness({
pendingGatewayErrors: [new Error("Fatal Gateway error: 4014")],
isDisallowedIntentsError: (err) => String(err).includes("4014"),
});
const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } =
createLifecycleHarness({
pendingGatewayEvents: [
createGatewayEvent("disallowed-intents", "Fatal Gateway error: 4014"),
],
isDisallowedIntentsError: (err) => String(err).includes("4014"),
});
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
@@ -313,16 +340,36 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 0,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
});
it("logs queued non-fatal startup gateway errors and continues", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } =
createLifecycleHarness({
pendingGatewayEvents: [createGatewayEvent("other", "transient startup error")],
});
await expect(runDiscordGatewayLifecycle(lifecycleParams)).resolves.toBeUndefined();
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("discord gateway error: Error: transient startup error"),
);
expectLifecycleCleanup({
start,
stop,
threadStop,
waitCalls: 1,
gatewaySupervisor,
});
});
it("throws queued non-disallowed fatal gateway errors", async () => {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const { lifecycleParams, start, stop, threadStop, releaseEarlyGatewayErrorGuard } =
createLifecycleHarness({
pendingGatewayErrors: [new Error("Fatal Gateway error: 4000")],
});
const { lifecycleParams, start, stop, threadStop, gatewaySupervisor } = createLifecycleHarness({
pendingGatewayEvents: [createGatewayEvent("fatal", "Fatal Gateway error: 4000")],
});
await expect(runDiscordGatewayLifecycle(lifecycleParams)).rejects.toThrow(
"Fatal Gateway error: 4000",
@@ -333,7 +380,7 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 0,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
});
@@ -341,23 +388,17 @@ describe("runDiscordGatewayLifecycle", () => {
vi.useFakeTimers();
try {
const { runDiscordGatewayLifecycle } = await import("./provider.lifecycle.js");
const pendingGatewayErrors: unknown[] = [];
const pendingGatewayEvents: DiscordGatewayEvent[] = [];
const { emitter, gateway } = createGatewayHarness();
getDiscordGatewayEmitterMock.mockReturnValueOnce(emitter);
const {
lifecycleParams,
start,
stop,
threadStop,
runtimeError,
releaseEarlyGatewayErrorGuard,
} = createLifecycleHarness({
gateway,
pendingGatewayErrors,
});
const { lifecycleParams, start, stop, threadStop, runtimeError, gatewaySupervisor } =
createLifecycleHarness({
gateway,
pendingGatewayEvents,
});
setTimeout(() => {
pendingGatewayErrors.push(new Error("Fatal Gateway error: 4001"));
pendingGatewayEvents.push(createGatewayEvent("fatal", "Fatal Gateway error: 4001"));
}, 1_000);
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
@@ -375,7 +416,7 @@ describe("runDiscordGatewayLifecycle", () => {
stop,
threadStop,
waitCalls: 0,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
} finally {
vi.useRealTimers();

View File

@@ -8,6 +8,7 @@ import { attachDiscordGatewayLogging } from "../gateway-logging.js";
import { getDiscordGatewayEmitter, waitForDiscordGatewayStop } from "../monitor.gateway.js";
import type { DiscordVoiceManager } from "../voice/manager.js";
import { registerGateway, unregisterGateway } from "./gateway-registry.js";
import type { DiscordGatewayEvent, DiscordGatewaySupervisor } from "./gateway-supervisor.js";
import type { DiscordMonitorStatusSink } from "./status.js";
type ExecApprovalsHandler = {
@@ -56,8 +57,7 @@ export async function runDiscordGatewayLifecycle(params: {
voiceManagerRef: { current: DiscordVoiceManager | null };
execApprovalsHandler: ExecApprovalsHandler | null;
threadBindings: { stop: () => void };
pendingGatewayErrors?: unknown[];
releaseEarlyGatewayErrorGuard?: () => void;
gatewaySupervisor: DiscordGatewaySupervisor;
statusSink?: DiscordMonitorStatusSink;
}) {
const HELLO_TIMEOUT_MS = 30000;
@@ -68,7 +68,7 @@ export async function runDiscordGatewayLifecycle(params: {
if (gateway) {
registerGateway(params.accountId, gateway);
}
const gatewayEmitter = getDiscordGatewayEmitter(gateway);
const gatewayEmitter = params.gatewaySupervisor.emitter ?? getDiscordGatewayEmitter(gateway);
const stopGatewayLogging = attachDiscordGatewayLogging({
emitter: gatewayEmitter,
runtime: params.runtime,
@@ -128,7 +128,6 @@ export async function runDiscordGatewayLifecycle(params: {
if (!gateway) {
return;
}
gatewayEmitter?.once("error", () => {});
gateway.options.reconnect = { maxAttempts: 0 };
gateway.disconnect();
};
@@ -274,45 +273,30 @@ export async function runDiscordGatewayLifecycle(params: {
gatewayEmitter?.on("debug", onGatewayDebug);
let sawDisallowedIntents = false;
const logGatewayError = (err: unknown) => {
if (params.isDisallowedIntentsError(err)) {
const handleGatewayEvent = (event: DiscordGatewayEvent): "continue" | "stop" => {
if (event.type === "disallowed-intents") {
sawDisallowedIntents = true;
params.runtime.error?.(
danger(
"discord: gateway closed with code 4014 (missing privileged gateway intents). Enable the required intents in the Discord Developer Portal or disable them in config.",
),
);
return;
return "stop";
}
params.runtime.error?.(danger(`discord gateway error: ${String(err)}`));
params.runtime.error?.(danger(`discord gateway error: ${event.message}`));
return event.shouldStopLifecycle ? "stop" : "continue";
};
const shouldStopOnGatewayError = (err: unknown) => {
const message = String(err);
return (
message.includes("Max reconnect attempts") ||
message.includes("Fatal Gateway error") ||
params.isDisallowedIntentsError(err)
);
};
const drainPendingGatewayErrors = (): "continue" | "stop" => {
const pendingGatewayErrors = params.pendingGatewayErrors ?? [];
if (pendingGatewayErrors.length === 0) {
return "continue";
}
const queuedErrors = [...pendingGatewayErrors];
pendingGatewayErrors.length = 0;
for (const err of queuedErrors) {
logGatewayError(err);
if (!shouldStopOnGatewayError(err)) {
continue;
const drainPendingGatewayErrors = (): "continue" | "stop" =>
params.gatewaySupervisor.drainPending((event) => {
const decision = handleGatewayEvent(event);
if (decision !== "stop") {
return "continue";
}
if (params.isDisallowedIntentsError(err)) {
if (event.type === "disallowed-intents") {
return "stop";
}
throw err;
}
return "continue";
};
throw event.err;
});
try {
if (params.execApprovalsHandler) {
await params.execApprovalsHandler.start();
@@ -395,16 +379,19 @@ export async function runDiscordGatewayLifecycle(params: {
});
}
if (drainPendingGatewayErrors() === "stop") {
return;
}
await waitForDiscordGatewayStop({
gateway: gateway
? {
emitter: gatewayEmitter,
disconnect: () => gateway.disconnect(),
}
: undefined,
abortSignal: params.abortSignal,
onGatewayError: logGatewayError,
shouldStopOnError: shouldStopOnGatewayError,
gatewaySupervisor: params.gatewaySupervisor,
onGatewayEvent: handleGatewayEvent,
registerForceStop: (forceStop) => {
forceStopHandler = forceStop;
if (queuedForceStopError !== undefined) {
@@ -420,14 +407,7 @@ export async function runDiscordGatewayLifecycle(params: {
}
} finally {
lifecycleStopping = true;
// attach a safety listener before releasing other listeners so that late
// "error" events emitted by Carbon during teardown do not become uncaught
// exceptions and crash the entire gateway process.
const suppressLateError = (err: unknown) => {
params.runtime.error?.(danger(`discord: suppressed late gateway error: ${String(err)}`));
};
gatewayEmitter?.on("error", suppressLateError);
params.releaseEarlyGatewayErrorGuard?.();
params.gatewaySupervisor.detachLifecycle();
unregisterGateway(params.accountId);
stopGatewayLogging();
reconnectStallWatchdog.stop();
@@ -442,6 +422,5 @@ export async function runDiscordGatewayLifecycle(params: {
await params.execApprovalsHandler.stop();
}
params.threadBindings.stop();
gatewayEmitter?.removeListener("error", suppressLateError);
}
}

View File

@@ -354,9 +354,26 @@ describe("monitorDiscordProvider", () => {
it("captures gateway errors emitted before lifecycle wait starts", async () => {
const emitter = new EventEmitter();
const drained: Array<{ message: string; type: string }> = [];
clientGetPluginMock.mockImplementation((name: string) =>
name === "gateway" ? { emitter, disconnect: vi.fn() } : undefined,
);
monitorLifecycleMock.mockImplementationOnce(async (params) => {
(
params as {
gatewaySupervisor?: {
drainPending: (
handler: (event: { message: string; type: string }) => "continue" | "stop",
) => "continue" | "stop";
};
threadBindings: { stop: () => void };
}
).gatewaySupervisor?.drainPending((event) => {
drained.push(event);
return "continue";
});
params.threadBindings.stop();
});
clientFetchUserMock.mockImplementationOnce(async () => {
emitter.emit("error", new Error("Fatal Gateway error: 4014"));
return { id: "bot-1" };
@@ -368,11 +385,9 @@ describe("monitorDiscordProvider", () => {
});
expect(monitorLifecycleMock).toHaveBeenCalledTimes(1);
const lifecycleArgs = monitorLifecycleMock.mock.calls[0]?.[0] as {
pendingGatewayErrors?: unknown[];
};
expect(lifecycleArgs.pendingGatewayErrors).toHaveLength(1);
expect(String(lifecycleArgs.pendingGatewayErrors?.[0])).toContain("4014");
expect(drained).toHaveLength(1);
expect(drained[0]?.type).toBe("disallowed-intents");
expect(drained[0]?.message).toContain("4014");
});
it("passes default eventQueue.listenerTimeout of 120s to Carbon Client", async () => {

View File

@@ -45,7 +45,6 @@ import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { createNonExitingRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { summarizeStringEntries } from "openclaw/plugin-sdk/text-runtime";
import { resolveDiscordAccount } from "../accounts.js";
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
import { fetchDiscordApplicationId } from "../probe.js";
import { normalizeDiscordToken } from "../token.js";
import { createDiscordVoiceCommand } from "../voice/command.js";
@@ -63,8 +62,8 @@ import {
import { createDiscordAutoPresenceController } from "./auto-presence.js";
import { resolveDiscordSlashCommandConfig } from "./commands.js";
import { createExecApprovalButton, DiscordExecApprovalHandler } from "./exec-approvals.js";
import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js";
import { createDiscordGatewayPlugin } from "./gateway-plugin.js";
import { createDiscordGatewaySupervisor } from "./gateway-supervisor.js";
import {
DiscordMessageListener,
DiscordPresenceListener,
@@ -649,10 +648,10 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
}
}
let lifecycleStarted = false;
let releaseEarlyGatewayErrorGuard = () => {};
let gatewaySupervisor: ReturnType<typeof createDiscordGatewaySupervisor> | undefined;
let deactivateMessageHandler: (() => void) | undefined;
let autoPresenceController: ReturnType<typeof createDiscordAutoPresenceController> | null = null;
let earlyGatewayEmitter: ReturnType<typeof getDiscordGatewayEmitter> | undefined;
let earlyGatewayEmitter = gatewaySupervisor?.emitter;
let onEarlyGatewayDebug: ((msg: unknown) => void) | undefined;
try {
const commands: BaseCommand[] = commandSpecs.map((spec) =>
@@ -798,11 +797,14 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
},
clientPlugins,
);
const earlyGatewayErrorGuard = attachEarlyGatewayErrorGuard(client);
releaseEarlyGatewayErrorGuard = earlyGatewayErrorGuard.release;
gatewaySupervisor = createDiscordGatewaySupervisor({
client,
isDisallowedIntentsError: isDiscordDisallowedIntentsError,
runtime,
});
const lifecycleGateway = client.getPlugin<GatewayPlugin>("gateway");
earlyGatewayEmitter = getDiscordGatewayEmitter(lifecycleGateway);
earlyGatewayEmitter = gatewaySupervisor.emitter;
onEarlyGatewayDebug = (msg: unknown) => {
if (!isVerbose()) {
return;
@@ -1019,8 +1021,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
voiceManagerRef,
execApprovalsHandler,
threadBindings,
pendingGatewayErrors: earlyGatewayErrorGuard.pendingErrors,
releaseEarlyGatewayErrorGuard,
gatewaySupervisor,
});
} finally {
deactivateMessageHandler?.();
@@ -1029,7 +1030,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
if (onEarlyGatewayDebug) {
earlyGatewayEmitter?.removeListener("debug", onEarlyGatewayDebug);
}
releaseEarlyGatewayErrorGuard();
gatewaySupervisor?.dispose();
if (!lifecycleStarted) {
threadBindings.stop();
}