diff --git a/extensions/line/src/monitor.lifecycle.test.ts b/extensions/line/src/monitor.lifecycle.test.ts index ac3a55528a2..bcbdef2efca 100644 --- a/extensions/line/src/monitor.lifecycle.test.ts +++ b/extensions/line/src/monitor.lifecycle.test.ts @@ -1,15 +1,18 @@ +import crypto from "node:crypto"; +import { EventEmitter } from "node:events"; import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { WEBHOOK_IN_FLIGHT_DEFAULTS } from "openclaw/plugin-sdk/webhook-request-guards"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { createMockIncomingRequest } from "openclaw/plugin-sdk/test-env"; type LineNodeWebhookHandler = (req: IncomingMessage, res: ServerResponse) => Promise; const { createLineBotMock, createLineNodeWebhookHandlerMock, - registerPluginHttpRouteMock, + registerWebhookTargetWithPluginRouteMock, unregisterHttpMock, } = vi.hoisted(() => ({ createLineBotMock: vi.fn(() => ({ @@ -19,7 +22,7 @@ const { createLineNodeWebhookHandlerMock: vi.fn<() => LineNodeWebhookHandler>(() => vi.fn(async () => {}), ), - registerPluginHttpRouteMock: vi.fn(), + registerWebhookTargetWithPluginRouteMock: vi.fn(), unregisterHttpMock: vi.fn(), })); @@ -53,14 +56,24 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({ createChannelReplyPipeline: vi.fn(() => ({})), })); -vi.mock("openclaw/plugin-sdk/webhook-ingress", () => ({ - normalizePluginHttpPath: (_path: string | undefined, fallback: string) => fallback, - registerPluginHttpRoute: registerPluginHttpRouteMock, -})); +vi.mock("openclaw/plugin-sdk/webhook-ingress", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/webhook-ingress", + ); + return { + ...actual, + normalizePluginHttpPath: (path: string | undefined, fallback: string) => path ?? fallback, + registerWebhookTargetWithPluginRoute: registerWebhookTargetWithPluginRouteMock, + }; +}); -vi.mock("./webhook-node.js", () => ({ - createLineNodeWebhookHandler: createLineNodeWebhookHandlerMock, -})); +vi.mock("./webhook-node.js", async () => { + const actual = await vi.importActual("./webhook-node.js"); + return { + ...actual, + createLineNodeWebhookHandler: createLineNodeWebhookHandlerMock, + }; +}); vi.mock("./auto-reply-delivery.js", () => ({ deliverLineAutoReply: vi.fn(), @@ -101,16 +114,37 @@ describe("monitorLineProvider lifecycle", () => { beforeEach(() => { clearLineRuntimeStateForTests(); createLineBotMock.mockReset(); - createLineBotMock.mockReturnValue({ + createLineBotMock.mockImplementation(() => ({ account: { accountId: "default" }, handleWebhook: vi.fn(), - }); + })); innerLineWebhookHandlerMock = vi.fn(async () => {}); createLineNodeWebhookHandlerMock .mockReset() .mockImplementation(() => innerLineWebhookHandlerMock); unregisterHttpMock.mockReset(); - registerPluginHttpRouteMock.mockReset().mockReturnValue(unregisterHttpMock); + registerWebhookTargetWithPluginRouteMock.mockReset().mockImplementation((params) => { + const key = params.target.path.startsWith("/") + ? params.target.path + : `/${params.target.path}`; + const normalizedTarget = { ...params.target, path: key }; + const existing = params.targetsByPath.get(key) ?? []; + params.targetsByPath.set(key, [...existing, normalizedTarget]); + return { + target: normalizedTarget, + unregister: () => { + unregisterHttpMock(); + const updated = (params.targetsByPath.get(key) ?? []).filter( + (entry: unknown) => entry !== normalizedTarget, + ); + if (updated.length > 0) { + params.targetsByPath.set(key, updated); + } else { + params.targetsByPath.delete(key); + } + }, + }; + }); }); const createRouteResponse = () => { @@ -140,9 +174,9 @@ describe("monitorLineProvider lifecycle", () => { return monitor; }); - expect(registerPluginHttpRouteMock).toHaveBeenCalledTimes(1); - expect(registerPluginHttpRouteMock).toHaveBeenCalledWith( - expect.objectContaining({ auth: "plugin" }), + expect(registerWebhookTargetWithPluginRouteMock).toHaveBeenCalledTimes(1); + expect(registerWebhookTargetWithPluginRouteMock).toHaveBeenCalledWith( + expect.objectContaining({ route: expect.objectContaining({ auth: "plugin" }) }), ); expect(resolved).toBe(false); @@ -151,6 +185,31 @@ describe("monitorLineProvider lifecycle", () => { expect(unregisterHttpMock).toHaveBeenCalledTimes(1); }); + it("registers an account target without replacing existing route ownership", async () => { + const monitor = await monitorLineProvider({ + channelAccessToken: "token", + channelSecret: "secret", // pragma: allowlist secret + accountId: "work", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + }); + + const registration = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]; + expect(registration).toEqual( + expect.objectContaining({ + target: expect.objectContaining({ accountId: "work", path: "/line/webhook" }), + route: expect.objectContaining({ + accountId: "work", + auth: "plugin", + pluginId: "line", + }), + }), + ); + expect(registration?.route).not.toHaveProperty("path"); + expect(registration?.route).not.toHaveProperty("replaceExisting"); + monitor.stop(); + }); + it("stops immediately when signal is already aborted", async () => { const abort = new AbortController(); abort.abort(); @@ -210,26 +269,100 @@ describe("monitorLineProvider lifecycle", () => { monitor.stop(); }); - it("rejects webhook requests above the shared in-flight limit before body handling", async () => { - const limit = WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey; - const releaseRequests: Array<() => void> = []; - let reachLimit!: () => void; - const reachedLimit = new Promise((resolve) => { - reachLimit = resolve; + it("dispatches shared-path webhook posts to the account matching the signature", async () => { + const firstMonitor = await monitorLineProvider({ + channelAccessToken: "first-token", + channelSecret: "first-secret", // pragma: allowlist secret + accountId: "first", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + }); + const secondMonitor = await monitorLineProvider({ + channelAccessToken: "second-token", + channelSecret: "second-secret", // pragma: allowlist secret + accountId: "second", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, }); - innerLineWebhookHandlerMock.mockImplementation( - async (_req: IncomingMessage, res: ServerResponse) => { - if (releaseRequests.length === limit - 1) { - reachLimit(); - } - await new Promise((resolve) => { - releaseRequests.push(resolve); - }); - res.statusCode = 200; - res.end(); - }, - ); + const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as + | { handler: (req: IncomingMessage, res: ServerResponse) => Promise } + | undefined; + expect(route).toBeDefined(); + + const payload = JSON.stringify({ events: [{ type: "message" }] }); + const signature = crypto.createHmac("SHA256", "second-secret").update(payload).digest("base64"); + const req = Object.assign(createMockIncomingRequest([payload]), { + method: "POST", + headers: { "x-line-signature": signature }, + }) as unknown as IncomingMessage; + const res = createRouteResponse(); + + await route!.handler(req, res); + + const firstBot = createLineBotMock.mock.results[0]?.value as { + handleWebhook: ReturnType; + }; + const secondBot = createLineBotMock.mock.results[1]?.value as { + handleWebhook: ReturnType; + }; + expect(res.statusCode).toBe(200); + expect(firstBot.handleWebhook).not.toHaveBeenCalled(); + expect(secondBot.handleWebhook).toHaveBeenCalledTimes(1); + + firstMonitor.stop(); + secondMonitor.stop(); + }); + + it("rejects ambiguous shared-path webhook signatures", async () => { + const firstMonitor = await monitorLineProvider({ + channelAccessToken: "first-token", + channelSecret: "shared-secret", // pragma: allowlist secret + accountId: "first", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + }); + const secondMonitor = await monitorLineProvider({ + channelAccessToken: "second-token", + channelSecret: "shared-secret", // pragma: allowlist secret + accountId: "second", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + }); + + const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as + | { handler: (req: IncomingMessage, res: ServerResponse) => Promise } + | undefined; + expect(route).toBeDefined(); + + const payload = JSON.stringify({ events: [{ type: "message" }] }); + const signature = crypto.createHmac("SHA256", "shared-secret").update(payload).digest("base64"); + const req = Object.assign(createMockIncomingRequest([payload]), { + method: "POST", + headers: { "x-line-signature": signature }, + }) as unknown as IncomingMessage; + const res = createRouteResponse(); + + await route!.handler(req, res); + + const firstBot = createLineBotMock.mock.results[0]?.value as { + handleWebhook: ReturnType; + }; + const secondBot = createLineBotMock.mock.results[1]?.value as { + handleWebhook: ReturnType; + }; + expect(res.statusCode).toBe(401); + expect(res.end).toHaveBeenCalledWith(JSON.stringify({ error: "Ambiguous webhook target" })); + expect(firstBot.handleWebhook).not.toHaveBeenCalled(); + expect(secondBot.handleWebhook).not.toHaveBeenCalled(); + + firstMonitor.stop(); + secondMonitor.stop(); + }); + + it("rejects webhook requests above the shared in-flight limit before body handling", async () => { + const limit = WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey; + const heldRequests: Array void }> = []; const monitor = await monitorLineProvider({ channelAccessToken: "token", @@ -238,30 +371,51 @@ describe("monitorLineProvider lifecycle", () => { runtime: {} as RuntimeEnv, }); - const route = registerPluginHttpRouteMock.mock.calls[0]?.[0] as + const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as | { handler: (req: IncomingMessage, res: ServerResponse) => Promise } | undefined; expect(route).toBeDefined(); - const createPostRequest = () => - ({ + const createHeldPostRequest = () => { + const req = Object.assign(new EventEmitter(), { + destroyed: false, + destroy(this: EventEmitter & { destroyed: boolean }) { + this.destroyed = true; + this.emit("close"); + }, + }); + heldRequests.push(req); + return Object.assign(req, { method: "POST", - headers: {}, - }) as IncomingMessage; + headers: { "x-line-signature": "pending" }, + }) as unknown as IncomingMessage; + }; + const createSignedPostRequest = () => { + const payload = JSON.stringify({ events: [{ type: "message" }] }); + const signature = crypto.createHmac("SHA256", "secret").update(payload).digest("base64"); + const req = createMockIncomingRequest([payload]); + return Object.assign(req, { + method: "POST", + headers: { "x-line-signature": signature }, + }) as unknown as IncomingMessage; + }; const firstRequests = Array.from({ length: limit }, () => - route!.handler(createPostRequest(), createRouteResponse()), + route!.handler(createHeldPostRequest(), createRouteResponse()), ); - await reachedLimit; + await new Promise((resolve) => setImmediate(resolve)); const overflowResponse = createRouteResponse(); - await route!.handler(createPostRequest(), overflowResponse); + await route!.handler(createSignedPostRequest(), overflowResponse); - expect(innerLineWebhookHandlerMock).toHaveBeenCalledTimes(limit); + const bot = createLineBotMock.mock.results[0]?.value as { + handleWebhook: ReturnType; + }; + expect(bot.handleWebhook).not.toHaveBeenCalled(); expect(overflowResponse.statusCode).toBe(429); expect(overflowResponse.end).toHaveBeenCalledWith("Too Many Requests"); - releaseRequests.splice(0).forEach((release) => release()); - await Promise.all(firstRequests); + heldRequests.splice(0).forEach((req) => req.destroy()); + await Promise.allSettled(firstRequests); monitor.stop(); }); }); diff --git a/extensions/line/src/monitor.ts b/extensions/line/src/monitor.ts index 7f4171d480d..6e63025e4af 100644 --- a/extensions/line/src/monitor.ts +++ b/extensions/line/src/monitor.ts @@ -12,8 +12,11 @@ import { type RuntimeEnv, } from "openclaw/plugin-sdk/runtime-env"; import { + isRequestBodyLimitError, normalizePluginHttpPath, - registerPluginHttpRoute, + registerWebhookTargetWithPluginRoute, + requestBodyErrorToText, + resolveSingleWebhookTarget, } from "openclaw/plugin-sdk/webhook-ingress"; import { beginWebhookRequestPipelineOrReject, @@ -39,7 +42,8 @@ import { } from "./send.js"; import { buildTemplateMessageFromPayload } from "./template-messages.js"; import type { LineChannelData, ResolvedLineAccount } from "./types.js"; -import { createLineNodeWebhookHandler } from "./webhook-node.js"; +import { createLineNodeWebhookHandler, readLineWebhookRequestBody } from "./webhook-node.js"; +import { parseLineWebhookBody, validateLineSignature } from "./webhook-utils.js"; export interface MonitorLineProviderOptions { channelAccessToken: string; @@ -70,6 +74,18 @@ const runtimeState = new Map< } >(); const lineWebhookInFlightLimiter = createWebhookInFlightLimiter(); +const LINE_WEBHOOK_PREAUTH_MAX_BODY_BYTES = 64 * 1024; +const LINE_WEBHOOK_PREAUTH_BODY_TIMEOUT_MS = 5_000; + +type LineWebhookTarget = { + accountId: string; + bot: ReturnType; + channelSecret: string; + path: string; + runtime: RuntimeEnv; +}; + +const lineWebhookTargets = new Map(); function recordChannelRuntimeState(params: { channel: string; @@ -303,41 +319,130 @@ export async function monitorLineProvider( }); const normalizedPath = normalizePluginHttpPath(webhookPath, "/line/webhook") ?? "/line/webhook"; - const createScopedLineWebhookHandler = (onRequestAuthenticated?: () => void) => + const createScopedLineWebhookHandler = (target: LineWebhookTarget) => createLineNodeWebhookHandler({ - channelSecret: secret, - bot, - runtime, - onRequestAuthenticated, + channelSecret: target.channelSecret, + bot: target.bot, + runtime: target.runtime, }); - const unregisterHttp = registerPluginHttpRoute({ - path: normalizedPath, - auth: "plugin", - replaceExisting: true, - pluginId: "line", - accountId: resolvedAccountId, - log: (msg) => logVerbose(msg), - handler: async (req, res) => { - if (req.method !== "POST") { - await createScopedLineWebhookHandler()(req, res); - return; - } + const { unregister: unregisterHttp } = registerWebhookTargetWithPluginRoute({ + targetsByPath: lineWebhookTargets, + target: { + accountId: resolvedAccountId, + bot, + channelSecret: secret, + path: normalizedPath, + runtime, + }, + route: { + auth: "plugin", + pluginId: "line", + accountId: resolvedAccountId, + log: (msg) => logVerbose(msg), + handler: async (req, res) => { + const targets = lineWebhookTargets.get(normalizedPath) ?? []; + const firstTarget = targets[0]; + if (req.method !== "POST") { + if (!firstTarget) { + res.statusCode = 404; + res.end("Not Found"); + return; + } + await createScopedLineWebhookHandler(firstTarget)(req, res); + return; + } - const requestLifecycle = beginWebhookRequestPipelineOrReject({ - req, - res, - inFlightLimiter: lineWebhookInFlightLimiter, - inFlightKey: `line:${resolvedAccountId}`, - }); - if (!requestLifecycle.ok) { - return; - } + const requestLifecycle = beginWebhookRequestPipelineOrReject({ + req, + res, + inFlightLimiter: lineWebhookInFlightLimiter, + inFlightKey: `line:${normalizedPath}`, + }); + if (!requestLifecycle.ok) { + return; + } - try { - await createScopedLineWebhookHandler(requestLifecycle.release)(req, res); - } finally { - requestLifecycle.release(); - } + try { + const signatureHeader = req.headers["x-line-signature"]; + const signature = + typeof signatureHeader === "string" + ? signatureHeader.trim() + : Array.isArray(signatureHeader) + ? (signatureHeader[0] ?? "").trim() + : ""; + + if (!signature) { + logVerbose("line: webhook missing X-Line-Signature header"); + res.statusCode = 400; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Missing X-Line-Signature header" })); + return; + } + + const rawBody = await readLineWebhookRequestBody( + req, + LINE_WEBHOOK_PREAUTH_MAX_BODY_BYTES, + LINE_WEBHOOK_PREAUTH_BODY_TIMEOUT_MS, + ); + const match = resolveSingleWebhookTarget(targets, (target) => + validateLineSignature(rawBody, signature, target.channelSecret), + ); + if (match.kind === "none") { + logVerbose("line: webhook signature validation failed"); + res.statusCode = 401; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Invalid signature" })); + return; + } + if (match.kind === "ambiguous") { + logVerbose("line: webhook signature matched multiple accounts"); + res.statusCode = 401; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Ambiguous webhook target" })); + return; + } + + const body = parseLineWebhookBody(rawBody); + if (!body) { + res.statusCode = 400; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Invalid webhook payload" })); + return; + } + + requestLifecycle.release(); + + if (body.events && body.events.length > 0) { + logVerbose(`line: received ${body.events.length} webhook events`); + await match.target.bot.handleWebhook(body); + } + + res.statusCode = 200; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ status: "ok" })); + } catch (err) { + if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { + res.statusCode = 413; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Payload too large" })); + return; + } + if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) { + res.statusCode = 408; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") })); + return; + } + runtime.error?.(danger(`line webhook error: ${String(err)}`)); + if (!res.headersSent) { + res.statusCode = 500; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Internal server error" })); + } + } finally { + requestLifecycle.release(); + } + }, }, }); diff --git a/extensions/zalo/src/monitor.polling.media-reply.test.ts b/extensions/zalo/src/monitor.polling.media-reply.test.ts index 982009b163b..9fe4b91fc11 100644 --- a/extensions/zalo/src/monitor.polling.media-reply.test.ts +++ b/extensions/zalo/src/monitor.polling.media-reply.test.ts @@ -1,8 +1,12 @@ +import { chmod, mkdir, writeFile } from "node:fs/promises"; +import type { ServerResponse } from "node:http"; +import { join } from "node:path"; import { createEmptyPluginRegistry, createRuntimeEnv, setActivePluginRegistry, } from "openclaw/plugin-sdk/plugin-test-runtime"; +import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { afterAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { PluginRuntime } from "../runtime-api.js"; import { @@ -28,6 +32,53 @@ vi.mock("./outbound-media.js", async () => { }; }); +import { clearHostedZaloMediaForTest } from "./outbound-media.js"; + +const ZALO_OUTBOUND_MEDIA_DIR = join( + resolvePreferredOpenClawTmpDir(), + "openclaw-zalo-outbound-media", +); + +async function writeHostedZaloMediaFixture(params: { + id: string; + routePath: string; + token: string; + buffer: Buffer; + contentType?: string; +}): Promise { + await mkdir(ZALO_OUTBOUND_MEDIA_DIR, { recursive: true, mode: 0o700 }); + await chmod(ZALO_OUTBOUND_MEDIA_DIR, 0o700).catch(() => undefined); + await Promise.all([ + writeFile( + join(ZALO_OUTBOUND_MEDIA_DIR, `${params.id}.json`), + JSON.stringify({ + routePath: params.routePath, + token: params.token, + contentType: params.contentType, + expiresAt: Date.now() + 60_000, + }), + { encoding: "utf8", mode: 0o600 }, + ), + writeFile(join(ZALO_OUTBOUND_MEDIA_DIR, `${params.id}.bin`), params.buffer, { mode: 0o600 }), + ]); +} + +function createHostedMediaResponse() { + const headers = new Map(); + const res = { + statusCode: 200, + headersSent: false, + setHeader(name: string, value: string) { + headers.set(name, value); + }, + end: vi.fn((body?: unknown) => { + res.headersSent = true; + return body; + }), + }; + return { headers, res: res as unknown as ServerResponse & { end: ReturnType } }; +} + describe("Zalo polling media replies", () => { const finalizeInboundContextMock = vi.fn((ctx: Record) => ctx); const recordInboundSessionMock = vi.fn(async () => undefined); @@ -43,6 +94,7 @@ describe("Zalo polling media replies", () => { beforeEach(async () => { await resetLifecycleTestState(); + clearHostedZaloMediaForTest(); prepareHostedZaloMediaUrlMock.mockReset(); prepareHostedZaloMediaUrlMock.mockResolvedValue( "https://example.com/hooks/zalo/media/abc123abc123abc123abc123?token=secret", @@ -79,6 +131,7 @@ describe("Zalo polling media replies", () => { }); afterAll(async () => { + clearHostedZaloMediaForTest(); await resetLifecycleTestState(); }); @@ -199,6 +252,123 @@ describe("Zalo polling media replies", () => { } }); + it("shares one hosted media route across accounts on the same path", async () => { + const registry = createEmptyPluginRegistry(); + setActivePluginRegistry(registry); + getUpdatesMock.mockImplementation(() => new Promise(() => {})); + + const { monitorZaloProvider } = await loadCachedLifecycleMonitorModule( + "zalo-polling-media-reply", + ); + const firstAbort = new AbortController(); + const firstRuntime = createRuntimeEnv(); + const firstSetup = createLifecycleMonitorSetup({ + accountId: "acct-zalo-polling-media-one", + dmPolicy: "open", + webhookUrl: "https://example.com/hooks/zalo", + }); + const firstRun = monitorZaloProvider({ + token: "zalo-token-one", + account: firstSetup.account, + config: firstSetup.config, + runtime: firstRuntime, + abortSignal: firstAbort.signal, + }); + + const secondAbort = new AbortController(); + let secondRun: Promise | undefined; + + try { + await settleAsyncWork(); + const firstHostedMediaRoutes = registry.httpRoutes.filter( + (route) => route.source === "zalo-hosted-media", + ); + expect(firstHostedMediaRoutes).toHaveLength(1); + const hostedMediaRoute = firstHostedMediaRoutes[0]; + expect(hostedMediaRoute).toEqual( + expect.objectContaining({ + path: "/hooks/zalo/media", + pluginId: "zalo", + }), + ); + + const secondRuntime = createRuntimeEnv(); + const secondSetup = createLifecycleMonitorSetup({ + accountId: "acct-zalo-polling-media-two", + dmPolicy: "open", + webhookUrl: "https://example.com/hooks/zalo", + }); + secondRun = monitorZaloProvider({ + token: "zalo-token-two", + account: secondSetup.account, + config: secondSetup.config, + runtime: secondRuntime, + abortSignal: secondAbort.signal, + }); + + await settleAsyncWork(); + const hostedMediaRoutes = registry.httpRoutes.filter( + (route) => route.source === "zalo-hosted-media", + ); + expect(hostedMediaRoutes).toHaveLength(1); + expect(hostedMediaRoutes[0]).toBe(hostedMediaRoute); + + await writeHostedZaloMediaFixture({ + id: "abc123abc123abc123abc123", + routePath: "/hooks/zalo/media/", + token: "route-token-one", + buffer: Buffer.from("first-image-bytes"), + contentType: "image/png", + }); + const firstFetch = createHostedMediaResponse(); + await hostedMediaRoute.handler( + { + method: "GET", + url: "/hooks/zalo/media/abc123abc123abc123abc123?token=route-token-one", + } as never, + firstFetch.res as never, + ); + expect(firstFetch.res.statusCode).toBe(200); + expect(firstFetch.headers.get("Content-Type")).toBe("image/png"); + expect(firstFetch.headers.get("Cache-Control")).toBe("no-store"); + expect(firstFetch.res.end).toHaveBeenCalledWith(Buffer.from("first-image-bytes")); + + firstAbort.abort(); + await firstRun; + expect(registry.httpRoutes.filter((route) => route.source === "zalo-hosted-media")).toEqual([ + hostedMediaRoute, + ]); + + await writeHostedZaloMediaFixture({ + id: "def456def456def456def456", + routePath: "/hooks/zalo/media/", + token: "route-token-two", + buffer: Buffer.from("second-image-bytes"), + contentType: "image/jpeg", + }); + const secondFetch = createHostedMediaResponse(); + await hostedMediaRoute.handler( + { + method: "GET", + url: "/hooks/zalo/media/def456def456def456def456?token=route-token-two", + } as never, + secondFetch.res as never, + ); + expect(secondFetch.res.statusCode).toBe(200); + expect(secondFetch.headers.get("Content-Type")).toBe("image/jpeg"); + expect(secondFetch.res.end).toHaveBeenCalledWith(Buffer.from("second-image-bytes")); + } finally { + firstAbort.abort(); + secondAbort.abort(); + await firstRun; + await secondRun; + } + + expect( + registry.httpRoutes.filter((route) => route.source === "zalo-hosted-media"), + ).toHaveLength(0); + }); + it("re-registers the hosted media route after the active registry swaps", async () => { const firstRegistry = createEmptyPluginRegistry(); setActivePluginRegistry(firstRegistry); diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 42024da67ee..a28e9dbaee1 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -108,7 +108,6 @@ function registerSharedHostedMediaRoute(params: { auth: "plugin", match: "prefix", path: params.path, - replaceExisting: true, pluginId: "zalo", source: "zalo-hosted-media", accountId: params.accountId,