From a455c0cc3d7fb27e273205fa873835fe23276976 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 10 Mar 2026 20:41:06 +0000 Subject: [PATCH] refactor: share passive account lifecycle helpers --- extensions/bluebubbles/src/channel.ts | 10 +++- extensions/googlechat/src/channel.ts | 50 ++++++++-------- extensions/irc/src/channel.startup.test.ts | 67 ++++++++++++++++++++++ extensions/irc/src/channel.ts | 24 ++++++-- extensions/mattermost/src/channel.ts | 10 +++- extensions/nextcloud-talk/src/channel.ts | 29 ++++++---- extensions/zalo/src/channel.ts | 9 ++- extensions/zalouser/src/channel.ts | 7 ++- src/plugin-sdk/channel-lifecycle.test.ts | 57 +++++++++++++++++- src/plugin-sdk/channel-lifecycle.ts | 51 ++++++++++++++-- src/plugin-sdk/googlechat.ts | 1 + src/plugin-sdk/index.ts | 7 ++- src/plugin-sdk/irc.ts | 1 + src/plugin-sdk/mattermost.ts | 1 + 14 files changed, 269 insertions(+), 55 deletions(-) create mode 100644 extensions/irc/src/channel.startup.test.ts diff --git a/extensions/bluebubbles/src/channel.ts b/extensions/bluebubbles/src/channel.ts index d0f076f6e84..747fba5b67b 100644 --- a/extensions/bluebubbles/src/channel.ts +++ b/extensions/bluebubbles/src/channel.ts @@ -21,6 +21,7 @@ import { import { buildAccountScopedDmSecurityPolicy, collectOpenGroupPolicyRestrictSendersWarnings, + createAccountStatusSink, formatNormalizedAllowFromEntries, mapAllowFromEntries, } from "openclaw/plugin-sdk/compat"; @@ -369,8 +370,11 @@ export const bluebubblesPlugin: ChannelPlugin = { startAccount: async (ctx) => { const account = ctx.account; const webhookPath = resolveWebhookPathFromConfig(account.config); - ctx.setStatus({ - accountId: account.accountId, + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, + }); + statusSink({ baseUrl: account.baseUrl, }); ctx.log?.info(`[${account.accountId}] starting provider (webhook=${webhookPath})`); @@ -379,7 +383,7 @@ export const bluebubblesPlugin: ChannelPlugin = { config: ctx.cfg, runtime: ctx.runtime, abortSignal: ctx.abortSignal, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + statusSink, webhookPath, }); }, diff --git a/extensions/googlechat/src/channel.ts b/extensions/googlechat/src/channel.ts index 4ba4d30eae4..47980f97d92 100644 --- a/extensions/googlechat/src/channel.ts +++ b/extensions/googlechat/src/channel.ts @@ -12,6 +12,7 @@ import { buildComputedAccountStatusSnapshot, buildChannelConfigSchema, DEFAULT_ACCOUNT_ID, + createAccountStatusSink, getChatChannelMeta, listDirectoryGroupEntriesFromMapKeys, listDirectoryUserEntriesFromAllowFrom, @@ -21,6 +22,7 @@ import { PAIRING_APPROVED_MESSAGE, resolveChannelMediaMaxBytes, resolveGoogleChatGroupRequireMention, + runPassiveAccountLifecycle, type ChannelDock, type ChannelMessageActionAdapter, type ChannelPlugin, @@ -509,37 +511,39 @@ export const googlechatPlugin: ChannelPlugin = { gateway: { startAccount: async (ctx) => { const account = ctx.account; - ctx.log?.info(`[${account.accountId}] starting Google Chat webhook`); - ctx.setStatus({ + const statusSink = createAccountStatusSink({ accountId: account.accountId, + setStatus: ctx.setStatus, + }); + ctx.log?.info(`[${account.accountId}] starting Google Chat webhook`); + statusSink({ running: true, lastStartAt: Date.now(), webhookPath: resolveGoogleChatWebhookPath({ account }), audienceType: account.config.audienceType, audience: account.config.audience, }); - const unregister = await startGoogleChatMonitor({ - account, - config: ctx.cfg, - runtime: ctx.runtime, + await runPassiveAccountLifecycle({ abortSignal: ctx.abortSignal, - webhookPath: account.config.webhookPath, - webhookUrl: account.config.webhookUrl, - statusSink: (patch) => ctx.setStatus({ accountId: account.accountId, ...patch }), - }); - // Keep the promise pending until abort (webhook mode is passive). - await new Promise((resolve) => { - if (ctx.abortSignal.aborted) { - resolve(); - return; - } - ctx.abortSignal.addEventListener("abort", () => resolve(), { once: true }); - }); - unregister?.(); - ctx.setStatus({ - accountId: account.accountId, - running: false, - lastStopAt: Date.now(), + start: async () => + await startGoogleChatMonitor({ + account, + config: ctx.cfg, + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + webhookPath: account.config.webhookPath, + webhookUrl: account.config.webhookUrl, + statusSink, + }), + stop: async (unregister) => { + unregister?.(); + }, + onStop: async () => { + statusSink({ + running: false, + lastStopAt: Date.now(), + }); + }, }); }, }, diff --git a/extensions/irc/src/channel.startup.test.ts b/extensions/irc/src/channel.startup.test.ts new file mode 100644 index 00000000000..ef972f64c0e --- /dev/null +++ b/extensions/irc/src/channel.startup.test.ts @@ -0,0 +1,67 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createStartAccountContext } from "../../test-utils/start-account-context.js"; +import type { ResolvedIrcAccount } from "./accounts.js"; + +const hoisted = vi.hoisted(() => ({ + monitorIrcProvider: vi.fn(), +})); + +vi.mock("./monitor.js", async () => { + const actual = await vi.importActual("./monitor.js"); + return { + ...actual, + monitorIrcProvider: hoisted.monitorIrcProvider, + }; +}); + +import { ircPlugin } from "./channel.js"; + +describe("ircPlugin gateway.startAccount", () => { + afterEach(() => { + vi.clearAllMocks(); + }); + + it("keeps startAccount pending until abort, then stops the monitor", async () => { + const stop = vi.fn(); + hoisted.monitorIrcProvider.mockResolvedValue({ stop }); + + const account: ResolvedIrcAccount = { + accountId: "default", + enabled: true, + name: "default", + configured: true, + host: "irc.example.com", + port: 6697, + tls: true, + nick: "openclaw", + username: "openclaw", + realname: "OpenClaw", + password: "", + passwordSource: "none", + config: {} as ResolvedIrcAccount["config"], + }; + + const abort = new AbortController(); + const task = ircPlugin.gateway!.startAccount!( + createStartAccountContext({ + account, + abortSignal: abort.signal, + }), + ); + let settled = false; + void task.then(() => { + settled = true; + }); + + await vi.waitFor(() => { + expect(hoisted.monitorIrcProvider).toHaveBeenCalledOnce(); + }); + expect(settled).toBe(false); + expect(stop).not.toHaveBeenCalled(); + + abort.abort(); + await task; + + expect(stop).toHaveBeenCalledOnce(); + }); +}); diff --git a/extensions/irc/src/channel.ts b/extensions/irc/src/channel.ts index 03d86da4c54..c598a9a0ef3 100644 --- a/extensions/irc/src/channel.ts +++ b/extensions/irc/src/channel.ts @@ -9,10 +9,12 @@ import { buildBaseAccountStatusSnapshot, buildBaseChannelStatusSummary, buildChannelConfigSchema, + createAccountStatusSink, DEFAULT_ACCOUNT_ID, deleteAccountFromConfigSection, getChatChannelMeta, PAIRING_APPROVED_MESSAGE, + runPassiveAccountLifecycle, setAccountEnabledInConfigSection, type ChannelPlugin, } from "openclaw/plugin-sdk/irc"; @@ -353,6 +355,10 @@ export const ircPlugin: ChannelPlugin = { gateway: { startAccount: async (ctx) => { const account = ctx.account; + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, + }); if (!account.configured) { throw new Error( `IRC is not configured for account "${account.accountId}" (need host and nick in channels.irc).`, @@ -361,14 +367,20 @@ export const ircPlugin: ChannelPlugin = { ctx.log?.info( `[${account.accountId}] starting IRC provider (${account.host}:${account.port}${account.tls ? " tls" : ""})`, ); - const { stop } = await monitorIrcProvider({ - accountId: account.accountId, - config: ctx.cfg as CoreConfig, - runtime: ctx.runtime, + await runPassiveAccountLifecycle({ abortSignal: ctx.abortSignal, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + start: async () => + await monitorIrcProvider({ + accountId: account.accountId, + config: ctx.cfg as CoreConfig, + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + statusSink, + }), + stop: async (monitor) => { + monitor.stop(); + }, }); - return { stop }; }, }, }; diff --git a/extensions/mattermost/src/channel.ts b/extensions/mattermost/src/channel.ts index b62231ac997..2dffaa6f3cf 100644 --- a/extensions/mattermost/src/channel.ts +++ b/extensions/mattermost/src/channel.ts @@ -9,6 +9,7 @@ import { applySetupAccountConfigPatch, buildComputedAccountStatusSnapshot, buildChannelConfigSchema, + createAccountStatusSink, DEFAULT_ACCOUNT_ID, deleteAccountFromConfigSection, migrateBaseNameToDefaultAccount, @@ -500,8 +501,11 @@ export const mattermostPlugin: ChannelPlugin = { gateway: { startAccount: async (ctx) => { const account = ctx.account; - ctx.setStatus({ - accountId: account.accountId, + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, + }); + statusSink({ baseUrl: account.baseUrl, botTokenSource: account.botTokenSource, }); @@ -513,7 +517,7 @@ export const mattermostPlugin: ChannelPlugin = { config: ctx.cfg, runtime: ctx.runtime, abortSignal: ctx.abortSignal, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + statusSink, }); }, }, diff --git a/extensions/nextcloud-talk/src/channel.ts b/extensions/nextcloud-talk/src/channel.ts index 6fdf36e9f8c..8a908b7e0ac 100644 --- a/extensions/nextcloud-talk/src/channel.ts +++ b/extensions/nextcloud-talk/src/channel.ts @@ -2,8 +2,10 @@ import { buildAccountScopedDmSecurityPolicy, collectAllowlistProviderGroupPolicyWarnings, collectOpenGroupPolicyRouteAllowlistWarnings, + createAccountStatusSink, formatAllowFromLowercase, mapAllowFromEntries, + runPassiveAccountLifecycle, } from "openclaw/plugin-sdk/compat"; import { applyAccountNameToChannelSection, @@ -15,7 +17,6 @@ import { deleteAccountFromConfigSection, normalizeAccountId, setAccountEnabledInConfigSection, - waitForAbortSignal, type ChannelPlugin, type OpenClawConfig, type ChannelSetupInput, @@ -338,17 +339,25 @@ export const nextcloudTalkPlugin: ChannelPlugin = ctx.log?.info(`[${account.accountId}] starting Nextcloud Talk webhook server`); - const { stop } = await monitorNextcloudTalkProvider({ - accountId: account.accountId, - config: ctx.cfg as CoreConfig, - runtime: ctx.runtime, - abortSignal: ctx.abortSignal, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, }); - // Keep webhook channels pending for the account lifecycle. - await waitForAbortSignal(ctx.abortSignal); - stop(); + await runPassiveAccountLifecycle({ + abortSignal: ctx.abortSignal, + start: async () => + await monitorNextcloudTalkProvider({ + accountId: account.accountId, + config: ctx.cfg as CoreConfig, + runtime: ctx.runtime, + abortSignal: ctx.abortSignal, + statusSink, + }), + stop: async (monitor) => { + monitor.stop(); + }, + }); }, logoutAccount: async ({ accountId, cfg }) => { const nextCfg = { ...cfg } as OpenClawConfig; diff --git a/extensions/zalo/src/channel.ts b/extensions/zalo/src/channel.ts index e4671bb90c1..b374ecfbd63 100644 --- a/extensions/zalo/src/channel.ts +++ b/extensions/zalo/src/channel.ts @@ -1,8 +1,9 @@ import { buildAccountScopedDmSecurityPolicy, - collectOpenProviderGroupPolicyWarnings, buildOpenGroupPolicyRestrictSendersWarning, buildOpenGroupPolicyWarning, + collectOpenProviderGroupPolicyWarnings, + createAccountStatusSink, mapAllowFromEntries, } from "openclaw/plugin-sdk/compat"; import type { @@ -357,6 +358,10 @@ export const zaloPlugin: ChannelPlugin = { `[${account.accountId}] Zalo probe threw before provider start: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`, ); } + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, + }); ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel} mode=${mode}`); const { monitorZaloProvider } = await import("./monitor.js"); return monitorZaloProvider({ @@ -370,7 +375,7 @@ export const zaloPlugin: ChannelPlugin = { webhookSecret: normalizeSecretInputString(account.config.webhookSecret), webhookPath: account.config.webhookPath, fetcher, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + statusSink, }); }, }, diff --git a/extensions/zalouser/src/channel.ts b/extensions/zalouser/src/channel.ts index e01775d0dbb..2091124be6e 100644 --- a/extensions/zalouser/src/channel.ts +++ b/extensions/zalouser/src/channel.ts @@ -1,5 +1,6 @@ import { buildAccountScopedDmSecurityPolicy, + createAccountStatusSink, mapAllowFromEntries, } from "openclaw/plugin-sdk/compat"; import type { @@ -682,6 +683,10 @@ export const zalouserPlugin: ChannelPlugin = { } catch { // ignore probe errors } + const statusSink = createAccountStatusSink({ + accountId: ctx.accountId, + setStatus: ctx.setStatus, + }); ctx.log?.info(`[${account.accountId}] starting zalouser provider${userLabel}`); const { monitorZalouserProvider } = await import("./monitor.js"); return monitorZalouserProvider({ @@ -689,7 +694,7 @@ export const zalouserPlugin: ChannelPlugin = { config: ctx.cfg, runtime: ctx.runtime, abortSignal: ctx.abortSignal, - statusSink: (patch) => ctx.setStatus({ accountId: ctx.accountId, ...patch }), + statusSink, }); }, loginWithQrStart: async (params) => { diff --git a/src/plugin-sdk/channel-lifecycle.test.ts b/src/plugin-sdk/channel-lifecycle.test.ts index 020510c914a..6295a5aedf9 100644 --- a/src/plugin-sdk/channel-lifecycle.test.ts +++ b/src/plugin-sdk/channel-lifecycle.test.ts @@ -1,6 +1,11 @@ import { EventEmitter } from "node:events"; import { describe, expect, it, vi } from "vitest"; -import { keepHttpServerTaskAlive, waitUntilAbort } from "./channel-lifecycle.js"; +import { + createAccountStatusSink, + keepHttpServerTaskAlive, + runPassiveAccountLifecycle, + waitUntilAbort, +} from "./channel-lifecycle.js"; type FakeServer = EventEmitter & { close: (callback?: () => void) => void; @@ -18,6 +23,22 @@ function createFakeServer(): FakeServer { } describe("plugin-sdk channel lifecycle helpers", () => { + it("binds account id onto status patches", () => { + const setStatus = vi.fn(); + const statusSink = createAccountStatusSink({ + accountId: "default", + setStatus, + }); + + statusSink({ running: true, lastStartAt: 123 }); + + expect(setStatus).toHaveBeenCalledWith({ + accountId: "default", + running: true, + lastStartAt: 123, + }); + }); + it("resolves waitUntilAbort when signal aborts", async () => { const abort = new AbortController(); const task = waitUntilAbort(abort.signal); @@ -32,6 +53,40 @@ describe("plugin-sdk channel lifecycle helpers", () => { await expect(task).resolves.toBeUndefined(); }); + it("runs abort cleanup before resolving", async () => { + const abort = new AbortController(); + const onAbort = vi.fn(async () => undefined); + + const task = waitUntilAbort(abort.signal, onAbort); + abort.abort(); + + await expect(task).resolves.toBeUndefined(); + expect(onAbort).toHaveBeenCalledOnce(); + }); + + it("keeps passive account lifecycle pending until abort, then stops once", async () => { + const abort = new AbortController(); + const stop = vi.fn(); + const task = runPassiveAccountLifecycle({ + abortSignal: abort.signal, + start: async () => ({ stop }), + stop: async (handle) => { + handle.stop(); + }, + }); + + const early = await Promise.race([ + task.then(() => "resolved"), + new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 25)), + ]); + expect(early).toBe("pending"); + expect(stop).not.toHaveBeenCalled(); + + abort.abort(); + await expect(task).resolves.toBeUndefined(); + expect(stop).toHaveBeenCalledOnce(); + }); + it("keeps server task pending until close, then resolves", async () => { const server = createFakeServer(); const task = keepHttpServerTaskAlive({ server }); diff --git a/src/plugin-sdk/channel-lifecycle.ts b/src/plugin-sdk/channel-lifecycle.ts index 4687e167352..7d4fea578d5 100644 --- a/src/plugin-sdk/channel-lifecycle.ts +++ b/src/plugin-sdk/channel-lifecycle.ts @@ -1,25 +1,66 @@ +import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js"; + type CloseAwareServer = { once: (event: "close", listener: () => void) => unknown; }; +type PassiveAccountLifecycleParams = { + abortSignal?: AbortSignal; + start: () => Promise; + stop?: (handle: Handle) => void | Promise; + onStop?: () => void | Promise; +}; + +export function createAccountStatusSink(params: { + accountId: string; + setStatus: (next: ChannelAccountSnapshot) => void; +}): (patch: Omit) => void { + return (patch) => { + params.setStatus({ accountId: params.accountId, ...patch }); + }; +} + /** * Return a promise that resolves when the signal is aborted. * - * If no signal is provided, the promise stays pending forever. + * If no signal is provided, the promise stays pending forever. When provided, + * `onAbort` runs once before the promise resolves. */ -export function waitUntilAbort(signal?: AbortSignal): Promise { - return new Promise((resolve) => { +export function waitUntilAbort( + signal?: AbortSignal, + onAbort?: () => void | Promise, +): Promise { + return new Promise((resolve, reject) => { + const complete = () => { + Promise.resolve(onAbort?.()).then(() => resolve(), reject); + }; if (!signal) { return; } if (signal.aborted) { - resolve(); + complete(); return; } - signal.addEventListener("abort", () => resolve(), { once: true }); + signal.addEventListener("abort", complete, { once: true }); }); } +/** + * Keep a passive account task alive until abort, then run optional cleanup. + */ +export async function runPassiveAccountLifecycle( + params: PassiveAccountLifecycleParams, +): Promise { + const handle = await params.start(); + + try { + await waitUntilAbort(params.abortSignal); + } finally { + await params.stop?.(handle); + await params.onStop?.(); + } +} + /** * Keep a channel/provider task pending until the HTTP server closes. * diff --git a/src/plugin-sdk/googlechat.ts b/src/plugin-sdk/googlechat.ts index 38d1594406a..17bc36daab1 100644 --- a/src/plugin-sdk/googlechat.ts +++ b/src/plugin-sdk/googlechat.ts @@ -20,6 +20,7 @@ export { } from "../channels/plugins/directory-config-helpers.js"; export { buildComputedAccountStatusSnapshot } from "./status-helpers.js"; export { buildChannelConfigSchema } from "../channels/plugins/config-schema.js"; +export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js"; export { resolveGoogleChatGroupRequireMention } from "../channels/plugins/group-mentions.js"; export { formatPairingApproveHint } from "../channels/plugins/helpers.js"; export { resolveChannelMediaMaxBytes } from "../channels/plugins/media-limits.js"; diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index b08d4ed1e85..d16f077c13f 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -173,7 +173,12 @@ export { WEBHOOK_IN_FLIGHT_DEFAULTS, } from "./webhook-request-guards.js"; export type { WebhookBodyReadProfile, WebhookInFlightLimiter } from "./webhook-request-guards.js"; -export { keepHttpServerTaskAlive, waitUntilAbort } from "./channel-lifecycle.js"; +export { + createAccountStatusSink, + keepHttpServerTaskAlive, + runPassiveAccountLifecycle, + waitUntilAbort, +} from "./channel-lifecycle.js"; export type { AgentMediaPayload } from "./agent-media-payload.js"; export { buildAgentMediaPayload } from "./agent-media-payload.js"; export { diff --git a/src/plugin-sdk/irc.ts b/src/plugin-sdk/irc.ts index 51aac8407e0..7b2e6d07c8a 100644 --- a/src/plugin-sdk/irc.ts +++ b/src/plugin-sdk/irc.ts @@ -61,6 +61,7 @@ export type { PluginRuntime } from "../plugins/runtime/types.js"; export type { OpenClawPluginApi } from "../plugins/types.js"; export { DEFAULT_ACCOUNT_ID } from "../routing/session-key.js"; export type { RuntimeEnv } from "../runtime.js"; +export { createAccountStatusSink, runPassiveAccountLifecycle } from "./channel-lifecycle.js"; export { readStoreAllowFromForDmPolicy, resolveEffectiveAllowFromLists, diff --git a/src/plugin-sdk/mattermost.ts b/src/plugin-sdk/mattermost.ts index fb77580359b..ac4c8a9b437 100644 --- a/src/plugin-sdk/mattermost.ts +++ b/src/plugin-sdk/mattermost.ts @@ -41,6 +41,7 @@ export { applySetupAccountConfigPatch, migrateBaseNameToDefaultAccount, } from "../channels/plugins/setup-helpers.js"; +export { createAccountStatusSink } from "./channel-lifecycle.js"; export { buildComputedAccountStatusSnapshot } from "./status-helpers.js"; export { createAccountListHelpers } from "../channels/plugins/account-helpers.js"; export type {