refactor: share passive account lifecycle helpers

This commit is contained in:
Peter Steinberger
2026-03-10 20:41:06 +00:00
parent 50ded5052f
commit a455c0cc3d
14 changed files with 269 additions and 55 deletions

View File

@@ -21,6 +21,7 @@ import {
import {
buildAccountScopedDmSecurityPolicy,
collectOpenGroupPolicyRestrictSendersWarnings,
createAccountStatusSink,
formatNormalizedAllowFromEntries,
mapAllowFromEntries,
} from "openclaw/plugin-sdk/compat";
@@ -369,8 +370,11 @@ export const bluebubblesPlugin: ChannelPlugin<ResolvedBlueBubblesAccount> = {
startAccount: async (ctx) => {
const account = ctx.account;
const webhookPath = resolveWebhookPathFromConfig(account.config);
ctx.setStatus({
accountId: account.accountId,
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
statusSink({
baseUrl: account.baseUrl,
});
ctx.log?.info(`[${account.accountId}] starting provider (webhook=${webhookPath})`);
@@ -379,7 +383,7 @@ export const bluebubblesPlugin: ChannelPlugin<ResolvedBlueBubblesAccount> = {
config: ctx.cfg,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
webhookPath,
});
},

View File

@@ -12,6 +12,7 @@ import {
buildComputedAccountStatusSnapshot,
buildChannelConfigSchema,
DEFAULT_ACCOUNT_ID,
createAccountStatusSink,
getChatChannelMeta,
listDirectoryGroupEntriesFromMapKeys,
listDirectoryUserEntriesFromAllowFrom,
@@ -21,6 +22,7 @@ import {
PAIRING_APPROVED_MESSAGE,
resolveChannelMediaMaxBytes,
resolveGoogleChatGroupRequireMention,
runPassiveAccountLifecycle,
type ChannelDock,
type ChannelMessageActionAdapter,
type ChannelPlugin,
@@ -509,38 +511,40 @@ export const googlechatPlugin: ChannelPlugin<ResolvedGoogleChatAccount> = {
gateway: {
startAccount: async (ctx) => {
const account = ctx.account;
ctx.log?.info(`[${account.accountId}] starting Google Chat webhook`);
ctx.setStatus({
const statusSink = createAccountStatusSink({
accountId: account.accountId,
setStatus: ctx.setStatus,
});
ctx.log?.info(`[${account.accountId}] starting Google Chat webhook`);
statusSink({
running: true,
lastStartAt: Date.now(),
webhookPath: resolveGoogleChatWebhookPath({ account }),
audienceType: account.config.audienceType,
audience: account.config.audience,
});
const unregister = await startGoogleChatMonitor({
await runPassiveAccountLifecycle({
abortSignal: ctx.abortSignal,
start: async () =>
await startGoogleChatMonitor({
account,
config: ctx.cfg,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
webhookPath: account.config.webhookPath,
webhookUrl: account.config.webhookUrl,
statusSink: (patch) => ctx.setStatus({ accountId: account.accountId, ...patch }),
});
// Keep the promise pending until abort (webhook mode is passive).
await new Promise<void>((resolve) => {
if (ctx.abortSignal.aborted) {
resolve();
return;
}
ctx.abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
statusSink,
}),
stop: async (unregister) => {
unregister?.();
ctx.setStatus({
accountId: account.accountId,
},
onStop: async () => {
statusSink({
running: false,
lastStopAt: Date.now(),
});
},
});
},
},
};

View File

@@ -0,0 +1,67 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createStartAccountContext } from "../../test-utils/start-account-context.js";
import type { ResolvedIrcAccount } from "./accounts.js";
const hoisted = vi.hoisted(() => ({
monitorIrcProvider: vi.fn(),
}));
vi.mock("./monitor.js", async () => {
const actual = await vi.importActual<typeof import("./monitor.js")>("./monitor.js");
return {
...actual,
monitorIrcProvider: hoisted.monitorIrcProvider,
};
});
import { ircPlugin } from "./channel.js";
describe("ircPlugin gateway.startAccount", () => {
afterEach(() => {
vi.clearAllMocks();
});
it("keeps startAccount pending until abort, then stops the monitor", async () => {
const stop = vi.fn();
hoisted.monitorIrcProvider.mockResolvedValue({ stop });
const account: ResolvedIrcAccount = {
accountId: "default",
enabled: true,
name: "default",
configured: true,
host: "irc.example.com",
port: 6697,
tls: true,
nick: "openclaw",
username: "openclaw",
realname: "OpenClaw",
password: "",
passwordSource: "none",
config: {} as ResolvedIrcAccount["config"],
};
const abort = new AbortController();
const task = ircPlugin.gateway!.startAccount!(
createStartAccountContext({
account,
abortSignal: abort.signal,
}),
);
let settled = false;
void task.then(() => {
settled = true;
});
await vi.waitFor(() => {
expect(hoisted.monitorIrcProvider).toHaveBeenCalledOnce();
});
expect(settled).toBe(false);
expect(stop).not.toHaveBeenCalled();
abort.abort();
await task;
expect(stop).toHaveBeenCalledOnce();
});
});

View File

@@ -9,10 +9,12 @@ import {
buildBaseAccountStatusSnapshot,
buildBaseChannelStatusSummary,
buildChannelConfigSchema,
createAccountStatusSink,
DEFAULT_ACCOUNT_ID,
deleteAccountFromConfigSection,
getChatChannelMeta,
PAIRING_APPROVED_MESSAGE,
runPassiveAccountLifecycle,
setAccountEnabledInConfigSection,
type ChannelPlugin,
} from "openclaw/plugin-sdk/irc";
@@ -353,6 +355,10 @@ export const ircPlugin: ChannelPlugin<ResolvedIrcAccount, IrcProbe> = {
gateway: {
startAccount: async (ctx) => {
const account = ctx.account;
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
if (!account.configured) {
throw new Error(
`IRC is not configured for account "${account.accountId}" (need host and nick in channels.irc).`,
@@ -361,14 +367,20 @@ export const ircPlugin: ChannelPlugin<ResolvedIrcAccount, IrcProbe> = {
ctx.log?.info(
`[${account.accountId}] starting IRC provider (${account.host}:${account.port}${account.tls ? " tls" : ""})`,
);
const { stop } = await monitorIrcProvider({
await runPassiveAccountLifecycle({
abortSignal: ctx.abortSignal,
start: async () =>
await monitorIrcProvider({
accountId: account.accountId,
config: ctx.cfg as CoreConfig,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
}),
stop: async (monitor) => {
monitor.stop();
},
});
return { stop };
},
},
};

View File

@@ -9,6 +9,7 @@ import {
applySetupAccountConfigPatch,
buildComputedAccountStatusSnapshot,
buildChannelConfigSchema,
createAccountStatusSink,
DEFAULT_ACCOUNT_ID,
deleteAccountFromConfigSection,
migrateBaseNameToDefaultAccount,
@@ -500,8 +501,11 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
gateway: {
startAccount: async (ctx) => {
const account = ctx.account;
ctx.setStatus({
accountId: account.accountId,
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
statusSink({
baseUrl: account.baseUrl,
botTokenSource: account.botTokenSource,
});
@@ -513,7 +517,7 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
config: ctx.cfg,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
});
},
},

View File

@@ -2,8 +2,10 @@ import {
buildAccountScopedDmSecurityPolicy,
collectAllowlistProviderGroupPolicyWarnings,
collectOpenGroupPolicyRouteAllowlistWarnings,
createAccountStatusSink,
formatAllowFromLowercase,
mapAllowFromEntries,
runPassiveAccountLifecycle,
} from "openclaw/plugin-sdk/compat";
import {
applyAccountNameToChannelSection,
@@ -15,7 +17,6 @@ import {
deleteAccountFromConfigSection,
normalizeAccountId,
setAccountEnabledInConfigSection,
waitForAbortSignal,
type ChannelPlugin,
type OpenClawConfig,
type ChannelSetupInput,
@@ -338,17 +339,25 @@ export const nextcloudTalkPlugin: ChannelPlugin<ResolvedNextcloudTalkAccount> =
ctx.log?.info(`[${account.accountId}] starting Nextcloud Talk webhook server`);
const { stop } = await monitorNextcloudTalkProvider({
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
await runPassiveAccountLifecycle({
abortSignal: ctx.abortSignal,
start: async () =>
await monitorNextcloudTalkProvider({
accountId: account.accountId,
config: ctx.cfg as CoreConfig,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
}),
stop: async (monitor) => {
monitor.stop();
},
});
// Keep webhook channels pending for the account lifecycle.
await waitForAbortSignal(ctx.abortSignal);
stop();
},
logoutAccount: async ({ accountId, cfg }) => {
const nextCfg = { ...cfg } as OpenClawConfig;

View File

@@ -1,8 +1,9 @@
import {
buildAccountScopedDmSecurityPolicy,
collectOpenProviderGroupPolicyWarnings,
buildOpenGroupPolicyRestrictSendersWarning,
buildOpenGroupPolicyWarning,
collectOpenProviderGroupPolicyWarnings,
createAccountStatusSink,
mapAllowFromEntries,
} from "openclaw/plugin-sdk/compat";
import type {
@@ -357,6 +358,10 @@ export const zaloPlugin: ChannelPlugin<ResolvedZaloAccount> = {
`[${account.accountId}] Zalo probe threw before provider start: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`,
);
}
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel} mode=${mode}`);
const { monitorZaloProvider } = await import("./monitor.js");
return monitorZaloProvider({
@@ -370,7 +375,7 @@ export const zaloPlugin: ChannelPlugin<ResolvedZaloAccount> = {
webhookSecret: normalizeSecretInputString(account.config.webhookSecret),
webhookPath: account.config.webhookPath,
fetcher,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
});
},
},

View File

@@ -1,5 +1,6 @@
import {
buildAccountScopedDmSecurityPolicy,
createAccountStatusSink,
mapAllowFromEntries,
} from "openclaw/plugin-sdk/compat";
import type {
@@ -682,6 +683,10 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
} catch {
// ignore probe errors
}
const statusSink = createAccountStatusSink({
accountId: ctx.accountId,
setStatus: ctx.setStatus,
});
ctx.log?.info(`[${account.accountId}] starting zalouser provider${userLabel}`);
const { monitorZalouserProvider } = await import("./monitor.js");
return monitorZalouserProvider({
@@ -689,7 +694,7 @@ export const zalouserPlugin: ChannelPlugin<ResolvedZalouserAccount> = {
config: ctx.cfg,
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }),
statusSink,
});
},
loginWithQrStart: async (params) => {

View File

@@ -1,6 +1,11 @@
import { EventEmitter } from "node:events";
import { describe, expect, it, vi } from "vitest";
import { keepHttpServerTaskAlive, waitUntilAbort } from "./channel-lifecycle.js";
import {
createAccountStatusSink,
keepHttpServerTaskAlive,
runPassiveAccountLifecycle,
waitUntilAbort,
} from "./channel-lifecycle.js";
type FakeServer = EventEmitter & {
close: (callback?: () => void) => void;
@@ -18,6 +23,22 @@ function createFakeServer(): FakeServer {
}
describe("plugin-sdk channel lifecycle helpers", () => {
it("binds account id onto status patches", () => {
const setStatus = vi.fn();
const statusSink = createAccountStatusSink({
accountId: "default",
setStatus,
});
statusSink({ running: true, lastStartAt: 123 });
expect(setStatus).toHaveBeenCalledWith({
accountId: "default",
running: true,
lastStartAt: 123,
});
});
it("resolves waitUntilAbort when signal aborts", async () => {
const abort = new AbortController();
const task = waitUntilAbort(abort.signal);
@@ -32,6 +53,40 @@ describe("plugin-sdk channel lifecycle helpers", () => {
await expect(task).resolves.toBeUndefined();
});
it("runs abort cleanup before resolving", async () => {
const abort = new AbortController();
const onAbort = vi.fn(async () => undefined);
const task = waitUntilAbort(abort.signal, onAbort);
abort.abort();
await expect(task).resolves.toBeUndefined();
expect(onAbort).toHaveBeenCalledOnce();
});
it("keeps passive account lifecycle pending until abort, then stops once", async () => {
const abort = new AbortController();
const stop = vi.fn();
const task = runPassiveAccountLifecycle({
abortSignal: abort.signal,
start: async () => ({ stop }),
stop: async (handle) => {
handle.stop();
},
});
const early = await Promise.race([
task.then(() => "resolved"),
new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 25)),
]);
expect(early).toBe("pending");
expect(stop).not.toHaveBeenCalled();
abort.abort();
await expect(task).resolves.toBeUndefined();
expect(stop).toHaveBeenCalledOnce();
});
it("keeps server task pending until close, then resolves", async () => {
const server = createFakeServer();
const task = keepHttpServerTaskAlive({ server });

View File

@@ -1,25 +1,66 @@
import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js";
type CloseAwareServer = {
once: (event: "close", listener: () => void) => unknown;
};
type PassiveAccountLifecycleParams<Handle> = {
abortSignal?: AbortSignal;
start: () => Promise<Handle>;
stop?: (handle: Handle) => void | Promise<void>;
onStop?: () => void | Promise<void>;
};
export function createAccountStatusSink(params: {
accountId: string;
setStatus: (next: ChannelAccountSnapshot) => void;
}): (patch: Omit<ChannelAccountSnapshot, "accountId">) => void {
return (patch) => {
params.setStatus({ accountId: params.accountId, ...patch });
};
}
/**
* Return a promise that resolves when the signal is aborted.
*
* If no signal is provided, the promise stays pending forever.
* If no signal is provided, the promise stays pending forever. When provided,
* `onAbort` runs once before the promise resolves.
*/
export function waitUntilAbort(signal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
export function waitUntilAbort(
signal?: AbortSignal,
onAbort?: () => void | Promise<void>,
): Promise<void> {
return new Promise<void>((resolve, reject) => {
const complete = () => {
Promise.resolve(onAbort?.()).then(() => resolve(), reject);
};
if (!signal) {
return;
}
if (signal.aborted) {
resolve();
complete();
return;
}
signal.addEventListener("abort", () => resolve(), { once: true });
signal.addEventListener("abort", complete, { once: true });
});
}
/**
* Keep a passive account task alive until abort, then run optional cleanup.
*/
export async function runPassiveAccountLifecycle<Handle>(
params: PassiveAccountLifecycleParams<Handle>,
): Promise<void> {
const handle = await params.start();
try {
await waitUntilAbort(params.abortSignal);
} finally {
await params.stop?.(handle);
await params.onStop?.();
}
}
/**
* Keep a channel/provider task pending until the HTTP server closes.
*

View File

@@ -20,6 +20,7 @@ export {
} from "../channels/plugins/directory-config-helpers.js";
export { buildComputedAccountStatusSnapshot } from "./status-helpers.js";
export { buildChannelConfigSchema } from "../channels/plugins/config-schema.js";
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js";
export { resolveGoogleChatGroupRequireMention } from "../channels/plugins/group-mentions.js";
export { formatPairingApproveHint } from "../channels/plugins/helpers.js";
export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js";

View File

@@ -173,7 +173,12 @@ export {
WEBHOOK_IN_FLIGHT_DEFAULTS,
} from "./webhook-request-guards.js";
export type { WebhookBodyReadProfile, WebhookInFlightLimiter } from "./webhook-request-guards.js";
export { keepHttpServerTaskAlive, waitUntilAbort } from "./channel-lifecycle.js";
export {
createAccountStatusSink,
keepHttpServerTaskAlive,
runPassiveAccountLifecycle,
waitUntilAbort,
} from "./channel-lifecycle.js";
export type { AgentMediaPayload } from "./agent-media-payload.js";
export { buildAgentMediaPayload } from "./agent-media-payload.js";
export {

View File

@@ -61,6 +61,7 @@ export type { PluginRuntime } from "../plugins/runtime/types.js";
export type { OpenClawPluginApi } from "../plugins/types.js";
export { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js";
export type { RuntimeEnv } from "../runtime.js";
export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js";
export {
readStoreAllowFromForDmPolicy,
resolveEffectiveAllowFromLists,

View File

@@ -41,6 +41,7 @@ export {
applySetupAccountConfigPatch,
migrateBaseNameToDefaultAccount,
} from "../channels/plugins/setup-helpers.js";
export { createAccountStatusSink } from "./channel-lifecycle.js";
export { buildComputedAccountStatusSnapshot } from "./status-helpers.js";
export { createAccountListHelpers } from "../channels/plugins/account-helpers.js";
export type {