From c9d0e345cb33e97bc68bc97f5ee5d413d9f1afee Mon Sep 17 00:00:00 2001 From: chilu18 Date: Mon, 23 Feb 2026 15:27:58 +0000 Subject: [PATCH] fix(msteams): keep monitor alive until shutdown --- .../msteams/src/monitor.lifecycle.test.ts | 190 ++++++++++++++++++ extensions/msteams/src/monitor.ts | 37 +++- 2 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 extensions/msteams/src/monitor.lifecycle.test.ts diff --git a/extensions/msteams/src/monitor.lifecycle.test.ts b/extensions/msteams/src/monitor.lifecycle.test.ts new file mode 100644 index 00000000000..40c69a71b36 --- /dev/null +++ b/extensions/msteams/src/monitor.lifecycle.test.ts @@ -0,0 +1,190 @@ +import { EventEmitter } from "node:events"; +import type { OpenClawConfig, RuntimeEnv } from "openclaw/plugin-sdk"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { MSTeamsConversationStore } from "./conversation-store.js"; +import type { MSTeamsPollStore } from "./polls.js"; + +type FakeServer = EventEmitter & { + close: (callback?: (err?: Error | null) => void) => void; +}; + +const expressControl = vi.hoisted(() => ({ + mode: { value: "listening" as "listening" | "error" }, +})); + +vi.mock("openclaw/plugin-sdk", () => ({ + DEFAULT_WEBHOOK_MAX_BODY_BYTES: 1024 * 1024, + mergeAllowlist: (params: { existing?: string[]; additions?: string[] }) => + Array.from(new Set([...(params.existing ?? []), ...(params.additions ?? [])])), + summarizeMapping: vi.fn(), +})); + +vi.mock("express", () => { + const json = vi.fn(() => { + return (_req: unknown, _res: unknown, next?: (err?: unknown) => void) => { + next?.(); + }; + }); + + const factory = () => ({ + use: vi.fn(), + post: vi.fn(), + listen: vi.fn((_port: number) => { + const server = new EventEmitter() as FakeServer; + server.close = (callback?: (err?: Error | null) => void) => { + queueMicrotask(() => { + server.emit("close"); + callback?.(null); + }); + }; + queueMicrotask(() => { + if (expressControl.mode.value === "error") { + server.emit("error", new Error("listen EADDRINUSE")); + return; + } + server.emit("listening"); + }); + return server; + }), + }); + + return { + default: factory, + json, + }; +}); + +const registerMSTeamsHandlers = vi.hoisted(() => + vi.fn(() => ({ + run: vi.fn(async () => {}), + })), +); +const createMSTeamsAdapter = vi.hoisted(() => + vi.fn(() => ({ + process: vi.fn(async () => {}), + })), +); +const loadMSTeamsSdkWithAuth = vi.hoisted(() => + vi.fn(async () => ({ + sdk: { + ActivityHandler: class {}, + MsalTokenProvider: class {}, + authorizeJWT: + () => (_req: unknown, _res: unknown, next: ((err?: unknown) => void) | undefined) => + next?.(), + }, + authConfig: {}, + })), +); + +vi.mock("./monitor-handler.js", () => ({ + registerMSTeamsHandlers: (...args: unknown[]) => registerMSTeamsHandlers(...args), +})); + +vi.mock("./resolve-allowlist.js", () => ({ + resolveMSTeamsChannelAllowlist: vi.fn(async () => []), + resolveMSTeamsUserAllowlist: vi.fn(async () => []), +})); + +vi.mock("./sdk.js", () => ({ + createMSTeamsAdapter: (...args: unknown[]) => createMSTeamsAdapter(...args), + loadMSTeamsSdkWithAuth: (...args: unknown[]) => loadMSTeamsSdkWithAuth(...args), +})); + +vi.mock("./runtime.js", () => ({ + getMSTeamsRuntime: () => ({ + logging: { + getChildLogger: () => ({ + info: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), + }, + channel: { + text: { + resolveTextChunkLimit: () => 4000, + }, + }, + }), +})); + +import { monitorMSTeamsProvider } from "./monitor.js"; + +function createConfig(port: number): OpenClawConfig { + return { + channels: { + msteams: { + enabled: true, + appId: "app-id", + appPassword: "app-password", + tenantId: "tenant-id", + webhook: { + port, + path: "/api/messages", + }, + }, + }, + } as OpenClawConfig; +} + +function createRuntime(): RuntimeEnv { + return { + log: vi.fn(), + error: vi.fn(), + exit: (code: number): never => { + throw new Error(`exit ${code}`); + }, + }; +} + +function createStores() { + return { + conversationStore: {} as MSTeamsConversationStore, + pollStore: {} as MSTeamsPollStore, + }; +} + +describe("monitorMSTeamsProvider lifecycle", () => { + afterEach(() => { + vi.clearAllMocks(); + expressControl.mode.value = "listening"; + }); + + it("stays active until aborted", async () => { + const abort = new AbortController(); + const stores = createStores(); + const task = monitorMSTeamsProvider({ + cfg: createConfig(0), + runtime: createRuntime(), + abortSignal: abort.signal, + conversationStore: stores.conversationStore, + pollStore: stores.pollStore, + }); + + const early = await Promise.race([ + task.then(() => "resolved"), + new Promise<"pending">((resolve) => setTimeout(() => resolve("pending"), 50)), + ]); + expect(early).toBe("pending"); + + abort.abort(); + await expect(task).resolves.toEqual( + expect.objectContaining({ + shutdown: expect.any(Function), + }), + ); + }); + + it("rejects startup when webhook port is already in use", async () => { + expressControl.mode.value = "error"; + await expect( + monitorMSTeamsProvider({ + cfg: createConfig(3978), + runtime: createRuntime(), + abortSignal: new AbortController().signal, + conversationStore: createStores().conversationStore, + pollStore: createStores().pollStore, + }), + ).rejects.toThrow(/EADDRINUSE/); + }); +}); diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index 02c9674c49e..eab22a890eb 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -273,9 +273,21 @@ export async function monitorMSTeamsProvider( fallback: "/api/messages", }); - // Start listening and capture the HTTP server handle - const httpServer = expressApp.listen(port, () => { - log.info(`msteams provider started on port ${port}`); + // Start listening and fail fast if bind/listen fails. + const httpServer = expressApp.listen(port); + await new Promise((resolve, reject) => { + const onListening = () => { + httpServer.off("error", onError); + log.info(`msteams provider started on port ${port}`); + resolve(); + }; + const onError = (err: unknown) => { + httpServer.off("listening", onListening); + log.error("msteams server error", { error: String(err) }); + reject(err); + }; + httpServer.once("listening", onListening); + httpServer.once("error", onError); }); httpServer.on("error", (err) => { @@ -295,11 +307,24 @@ export async function monitorMSTeamsProvider( }; // Handle abort signal + const onAbort = () => { + void shutdown(); + }; if (opts.abortSignal) { - opts.abortSignal.addEventListener("abort", () => { - void shutdown(); - }); + if (opts.abortSignal.aborted) { + onAbort(); + } else { + opts.abortSignal.addEventListener("abort", onAbort, { once: true }); + } } + // Keep this task alive until shutdown/close so gateway runtime does not treat startup as exit. + await new Promise((resolve) => { + httpServer.once("close", () => { + resolve(); + }); + }); + opts.abortSignal?.removeEventListener("abort", onAbort); + return { app: expressApp, shutdown }; }