mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:50:43 +00:00
fix(gateway): restart channels after secret reload (#70720)
* fix(gateway): restart channels after secret reload * fix(gateway): serialize secrets.reload and isolate channel restart errors Address review feedback from Greptile (P1), Codex (P2), and Aisle (Medium, CWE-362) on #70720: - Serialize the entire secrets.reload path through a promise tail lock so concurrent callers cannot overlap the stop/start loop or diff against a stale pre-activation snapshot. - Wrap each channel's stop/start pair in a try/catch so one channel failing to restart does not leave other changed channels unrestarted. - Register slack/zalo/discord channel plugins with reload.configPrefixes in the test setup so channels.<id>.* diff paths actually match a restart rule (without this, the diff falls through to restart-gateway and the handler never enters the per-channel restart branch). - Add tests covering concurrent-reload serialization and per-channel restart-failure isolation. * fix(gateway): surface channel restart failures from secrets.reload Address review feedback on the previous commit: - Codex P1: `secrets.reload` swallowed per-channel restart failures and still returned `{ ok: true }`, so a rotation that left a channel on the old secret looked successful to the caller. The handler now collects restart failures during the loop and throws an aggregate error after attempting every channel, so the client-side RPC response surfaces the partial failure while unaffected channels still restart (preserving the original Greptile P1 non-cascading semantic). - Greptile P2: test mock-call assertions sorted the captured channel arguments so they no longer depend on `Set`/object-key iteration order, which is not a stable contract of the handler. * fix(gateway): harden secrets reload followups * docs(changelog): note secret-backed channel restart on secrets.reload * test(gateway): align secrets reload snapshot activation * test(gateway): reset plugin runtime state in aux handlers * fix(gateway): refresh reload rules and roll back channels * fix(gateway): harden secrets.reload rollback tests * test(gateway): inject aux handler reload plan * test(gateway): avoid resettable reload-plan mocks * test(gateway): isolate aux handler tests from skip env-var leakage test-helpers.mocks.ts and test-helpers.server.ts set OPENCLAW_SKIP_CHANNELS=1 / OPENCLAW_SKIP_PROVIDERS=1 at module load. When a shared vitest worker imports those helpers before this file's tests run, the leaked env vars route the secrets.reload skip-mode branch and the channel restart loop never fires. Add a beforeEach that clears both env vars so the suite is independent of worker import order. * fix(gateway): restore required generation on secrets.reload rollback setCurrentSharedGatewaySessionGeneration can clear `required` as a side effect of activating a new generation. The previous rollback path restored only `current`, leaving `required` cleared and weakening shared-gateway auth-generation enforcement after a failed reload (Aisle CWE-287). Capture both fields before activation and restore both in the catch block. Add a focused regression test that locks in the contract. * fix(gateway): track restart channels for rollback before stopChannel awaits Pushing to stoppedChannels only after `await stopChannel` succeeded meant that if stopChannel rejected mid-call (for example, a plugin stopAccount hook throws after the runtime already closed the socket), the rollback loop skipped that channel entirely. A failed secrets.reload could then leave the channel down. Track the channel before awaiting so rollback always attempts to bring it back, and add a regression test.
This commit is contained in:
@@ -198,6 +198,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Memory/dreaming: decouple the managed dreaming cron from heartbeat by running it as an isolated lightweight agent turn, so dreaming runs even when heartbeat is disabled for the default agent and is no longer skipped by `heartbeat.activeHours`. `openclaw doctor --fix` migrates stale main-session dreaming jobs in persisted cron configs to the new shape. Fixes #69811, #67397, #68972. (#70737) Thanks @jalehman.
|
||||
- Agents/CLI: keep `--agent` plus `--session-id` lookup scoped to the requested agent store, so explicit agent resumes cannot select another agent's session. (#70985) Thanks @frankekn.
|
||||
- Plugins/Comfy: read workflow and cloud auth configuration from `plugins.entries.comfy.config` while preserving legacy Comfy config fallback, so image, video, and music workflows pass config validation. Fixes #61915. (#63058) Thanks @547895019.
|
||||
- Gateway/secrets: restart secret-backed channels such as Slack and Zalo during `secrets.reload` so rotated webhook secrets take effect immediately, with the reload serialized and per-channel restart errors isolated. (#70720) Thanks @drobison00.
|
||||
|
||||
## 2026.4.22
|
||||
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { getActivePluginRegistry } from "../plugins/runtime.js";
|
||||
import {
|
||||
getActivePluginChannelRegistryVersion,
|
||||
getActivePluginRegistry,
|
||||
getActivePluginRegistryVersion,
|
||||
} from "../plugins/runtime.js";
|
||||
import { isPlainObject } from "../utils.js";
|
||||
|
||||
export type ChannelKind = ChannelId;
|
||||
@@ -113,12 +117,22 @@ const BASE_RELOAD_RULES_TAIL: ReloadRule[] = [
|
||||
|
||||
let cachedReloadRules: ReloadRule[] | null = null;
|
||||
let cachedRegistry: ReturnType<typeof getActivePluginRegistry> | null = null;
|
||||
let cachedActiveRegistryVersion = -1;
|
||||
let cachedChannelRegistryVersion = -1;
|
||||
|
||||
function listReloadRules(): ReloadRule[] {
|
||||
const registry = getActivePluginRegistry();
|
||||
if (registry !== cachedRegistry) {
|
||||
const activeRegistryVersion = getActivePluginRegistryVersion();
|
||||
const channelRegistryVersion = getActivePluginChannelRegistryVersion();
|
||||
if (
|
||||
registry !== cachedRegistry ||
|
||||
activeRegistryVersion !== cachedActiveRegistryVersion ||
|
||||
channelRegistryVersion !== cachedChannelRegistryVersion
|
||||
) {
|
||||
cachedReloadRules = null;
|
||||
cachedRegistry = registry;
|
||||
cachedActiveRegistryVersion = activeRegistryVersion;
|
||||
cachedChannelRegistryVersion = channelRegistryVersion;
|
||||
}
|
||||
if (cachedReloadRules) {
|
||||
return cachedReloadRules;
|
||||
|
||||
@@ -11,7 +11,11 @@ import type {
|
||||
ConfigWriteNotification,
|
||||
OpenClawConfig,
|
||||
} from "../config/config.js";
|
||||
import { setActivePluginRegistry } from "../plugins/runtime.js";
|
||||
import {
|
||||
pinActivePluginChannelRegistry,
|
||||
resetPluginRuntimeStateForTest,
|
||||
setActivePluginRegistry,
|
||||
} from "../plugins/runtime.js";
|
||||
import { createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||
import {
|
||||
buildGatewayReloadPlan,
|
||||
@@ -171,6 +175,7 @@ describe("buildGatewayReloadPlan", () => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetPluginRuntimeStateForTest();
|
||||
setActivePluginRegistry(emptyRegistry);
|
||||
});
|
||||
|
||||
@@ -211,6 +216,23 @@ describe("buildGatewayReloadPlan", () => {
|
||||
expect(plan.restartChannels).toEqual(expected);
|
||||
});
|
||||
|
||||
it("refreshes channel reload rules when only the tracked channel registry changes", () => {
|
||||
const activeOnlyRegistry = createTestRegistry([]);
|
||||
const channelOnlyRegistry = createTestRegistry([
|
||||
{ pluginId: "telegram", plugin: telegramPlugin, source: "test" },
|
||||
]);
|
||||
|
||||
setActivePluginRegistry(activeOnlyRegistry);
|
||||
const beforePinPlan = buildGatewayReloadPlan(["channels.telegram.botToken"]);
|
||||
expect(beforePinPlan.restartGateway).toBe(true);
|
||||
expect(beforePinPlan.restartChannels).toEqual(new Set());
|
||||
|
||||
pinActivePluginChannelRegistry(channelOnlyRegistry);
|
||||
const afterPinPlan = buildGatewayReloadPlan(["channels.telegram.botToken"]);
|
||||
expect(afterPinPlan.restartGateway).toBe(false);
|
||||
expect(afterPinPlan.restartChannels).toEqual(new Set(["telegram"]));
|
||||
});
|
||||
|
||||
it("restarts heartbeat when model-related config changes", () => {
|
||||
const plan = buildGatewayReloadPlan([
|
||||
"models.providers.openai.models",
|
||||
|
||||
525
src/gateway/server-aux-handlers.test.ts
Normal file
525
src/gateway/server-aux-handlers.test.ts
Normal file
@@ -0,0 +1,525 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import {
|
||||
activateSecretsRuntimeSnapshot,
|
||||
clearSecretsRuntimeSnapshot,
|
||||
getActiveSecretsRuntimeSnapshot,
|
||||
type PreparedSecretsRuntimeSnapshot,
|
||||
} from "../secrets/runtime.js";
|
||||
import type { GatewayReloadPlan } from "./config-reload.js";
|
||||
import { createGatewayAuxHandlers } from "./server-aux-handlers.js";
|
||||
|
||||
function asConfig(value: unknown): OpenClawConfig {
|
||||
return value as OpenClawConfig;
|
||||
}
|
||||
|
||||
function createReloadPlan(overrides?: Partial<GatewayReloadPlan>): GatewayReloadPlan {
|
||||
return {
|
||||
changedPaths: overrides?.changedPaths ?? [],
|
||||
restartGateway: overrides?.restartGateway ?? false,
|
||||
restartReasons: overrides?.restartReasons ?? [],
|
||||
hotReasons: overrides?.hotReasons ?? [],
|
||||
reloadHooks: overrides?.reloadHooks ?? false,
|
||||
restartGmailWatcher: overrides?.restartGmailWatcher ?? false,
|
||||
restartCron: overrides?.restartCron ?? false,
|
||||
restartHeartbeat: overrides?.restartHeartbeat ?? false,
|
||||
restartHealthMonitor: overrides?.restartHealthMonitor ?? false,
|
||||
restartChannels: overrides?.restartChannels ?? new Set(),
|
||||
noopPaths: overrides?.noopPaths ?? [],
|
||||
};
|
||||
}
|
||||
|
||||
function createSnapshot(config: OpenClawConfig): PreparedSecretsRuntimeSnapshot {
|
||||
return {
|
||||
sourceConfig: asConfig({}),
|
||||
config,
|
||||
authStores: [],
|
||||
warnings: [],
|
||||
webTools: {
|
||||
search: { providerSource: "none", diagnostics: [] },
|
||||
fetch: { providerSource: "none", diagnostics: [] },
|
||||
diagnostics: [],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function invokeSecretsReload(params: {
|
||||
handlers: ReturnType<typeof createGatewayAuxHandlers>["extraHandlers"];
|
||||
respond: ReturnType<typeof vi.fn>;
|
||||
}) {
|
||||
await params.handlers["secrets.reload"]({
|
||||
req: { type: "req", id: "1", method: "secrets.reload" },
|
||||
params: {},
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
respond: params.respond as Parameters<
|
||||
ReturnType<typeof createGatewayAuxHandlers>["extraHandlers"]["secrets.reload"]
|
||||
>[0]["respond"],
|
||||
context: {} as never,
|
||||
});
|
||||
}
|
||||
|
||||
// Other gateway test helpers (e.g. test-helpers.mocks.ts, test-helpers.server.ts)
|
||||
// set OPENCLAW_SKIP_CHANNELS / OPENCLAW_SKIP_PROVIDERS at module load. When a
|
||||
// shared vitest worker imports those helpers before this file's tests run,
|
||||
// the leaked env vars route the secrets.reload skip-mode branch and prevent
|
||||
// the channel restart loop from firing. Reset them before every test so this
|
||||
// suite is independent of worker import order.
|
||||
beforeEach(() => {
|
||||
delete process.env.OPENCLAW_SKIP_CHANNELS;
|
||||
delete process.env.OPENCLAW_SKIP_PROVIDERS;
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearSecretsRuntimeSnapshot();
|
||||
delete process.env.OPENCLAW_SKIP_CHANNELS;
|
||||
delete process.env.OPENCLAW_SKIP_PROVIDERS;
|
||||
});
|
||||
|
||||
describe("gateway aux handlers", () => {
|
||||
it("restarts only channels whose resolved secret-backed config changed on secrets.reload", async () => {
|
||||
const buildReloadPlanCalls: string[][] = [];
|
||||
const buildReloadPlan = (changedPaths: string[]) => {
|
||||
buildReloadPlanCalls.push([...changedPaths]);
|
||||
return createReloadPlan({
|
||||
restartChannels: new Set(["slack", "zalo"]),
|
||||
});
|
||||
};
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
zalo: { webhookSecret: "old-zalo-secret" },
|
||||
discord: { token: "unchanged-discord-token" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const prepared = createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "new-slack-secret" },
|
||||
zalo: { webhookSecret: "new-zalo-secret" },
|
||||
discord: { token: "unchanged-discord-token" },
|
||||
},
|
||||
}),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockImplementation(async () => {
|
||||
activateSecretsRuntimeSnapshot(prepared);
|
||||
return prepared;
|
||||
});
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: vi.fn() },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
expect(activateRuntimeSecrets).toHaveBeenCalledTimes(1);
|
||||
expect(buildReloadPlanCalls).toEqual([
|
||||
["channels.slack.signingSecret", "channels.zalo.webhookSecret"],
|
||||
]);
|
||||
expect(stopChannel.mock.calls.map(([ch]) => ch).toSorted((a, b) => a.localeCompare(b))).toEqual(
|
||||
["slack", "zalo"],
|
||||
);
|
||||
expect(
|
||||
startChannel.mock.calls.map(([ch]) => ch).toSorted((a, b) => a.localeCompare(b)),
|
||||
).toEqual(["slack", "zalo"]);
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, warningCount: 0 });
|
||||
});
|
||||
|
||||
it("coalesces concurrent secrets.reload calls so channels are not restarted twice", async () => {
|
||||
const buildReloadPlan = () =>
|
||||
createReloadPlan({
|
||||
restartChannels: new Set(["slack"]),
|
||||
});
|
||||
const initialActive = createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
},
|
||||
}),
|
||||
);
|
||||
activateSecretsRuntimeSnapshot(initialActive);
|
||||
|
||||
const preparedFirst = createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "new-slack-secret" },
|
||||
},
|
||||
}),
|
||||
);
|
||||
const activationOrder: string[] = [];
|
||||
const activateRuntimeSecrets = vi.fn().mockImplementationOnce(async () => {
|
||||
activationOrder.push("first-start");
|
||||
// Yield the event loop to let a concurrent caller enter if the
|
||||
// handler were not serialized.
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
activateSecretsRuntimeSnapshot(preparedFirst);
|
||||
activationOrder.push("first-end");
|
||||
return preparedFirst;
|
||||
});
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: vi.fn() },
|
||||
});
|
||||
|
||||
await Promise.all([
|
||||
invokeSecretsReload({ handlers: extraHandlers, respond }),
|
||||
invokeSecretsReload({ handlers: extraHandlers, respond }),
|
||||
]);
|
||||
|
||||
expect(activationOrder).toEqual(["first-start", "first-end"]);
|
||||
expect(activateRuntimeSecrets).toHaveBeenCalledTimes(1);
|
||||
expect(stopChannel.mock.calls).toEqual([["slack"]]);
|
||||
expect(startChannel.mock.calls).toEqual([["slack"]]);
|
||||
expect(respond).toHaveBeenNthCalledWith(1, true, { ok: true, warningCount: 0 });
|
||||
expect(respond).toHaveBeenNthCalledWith(2, true, { ok: true, warningCount: 0 });
|
||||
});
|
||||
|
||||
it("rolls back stopped channels when a later restart fails", async () => {
|
||||
const buildReloadPlan = () =>
|
||||
createReloadPlan({
|
||||
restartChannels: new Set(["slack", "zalo"]),
|
||||
});
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
zalo: { webhookSecret: "old-zalo-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockResolvedValue(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "new-slack-secret" },
|
||||
zalo: { webhookSecret: "new-zalo-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce(undefined)
|
||||
.mockImplementationOnce(async () => {
|
||||
throw new Error("zalo refused to start");
|
||||
})
|
||||
.mockResolvedValue(undefined);
|
||||
const logChannelsInfo = vi.fn();
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: logChannelsInfo },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
expect(stopChannel.mock.calls).toEqual([["slack"], ["zalo"], ["slack"]]);
|
||||
expect(startChannel.mock.calls).toEqual([["slack"], ["zalo"], ["slack"], ["zalo"]]);
|
||||
expect(
|
||||
logChannelsInfo.mock.calls.some(([msg]) =>
|
||||
String(msg).startsWith("failed to restart zalo channel after secrets reload"),
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
logChannelsInfo.mock.calls.some(([msg]) =>
|
||||
String(msg).startsWith("rolling back slack channel after secrets reload failure"),
|
||||
),
|
||||
).toBe(true);
|
||||
expect(
|
||||
logChannelsInfo.mock.calls.some(([msg]) =>
|
||||
String(msg).startsWith("rolling back zalo channel after secrets reload failure"),
|
||||
),
|
||||
).toBe(true);
|
||||
// The handler surfaces the partial-failure so the caller can retry/alert
|
||||
// instead of treating a swallowed restart error as a successful rotation.
|
||||
expect(respond.mock.calls).toHaveLength(1);
|
||||
const [okFlag, successPayload, errorPayload] = respond.mock.calls[0];
|
||||
expect(okFlag).toBe(false);
|
||||
expect(successPayload).toBeUndefined();
|
||||
expect(String(errorPayload?.message ?? "")).toBe("secrets.reload failed");
|
||||
expect(getActiveSecretsRuntimeSnapshot()?.config).toEqual(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
zalo: { webhookSecret: "old-zalo-secret" },
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("attempts restart on rollback even when stopChannel itself throws mid-reload", async () => {
|
||||
// If stopChannel throws after partially stopping a channel (for example,
|
||||
// a plugin hook rejects after the runtime already closed the socket),
|
||||
// the rollback path must still try to restart that channel; otherwise a
|
||||
// failed secrets.reload can leave it down.
|
||||
const buildReloadPlan = () =>
|
||||
createReloadPlan({
|
||||
restartChannels: new Set(["slack", "zalo"]),
|
||||
});
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
zalo: { webhookSecret: "old-zalo-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockResolvedValue(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "new-slack-secret" },
|
||||
zalo: { webhookSecret: "new-zalo-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const stopChannel = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce(undefined)
|
||||
.mockRejectedValueOnce(new Error("zalo stop hook failed after socket close"));
|
||||
const startChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const logChannelsInfo = vi.fn();
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: logChannelsInfo },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
// Both channels appear in the rollback log, including zalo whose
|
||||
// stopChannel rejected.
|
||||
const rollbackLogs = logChannelsInfo.mock.calls
|
||||
.map(([msg]) => String(msg))
|
||||
.filter((msg) => msg.startsWith("rolling back "));
|
||||
expect(rollbackLogs).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.stringContaining("rolling back slack channel"),
|
||||
expect.stringContaining("rolling back zalo channel"),
|
||||
]),
|
||||
);
|
||||
// startChannel was invoked for zalo on rollback even though the original
|
||||
// stopChannel(zalo) rejected.
|
||||
expect(startChannel.mock.calls.map(([ch]) => ch)).toEqual(
|
||||
expect.arrayContaining(["slack", "zalo"]),
|
||||
);
|
||||
expect(respond.mock.calls).toHaveLength(1);
|
||||
expect(respond.mock.calls[0][0]).toBe(false);
|
||||
});
|
||||
|
||||
it("restores both current and required shared-gateway generation on reload failure", async () => {
|
||||
// Locks in the auth-generation rollback contract: a failed reload must
|
||||
// not leave `required` cleared if `setCurrentSharedGatewaySessionGeneration`
|
||||
// cleared it during activation, otherwise stale clients matching `current`
|
||||
// could remain authorized after rollback.
|
||||
const buildReloadPlan = () =>
|
||||
createReloadPlan({
|
||||
restartChannels: new Set(["slack"]),
|
||||
});
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: { slack: { signingSecret: "old-slack-secret" } },
|
||||
}),
|
||||
),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockResolvedValue(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: { slack: { signingSecret: "new-slack-secret" } },
|
||||
}),
|
||||
),
|
||||
);
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi.fn().mockRejectedValue(new Error("slack refused to start"));
|
||||
const respond = vi.fn();
|
||||
|
||||
const sharedGatewaySessionGenerationState = {
|
||||
current: "gen-a" as string | undefined,
|
||||
required: "gen-a" as string | undefined | null,
|
||||
};
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState,
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => "gen-b",
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: vi.fn() },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
expect(sharedGatewaySessionGenerationState.current).toBe("gen-a");
|
||||
expect(sharedGatewaySessionGenerationState.required).toBe("gen-a");
|
||||
expect(respond.mock.calls).toHaveLength(1);
|
||||
expect(respond.mock.calls[0][0]).toBe(false);
|
||||
});
|
||||
|
||||
it("fails reload when channel restarts are required but skip flags block them", async () => {
|
||||
const buildReloadPlan = () =>
|
||||
createReloadPlan({
|
||||
restartChannels: new Set(["slack"]),
|
||||
});
|
||||
process.env.OPENCLAW_SKIP_CHANNELS = "1";
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockResolvedValue(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "new-slack-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: vi.fn() },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
expect(stopChannel).not.toHaveBeenCalled();
|
||||
expect(startChannel).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(
|
||||
false,
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
code: "UNAVAILABLE",
|
||||
message: "secrets.reload failed",
|
||||
}),
|
||||
);
|
||||
expect(getActiveSecretsRuntimeSnapshot()?.config).toEqual(
|
||||
asConfig({
|
||||
channels: {
|
||||
slack: { signingSecret: "old-slack-secret" },
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("does not restart channels when resolved secrets do not change channel config", async () => {
|
||||
const buildReloadPlanCalls: string[][] = [];
|
||||
const buildReloadPlan = (changedPaths: string[]) => {
|
||||
buildReloadPlanCalls.push([...changedPaths]);
|
||||
return createReloadPlan();
|
||||
};
|
||||
activateSecretsRuntimeSnapshot(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
gateway: {
|
||||
auth: { mode: "token", token: "old-token" },
|
||||
},
|
||||
channels: {
|
||||
slack: { signingSecret: "same-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const activateRuntimeSecrets = vi.fn().mockResolvedValue(
|
||||
createSnapshot(
|
||||
asConfig({
|
||||
gateway: {
|
||||
auth: { mode: "token", token: "new-token" },
|
||||
},
|
||||
channels: {
|
||||
slack: { signingSecret: "same-secret" },
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
const stopChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const startChannel = vi.fn().mockResolvedValue(undefined);
|
||||
const respond = vi.fn();
|
||||
|
||||
const { extraHandlers } = createGatewayAuxHandlers({
|
||||
log: {},
|
||||
activateRuntimeSecrets,
|
||||
buildReloadPlan,
|
||||
sharedGatewaySessionGenerationState: { current: undefined, required: null },
|
||||
resolveSharedGatewaySessionGenerationForConfig: () => undefined,
|
||||
clients: [],
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels: { info: vi.fn() },
|
||||
});
|
||||
|
||||
await invokeSecretsReload({ handlers: extraHandlers, respond });
|
||||
|
||||
expect(buildReloadPlanCalls).toEqual([["gateway.auth.token"]]);
|
||||
expect(stopChannel).not.toHaveBeenCalled();
|
||||
expect(startChannel).not.toHaveBeenCalled();
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, warningCount: 0 });
|
||||
});
|
||||
});
|
||||
@@ -1,11 +1,21 @@
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js";
|
||||
import { type PluginApprovalRequestPayload } from "../infra/plugin-approvals.js";
|
||||
import {
|
||||
resolveCommandSecretsFromActiveRuntimeSnapshot,
|
||||
type CommandSecretAssignment,
|
||||
} from "../secrets/runtime-command-secrets.js";
|
||||
import { getActiveSecretsRuntimeSnapshot } from "../secrets/runtime.js";
|
||||
import {
|
||||
activateSecretsRuntimeSnapshot,
|
||||
getActiveSecretsRuntimeSnapshot,
|
||||
} from "../secrets/runtime.js";
|
||||
import {
|
||||
buildGatewayReloadPlan,
|
||||
diffConfigPaths,
|
||||
type ChannelKind,
|
||||
type GatewayReloadPlan,
|
||||
} from "./config-reload.js";
|
||||
import { createExecApprovalIosPushDelivery } from "./exec-approval-ios-push.js";
|
||||
import { ExecApprovalManager } from "./exec-approval-manager.js";
|
||||
import { createExecApprovalHandlers } from "./server-methods/exec-approval.js";
|
||||
@@ -25,12 +35,20 @@ type GatewayAuxHandlerLogger = {
|
||||
debug?: (message: string) => void;
|
||||
};
|
||||
|
||||
type ReloadSecretsResult = {
|
||||
warningCount: number;
|
||||
};
|
||||
|
||||
export function createGatewayAuxHandlers(params: {
|
||||
log: GatewayAuxHandlerLogger;
|
||||
activateRuntimeSecrets: ActivateRuntimeSecrets;
|
||||
buildReloadPlan?: (changedPaths: string[]) => GatewayReloadPlan;
|
||||
sharedGatewaySessionGenerationState: SharedGatewaySessionGenerationState;
|
||||
resolveSharedGatewaySessionGenerationForConfig: (config: OpenClawConfig) => string | undefined;
|
||||
clients: Iterable<SharedGatewayAuthClient>;
|
||||
startChannel: (name: ChannelKind) => Promise<void>;
|
||||
stopChannel: (name: ChannelKind) => Promise<void>;
|
||||
logChannels: { info: (msg: string) => void };
|
||||
}) {
|
||||
const execApprovalManager = new ExecApprovalManager();
|
||||
const execApprovalForwarder = createExecApprovalForwarder();
|
||||
@@ -39,36 +57,138 @@ export function createGatewayAuxHandlers(params: {
|
||||
forwarder: execApprovalForwarder,
|
||||
iosPushDelivery: execApprovalIosPushDelivery,
|
||||
});
|
||||
const buildReloadPlan = params.buildReloadPlan ?? buildGatewayReloadPlan;
|
||||
const pluginApprovalManager = new ExecApprovalManager<PluginApprovalRequestPayload>();
|
||||
const pluginApprovalHandlers = createPluginApprovalHandlers(pluginApprovalManager, {
|
||||
forwarder: execApprovalForwarder,
|
||||
});
|
||||
// Serialize the entire `secrets.reload` path (activation + channel restart)
|
||||
// so concurrent callers cannot overlap the stop/start loop and so the
|
||||
// "before" snapshot used for the reload-plan diff is always the snapshot
|
||||
// replaced by this call's activation, not one captured by a prior caller.
|
||||
let reloadInFlight: Promise<ReloadSecretsResult> | null = null;
|
||||
const runExclusiveReload = (
|
||||
fn: () => Promise<ReloadSecretsResult>,
|
||||
): Promise<ReloadSecretsResult> => {
|
||||
if (reloadInFlight) {
|
||||
return reloadInFlight;
|
||||
}
|
||||
const run = (async () => {
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
reloadInFlight = null;
|
||||
}
|
||||
})();
|
||||
reloadInFlight = run;
|
||||
return run;
|
||||
};
|
||||
const secretsHandlers = createSecretsHandlers({
|
||||
reloadSecrets: async () => {
|
||||
const active = getActiveSecretsRuntimeSnapshot();
|
||||
if (!active) {
|
||||
throw new Error("Secrets runtime snapshot is not active.");
|
||||
}
|
||||
const previousSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.current;
|
||||
const prepared = await params.activateRuntimeSecrets(active.sourceConfig, {
|
||||
reason: "reload",
|
||||
activate: true,
|
||||
});
|
||||
const nextSharedGatewaySessionGeneration =
|
||||
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
|
||||
setCurrentSharedGatewaySessionGeneration(
|
||||
params.sharedGatewaySessionGenerationState,
|
||||
nextSharedGatewaySessionGeneration,
|
||||
);
|
||||
if (previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
return { warningCount: prepared.warnings.length };
|
||||
},
|
||||
reloadSecrets: () =>
|
||||
runExclusiveReload(async () => {
|
||||
const previousSnapshot = getActiveSecretsRuntimeSnapshot();
|
||||
if (!previousSnapshot) {
|
||||
throw new Error("Secrets runtime snapshot is not active.");
|
||||
}
|
||||
// Snapshot both `current` and `required` because
|
||||
// `setCurrentSharedGatewaySessionGeneration` can clear `required` as
|
||||
// a side effect of activating a new generation. Restoring only
|
||||
// `current` on rollback would leave `required` cleared and weaken
|
||||
// shared-gateway auth-generation enforcement after a failed reload.
|
||||
const previousSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.current;
|
||||
const previousSharedGatewaySessionGenerationRequired =
|
||||
params.sharedGatewaySessionGenerationState.required;
|
||||
let nextSharedGatewaySessionGeneration = previousSharedGatewaySessionGeneration;
|
||||
let sharedGatewaySessionGenerationChanged = false;
|
||||
const stoppedChannels: ChannelKind[] = [];
|
||||
const restartedChannels = new Set<ChannelKind>();
|
||||
try {
|
||||
const prepared = await params.activateRuntimeSecrets(previousSnapshot.sourceConfig, {
|
||||
reason: "reload",
|
||||
activate: true,
|
||||
});
|
||||
nextSharedGatewaySessionGeneration =
|
||||
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
|
||||
const plan = buildReloadPlan(diffConfigPaths(previousSnapshot.config, prepared.config));
|
||||
setCurrentSharedGatewaySessionGeneration(
|
||||
params.sharedGatewaySessionGenerationState,
|
||||
nextSharedGatewaySessionGeneration,
|
||||
);
|
||||
sharedGatewaySessionGenerationChanged =
|
||||
previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration;
|
||||
if (sharedGatewaySessionGenerationChanged) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
if (plan.restartChannels.size > 0) {
|
||||
const restartChannels = [...plan.restartChannels];
|
||||
if (
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS)
|
||||
) {
|
||||
throw new Error(
|
||||
`secrets.reload requires restarting channels: ${restartChannels.join(", ")}`,
|
||||
);
|
||||
}
|
||||
const restartFailures: ChannelKind[] = [];
|
||||
for (const channel of restartChannels) {
|
||||
params.logChannels.info(`restarting ${channel} channel after secrets reload`);
|
||||
// Track for rollback before awaiting stopChannel: if stopChannel
|
||||
// throws after partially stopping the channel (for example, a
|
||||
// plugin hook rejects after the runtime already closed the
|
||||
// socket), we still need the outer catch to attempt restart so
|
||||
// the channel is not left down after a failed reload.
|
||||
stoppedChannels.push(channel);
|
||||
try {
|
||||
await params.stopChannel(channel);
|
||||
await params.startChannel(channel);
|
||||
restartedChannels.add(channel);
|
||||
} catch {
|
||||
params.logChannels.info(
|
||||
`failed to restart ${channel} channel after secrets reload`,
|
||||
);
|
||||
restartFailures.push(channel);
|
||||
}
|
||||
}
|
||||
if (restartFailures.length > 0) {
|
||||
throw new Error(
|
||||
`failed to restart channels after secrets reload: ${restartFailures.join(", ")}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
return { warningCount: prepared.warnings.length };
|
||||
} catch (err) {
|
||||
activateSecretsRuntimeSnapshot(previousSnapshot);
|
||||
params.sharedGatewaySessionGenerationState.current =
|
||||
previousSharedGatewaySessionGeneration;
|
||||
params.sharedGatewaySessionGenerationState.required =
|
||||
previousSharedGatewaySessionGenerationRequired;
|
||||
if (sharedGatewaySessionGenerationChanged) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: previousSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
for (const channel of stoppedChannels) {
|
||||
params.logChannels.info(`rolling back ${channel} channel after secrets reload failure`);
|
||||
try {
|
||||
if (restartedChannels.has(channel)) {
|
||||
await params.stopChannel(channel);
|
||||
}
|
||||
await params.startChannel(channel);
|
||||
} catch {
|
||||
params.logChannels.info(
|
||||
`failed to roll back ${channel} channel after secrets reload`,
|
||||
);
|
||||
}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}),
|
||||
log: params.log,
|
||||
resolveSecrets: async ({ commandName, targetIds }) => {
|
||||
const { assignments, diagnostics, inactiveRefPaths } =
|
||||
resolveCommandSecretsFromActiveRuntimeSnapshot({
|
||||
|
||||
@@ -85,7 +85,7 @@ describe("secrets handlers", () => {
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
code: "UNAVAILABLE",
|
||||
message: "Error: reload failed",
|
||||
message: "secrets.reload failed",
|
||||
}),
|
||||
);
|
||||
});
|
||||
@@ -207,6 +207,7 @@ describe("secrets handlers", () => {
|
||||
undefined,
|
||||
expect.objectContaining({
|
||||
code: "UNAVAILABLE",
|
||||
message: "secrets.resolve failed",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -34,14 +34,18 @@ export function createSecretsHandlers(params: {
|
||||
diagnostics: string[];
|
||||
inactiveRefPaths: string[];
|
||||
}>;
|
||||
log?: {
|
||||
warn?: (message: string) => void;
|
||||
};
|
||||
}): GatewayRequestHandlers {
|
||||
return {
|
||||
"secrets.reload": async ({ respond }) => {
|
||||
try {
|
||||
const result = await params.reloadSecrets();
|
||||
respond(true, { ok: true, warningCount: result.warningCount });
|
||||
} catch (err) {
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, String(err)));
|
||||
} catch {
|
||||
params.log?.warn?.("secrets.reload failed");
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "secrets.reload failed"));
|
||||
}
|
||||
},
|
||||
"secrets.resolve": async ({ params: requestParams, respond }) => {
|
||||
@@ -96,8 +100,9 @@ export function createSecretsHandlers(params: {
|
||||
throw new Error("secrets.resolve returned invalid payload.");
|
||||
}
|
||||
respond(true, payload);
|
||||
} catch (err) {
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, String(err)));
|
||||
} catch {
|
||||
params.log?.warn?.("secrets.resolve failed");
|
||||
respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "secrets.resolve failed"));
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
@@ -697,6 +697,9 @@ export async function startGatewayServer(
|
||||
sharedGatewaySessionGenerationState,
|
||||
resolveSharedGatewaySessionGenerationForConfig,
|
||||
clients,
|
||||
startChannel,
|
||||
stopChannel,
|
||||
logChannels,
|
||||
});
|
||||
|
||||
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
|
||||
|
||||
@@ -242,9 +242,13 @@ vi.mock("./server-channels.js", () => ({
|
||||
createChannelManager: hoisted.createChannelManager,
|
||||
}));
|
||||
|
||||
vi.mock("./config-reload.js", () => ({
|
||||
startGatewayConfigReloader: hoisted.startGatewayConfigReloader,
|
||||
}));
|
||||
vi.mock("./config-reload.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("./config-reload.js")>();
|
||||
return {
|
||||
...actual,
|
||||
startGatewayConfigReloader: hoisted.startGatewayConfigReloader,
|
||||
};
|
||||
});
|
||||
|
||||
installGatewayTestHooks({ scope: "suite" });
|
||||
|
||||
@@ -718,7 +722,7 @@ describe("gateway hot reload", () => {
|
||||
const reload = await rpcReq<{ warningCount?: number }>(ws, "secrets.reload", {});
|
||||
expect(reload.ok).toBe(false);
|
||||
expect(reload.error?.code).toBe("UNAVAILABLE");
|
||||
expect(reload.error?.message ?? "").toContain(refId);
|
||||
expect(reload.error?.message).toBe("secrets.reload failed");
|
||||
|
||||
const postResolve = await rpcReq<{
|
||||
assignments?: Array<{ path: string; pathSegments: string[]; value: unknown }>;
|
||||
@@ -820,7 +824,7 @@ process.stdin.on("end", () => {
|
||||
const reload = await rpcReq<{ warningCount?: number }>(ws, "secrets.reload", {});
|
||||
expect(reload.ok).toBe(false);
|
||||
expect(reload.error?.code).toBe("UNAVAILABLE");
|
||||
expect(reload.error?.message ?? "").toContain("forced failure");
|
||||
expect(reload.error?.message).toBe("secrets.reload failed");
|
||||
|
||||
const postResolve = await rpcReq<{
|
||||
assignments?: Array<{ path: string; pathSegments: string[]; value: unknown }>;
|
||||
|
||||
Reference in New Issue
Block a user