mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix(gateway): refresh stale channel health cache
This commit is contained in:
@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Discord/voice: lengthen the default voice join Ready wait, add configurable `voice.connectTimeoutMs`/`voice.reconnectGraceMs`, and warn before destroying unrecovered disconnected sessions so slow Discord voice handshakes and reconnects no longer fail silently. Fixes #63098; refs #39825 and #65039. Thanks @darealgege, @kzicherman, and @ayochim.
|
||||
- Gateway/health: refresh cached health RPC snapshots when channel runtime state diverges, so Discord and other channel status reads no longer report stale running or connected values until the cache TTL expires. (#75423) Thanks @clawsweeper.
|
||||
- Discord/voice: merge configured media-understanding providers such as Deepgram into partial active provider registries, so follow-up voice turns keep transcribing after another media plugin is already active. Fixes #65687. Thanks @OneMintJulep.
|
||||
- WhatsApp: stage `qrcode` with the WhatsApp plugin runtime dependencies so packaged QR pairing can render from staged plugin-runtime-deps installs. Fixes #75394. Thanks @FelipeX2001.
|
||||
- Discord/voice: apply per-channel Discord `systemPrompt` overrides to voice transcript turns by forwarding the trusted channel prompt through the voice agent run. Fixes #47095. Thanks @qearlyao.
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import type { ChannelAccountSnapshot } from "../../channels/plugins/types.public.js";
|
||||
import type { ChannelHealthSummary, HealthSummary } from "../../commands/health.types.js";
|
||||
import { getStatusSummary } from "../../commands/status.js";
|
||||
import { ErrorCodes, errorShape } from "../protocol/index.js";
|
||||
import type { ChannelRuntimeSnapshot } from "../server-channel-runtime.types.js";
|
||||
import { HEALTH_REFRESH_INTERVAL_MS } from "../server-constants.js";
|
||||
import { formatError } from "../server-utils.js";
|
||||
import { formatForLog } from "../ws-log.js";
|
||||
@@ -7,6 +10,78 @@ import type { GatewayRequestHandlers } from "./types.js";
|
||||
|
||||
const ADMIN_SCOPE = "operator.admin";
|
||||
|
||||
function cachedAccountForRuntimeSnapshot(params: {
|
||||
cachedChannel: ChannelHealthSummary | undefined;
|
||||
accountId: string | undefined;
|
||||
}): ChannelHealthSummary | undefined {
|
||||
const accountId = params.accountId;
|
||||
if (accountId && params.cachedChannel?.accounts?.[accountId]) {
|
||||
return params.cachedChannel.accounts[accountId];
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function cachedLifecycleDiffersFromRuntime(params: {
|
||||
cachedAccount: ChannelHealthSummary | undefined;
|
||||
runtimeSnapshot: ChannelAccountSnapshot;
|
||||
}): boolean {
|
||||
for (const key of ["running", "connected"] as const) {
|
||||
const runtimeValue = params.runtimeSnapshot[key];
|
||||
if (typeof runtimeValue !== "boolean") {
|
||||
continue;
|
||||
}
|
||||
if (params.cachedAccount?.[key] !== runtimeValue) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function cachedHealthDiffersFromRuntime(
|
||||
cached: HealthSummary,
|
||||
runtime: ChannelRuntimeSnapshot,
|
||||
): boolean {
|
||||
for (const [channelId, runtimeSnapshot] of Object.entries(runtime.channels)) {
|
||||
if (!runtimeSnapshot) {
|
||||
continue;
|
||||
}
|
||||
const cachedChannel = cached.channels[channelId];
|
||||
if (
|
||||
cachedLifecycleDiffersFromRuntime({
|
||||
cachedAccount: cachedChannel,
|
||||
runtimeSnapshot,
|
||||
})
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
for (const [channelId, accounts] of Object.entries(runtime.channelAccounts)) {
|
||||
if (!accounts) {
|
||||
continue;
|
||||
}
|
||||
const cachedChannel = cached.channels[channelId];
|
||||
for (const [accountId, runtimeSnapshot] of Object.entries(accounts)) {
|
||||
if (!runtimeSnapshot) {
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
cachedLifecycleDiffersFromRuntime({
|
||||
cachedAccount: cachedAccountForRuntimeSnapshot({
|
||||
cachedChannel,
|
||||
accountId,
|
||||
}),
|
||||
runtimeSnapshot,
|
||||
})
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export const healthHandlers: GatewayRequestHandlers = {
|
||||
health: async ({ respond, context, params, client }) => {
|
||||
const { getHealthCache, refreshHealthSnapshot, logHealth } = context;
|
||||
@@ -15,7 +90,23 @@ export const healthHandlers: GatewayRequestHandlers = {
|
||||
const includeSensitive = scopes.includes(ADMIN_SCOPE);
|
||||
const now = Date.now();
|
||||
const cached = getHealthCache();
|
||||
if (!wantsProbe && cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
|
||||
let cachedDiffersFromRuntime = false;
|
||||
if (!wantsProbe && cached) {
|
||||
try {
|
||||
cachedDiffersFromRuntime = cachedHealthDiffersFromRuntime(
|
||||
cached,
|
||||
context.getRuntimeSnapshot(),
|
||||
);
|
||||
} catch {
|
||||
cachedDiffersFromRuntime = false;
|
||||
}
|
||||
}
|
||||
if (
|
||||
!wantsProbe &&
|
||||
cached &&
|
||||
!cachedDiffersFromRuntime &&
|
||||
now - cached.ts < HEALTH_REFRESH_INTERVAL_MS
|
||||
) {
|
||||
respond(true, cached, undefined, { cached: true });
|
||||
void refreshHealthSnapshot({ probe: false, includeSensitive }).catch((err) =>
|
||||
logHealth.error(`background health refresh failed: ${formatError(err)}`),
|
||||
|
||||
@@ -1921,6 +1921,174 @@ describe("gateway healthHandlers.status scope handling", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("gateway healthHandlers.health cache freshness", () => {
|
||||
let healthHandlers: typeof import("./health.js").healthHandlers;
|
||||
|
||||
beforeAll(async () => {
|
||||
({ healthHandlers } = await import("./health.js"));
|
||||
});
|
||||
|
||||
it("refreshes cached health when runtime channel lifecycle has changed", async () => {
|
||||
const cached = {
|
||||
ok: true,
|
||||
ts: Date.now(),
|
||||
durationMs: 1,
|
||||
channels: {
|
||||
discord: {
|
||||
configured: true,
|
||||
running: false,
|
||||
connected: false,
|
||||
accounts: {
|
||||
default: {
|
||||
accountId: "default",
|
||||
configured: true,
|
||||
running: false,
|
||||
connected: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
channelOrder: ["discord"],
|
||||
channelLabels: { discord: "Discord" },
|
||||
heartbeatSeconds: 0,
|
||||
defaultAgentId: "main",
|
||||
agents: [],
|
||||
sessions: { path: "/tmp/sessions.json", count: 0, recent: [] },
|
||||
};
|
||||
const fresh = {
|
||||
...cached,
|
||||
ts: cached.ts + 1,
|
||||
channels: {
|
||||
discord: {
|
||||
...cached.channels.discord,
|
||||
running: true,
|
||||
connected: true,
|
||||
accounts: {
|
||||
default: {
|
||||
...cached.channels.discord.accounts.default,
|
||||
running: true,
|
||||
connected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const respond = vi.fn();
|
||||
const refreshHealthSnapshot = vi.fn().mockResolvedValue(fresh);
|
||||
|
||||
await healthHandlers.health({
|
||||
req: {} as never,
|
||||
params: {} as never,
|
||||
respond: respond as never,
|
||||
context: {
|
||||
getHealthCache: () => cached,
|
||||
refreshHealthSnapshot,
|
||||
getRuntimeSnapshot: () => ({
|
||||
channels: {},
|
||||
channelAccounts: {
|
||||
discord: {
|
||||
default: {
|
||||
accountId: "default",
|
||||
running: true,
|
||||
connected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
logHealth: { error: vi.fn() },
|
||||
} as never,
|
||||
client: { connect: { role: "operator", scopes: ["operator.read"] } } as never,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({
|
||||
probe: false,
|
||||
includeSensitive: false,
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(true, fresh, undefined);
|
||||
});
|
||||
|
||||
it("refreshes cached health when a runtime account is missing from the cached account summary", async () => {
|
||||
const cached = {
|
||||
ok: true,
|
||||
ts: Date.now(),
|
||||
durationMs: 1,
|
||||
channels: {
|
||||
discord: {
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: true,
|
||||
accounts: {
|
||||
default: {
|
||||
accountId: "default",
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
channelOrder: ["discord"],
|
||||
channelLabels: { discord: "Discord" },
|
||||
heartbeatSeconds: 0,
|
||||
defaultAgentId: "main",
|
||||
agents: [],
|
||||
sessions: { path: "/tmp/sessions.json", count: 0, recent: [] },
|
||||
};
|
||||
const fresh = {
|
||||
...cached,
|
||||
ts: cached.ts + 1,
|
||||
channels: {
|
||||
discord: {
|
||||
...cached.channels.discord,
|
||||
accounts: {
|
||||
...cached.channels.discord.accounts,
|
||||
work: {
|
||||
accountId: "work",
|
||||
configured: true,
|
||||
running: true,
|
||||
connected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
const respond = vi.fn();
|
||||
const refreshHealthSnapshot = vi.fn().mockResolvedValue(fresh);
|
||||
|
||||
await healthHandlers.health({
|
||||
req: {} as never,
|
||||
params: {} as never,
|
||||
respond: respond as never,
|
||||
context: {
|
||||
getHealthCache: () => cached,
|
||||
refreshHealthSnapshot,
|
||||
getRuntimeSnapshot: () => ({
|
||||
channels: {},
|
||||
channelAccounts: {
|
||||
discord: {
|
||||
work: {
|
||||
accountId: "work",
|
||||
running: true,
|
||||
connected: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
logHealth: { error: vi.fn() },
|
||||
} as never,
|
||||
client: { connect: { role: "operator", scopes: ["operator.read"] } } as never,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(refreshHealthSnapshot).toHaveBeenCalledWith({
|
||||
probe: false,
|
||||
includeSensitive: false,
|
||||
});
|
||||
expect(respond).toHaveBeenCalledWith(true, fresh, undefined);
|
||||
});
|
||||
});
|
||||
|
||||
describe("logs.tail", () => {
|
||||
const logsNoop = () => false;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user