fix: harden startup readiness and discord replies

(cherry picked from commit 3956672106b3387d42427a485a9ca01e77f3b78f)
This commit is contained in:
Satoshi
2026-05-04 14:42:16 +01:00
committed by Peter Steinberger
parent 7e229f0d3d
commit e259938e96
12 changed files with 381 additions and 49 deletions

View File

@@ -333,7 +333,7 @@ describe("runDiscordGatewayLifecycle", () => {
expect(statusSink).toHaveBeenCalledTimes(callCountAfterCleanup);
});
it("restarts the gateway once when startup never reaches READY, then recovers", async () => {
it("reconnects with backoff when startup never reaches READY, then recovers", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
@@ -347,10 +347,13 @@ describe("runDiscordGatewayLifecycle", () => {
const { lifecycleParams, runtimeError, statusSink } = createLifecycleHarness({ gateway });
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
await vi.advanceTimersByTimeAsync(16_500);
await vi.advanceTimersByTimeAsync(18_500);
await expect(lifecyclePromise).resolves.toBeUndefined();
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("gateway READY wait timed out after 15000ms"),
);
expect(runtimeError).not.toHaveBeenCalledWith(
expect.stringContaining("gateway was not ready after 15000ms; restarting gateway"),
);
expect(gateway.disconnect).toHaveBeenCalledTimes(1);
@@ -396,14 +399,14 @@ describe("runDiscordGatewayLifecycle", () => {
expect(gateway.connect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledWith(false);
await vi.advanceTimersByTimeAsync(1_000);
await vi.advanceTimersByTimeAsync(3_000);
await expect(lifecyclePromise).resolves.toBeUndefined();
} finally {
vi.useRealTimers();
}
});
it("fails when startup still is not ready after a restart", async () => {
it("keeps retrying when startup still is not ready after a reconnect", async () => {
vi.useFakeTimers();
try {
const { emitter, gateway } = createGatewayHarness();
@@ -414,19 +417,17 @@ describe("runDiscordGatewayLifecycle", () => {
const lifecyclePromise = runDiscordGatewayLifecycle(lifecycleParams);
lifecyclePromise.catch(() => {});
await vi.advanceTimersByTimeAsync(31_000);
await vi.advanceTimersByTimeAsync(34_000);
await expect(lifecyclePromise).rejects.toThrow(
"discord gateway did not reach READY within 15000ms after restart",
);
expect(gateway.disconnect).toHaveBeenCalledTimes(1);
expect(gateway.connect).toHaveBeenCalledTimes(1);
expect(gateway.disconnect).toHaveBeenCalledTimes(2);
expect(gateway.connect).toHaveBeenCalledTimes(2);
expect(gateway.connect).toHaveBeenCalledWith(false);
expectLifecycleCleanup({
threadStop,
waitCalls: 0,
gatewaySupervisor,
});
expect(waitForDiscordGatewayStopMock).not.toHaveBeenCalled();
gateway.isConnected = true;
await vi.advanceTimersByTimeAsync(2_500);
await expect(lifecyclePromise).resolves.toBeUndefined();
expectLifecycleCleanup({ threadStop, waitCalls: 1, gatewaySupervisor });
} finally {
vi.useRealTimers();
}

View File

@@ -25,6 +25,7 @@ const MAX_DISCORD_GATEWAY_READY_TIMEOUT_MS = 120_000;
const DISCORD_GATEWAY_READY_TIMEOUT_ENV = "OPENCLAW_DISCORD_READY_TIMEOUT_MS";
const DISCORD_GATEWAY_RUNTIME_READY_TIMEOUT_ENV = "OPENCLAW_DISCORD_RUNTIME_READY_TIMEOUT_MS";
const DISCORD_GATEWAY_READY_POLL_MS = 250;
const DISCORD_GATEWAY_READY_RETRY_BACKOFF_MS = 2_000;
const DISCORD_GATEWAY_STARTUP_DISCONNECT_DRAIN_TIMEOUT_MS = 5_000;
const DISCORD_GATEWAY_STARTUP_TERMINATE_CLOSE_TIMEOUT_MS = 1_000;
const DISCORD_GATEWAY_TRANSPORT_ACTIVITY_STATUS_MIN_INTERVAL_MS = 30_000;
@@ -355,41 +356,50 @@ async function waitForGatewayReady(params: {
return "stopped";
};
const firstAttempt = await waitUntilReady();
if (firstAttempt !== "timeout") {
return;
}
if (!params.gateway) {
throw new Error(`discord gateway did not reach READY within ${params.readyTimeoutMs}ms`);
}
const restartAt = Date.now();
params.runtime.error?.(
danger(`discord: gateway was not ready after ${params.readyTimeoutMs}ms; restarting gateway`),
);
params.pushStatus?.({
connected: false,
lastEventAt: restartAt,
lastDisconnect: {
at: restartAt,
error: "startup-not-ready",
},
lastError: "startup-not-ready",
});
if (params.abortSignal?.aborted) {
const attempt = await waitUntilReady();
if (attempt === "timeout") {
throw new Error(`discord gateway did not reach READY within ${params.readyTimeoutMs}ms`);
}
return;
}
await params.beforeRestart?.();
await restartGatewayAfterReadyTimeout({
gateway: params.gateway,
abortSignal: params.abortSignal,
runtime: params.runtime,
});
if ((await waitUntilReady()) === "timeout") {
throw new Error(
`discord gateway did not reach READY within ${params.readyTimeoutMs}ms after restart`,
let attempt = 0;
while (!params.abortSignal?.aborted) {
const result = await waitUntilReady();
if (result !== "timeout") {
return;
}
attempt += 1;
const restartAt = Date.now();
params.runtime.error?.(
danger(
`discord: gateway READY wait timed out after ${params.readyTimeoutMs}ms; reconnecting with backoff (attempt ${attempt})`,
),
);
params.pushStatus?.({
connected: false,
lastEventAt: restartAt,
lastDisconnect: {
at: restartAt,
error: "startup-not-ready",
},
lastError: "startup-not-ready",
});
await params.beforeRestart?.();
await restartGatewayAfterReadyTimeout({
gateway: params.gateway,
abortSignal: params.abortSignal,
runtime: params.runtime,
});
if (params.abortSignal?.aborted) {
return;
}
await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, DISCORD_GATEWAY_READY_RETRY_BACKOFF_MS);
timeout.unref?.();
});
}
}

View File

@@ -105,6 +105,76 @@ describe("deliverDiscordReply", () => {
);
});
it("strips internal execution trace lines at the final Discord send boundary", async () => {
await deliverDiscordReply({
replies: [
{
text: [
"📊 Session Status: current",
"🛠️ Exec: run git status",
"📖 Read: lines 1-40 from secret.md",
"Visible reply.",
].join("\n"),
},
],
target: "channel:101",
token: "token",
accountId: "default",
runtime,
cfg,
textLimit: 2000,
});
expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [{ text: "Visible reply." }],
}),
);
});
it("drops pure internal trace text while preserving media-only delivery", async () => {
await deliverDiscordReply({
replies: [
{
text: "commentary: calling tool\nanalysis: inspect private state",
mediaUrl: "https://example.com/result.png",
},
],
target: "channel:101",
token: "token",
accountId: "default",
runtime,
cfg,
textLimit: 2000,
});
expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [{ mediaUrl: "https://example.com/result.png", text: undefined }],
}),
);
});
it("does not strip ordinary code-fenced examples of tool-call labels", async () => {
const text = ["Example:", "```", "🛠️ Exec: run ls", "```"].join("\n");
await deliverDiscordReply({
replies: [{ text }],
target: "channel:101",
token: "token",
accountId: "default",
runtime,
cfg,
textLimit: 2000,
});
expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith(
expect.objectContaining({
payloads: [{ text }],
}),
);
});
it("passes resolved Discord formatting options as explicit delivery options", async () => {
const baseCfg = {
channels: {

View File

@@ -18,6 +18,7 @@ import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import type { RequestClient } from "../internal/discord.js";
import { sendMessageDiscord, sendVoiceMessageDiscord } from "../send.js";
import { sanitizeDiscordFrontChannelReplyPayloads } from "./reply-safety.js";
export type DiscordThreadBindingLookupRecord = {
accountId: string;
@@ -175,13 +176,17 @@ export async function deliverDiscordReply(params: {
void params.runtime;
const delivery = resolveDiscordDeliveryOptions(params);
const payloads = sanitizeDiscordFrontChannelReplyPayloads(params.replies);
if (payloads.length === 0) {
return;
}
await deliverOutboundPayloads({
cfg: params.cfg,
channel: "discord",
to: delivery.to,
accountId: params.accountId,
payloads: params.replies,
payloads,
replyToId: normalizeOptionalString(params.replyToId),
replyToMode: delivery.replyToMode,
formatting: delivery.formatting,

View File

@@ -0,0 +1,64 @@
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { sanitizeAssistantVisibleText } from "openclaw/plugin-sdk/text-runtime";
const DISCORD_INTERNAL_TRACE_LINE_RE =
/^(?:>\s*)?(?:(?:📊|🛠️|📖|📝|🔍|🔎|⚙️)\s*)?(?:Session Status|Exec|Read|Edit|Write|Patch|Search|Open|Click|Find|Screenshot|Update Plan|Tool Call|Tool Result|Function Call|Shell|Command)\s*:/i;
const DISCORD_INTERNAL_CHANNEL_LINE_RE =
/^(?:>\s*)?(?:analysis|commentary|tool[-_ ]?call|tool[-_ ]?result|function[-_ ]?call|thinking|reasoning)\s*[:=]/i;
function stripDiscordInternalTraceLines(text: string): string {
let inFence = false;
const kept: string[] = [];
for (const line of text.split(/\r?\n/)) {
if (/^\s*```/.test(line)) {
inFence = !inFence;
kept.push(line);
continue;
}
if (!inFence) {
const trimmed = line.trim();
if (
DISCORD_INTERNAL_TRACE_LINE_RE.test(trimmed) ||
DISCORD_INTERNAL_CHANNEL_LINE_RE.test(trimmed)
) {
continue;
}
}
kept.push(line);
}
return kept.join("\n");
}
function collapseExcessBlankLines(text: string): string {
return text.replace(/[ \t]+\n/g, "\n").replace(/\n{3,}/g, "\n\n");
}
export function sanitizeDiscordFrontChannelText(text: string): string {
const withoutAssistantScaffolding = sanitizeAssistantVisibleText(text);
const withoutTraceLines = stripDiscordInternalTraceLines(withoutAssistantScaffolding);
return collapseExcessBlankLines(withoutTraceLines).trim();
}
export function sanitizeDiscordFrontChannelReplyPayloads(
payloads: readonly ReplyPayload[],
): ReplyPayload[] {
const safePayloads: ReplyPayload[] = [];
for (const payload of payloads) {
const originalParts = resolveSendableOutboundReplyParts(payload);
const safeText =
typeof payload.text === "string"
? sanitizeDiscordFrontChannelText(payload.text)
: payload.text;
const nextPayload =
safeText === payload.text
? payload
: ({ ...payload, text: safeText || undefined } as ReplyPayload);
const nextParts = resolveSendableOutboundReplyParts(nextPayload);
if (!nextParts.hasText && !originalParts.hasMedia) {
continue;
}
safePayloads.push(nextPayload);
}
return safePayloads;
}