mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:10:44 +00:00
fix: preserve plugin route ownership
This commit is contained in:
@@ -1,15 +1,18 @@
|
||||
import crypto from "node:crypto";
|
||||
import { EventEmitter } from "node:events";
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
|
||||
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { WEBHOOK_IN_FLIGHT_DEFAULTS } from "openclaw/plugin-sdk/webhook-request-guards";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createMockIncomingRequest } from "openclaw/plugin-sdk/test-env";
|
||||
|
||||
type LineNodeWebhookHandler = (req: IncomingMessage, res: ServerResponse) => Promise<void>;
|
||||
|
||||
const {
|
||||
createLineBotMock,
|
||||
createLineNodeWebhookHandlerMock,
|
||||
registerPluginHttpRouteMock,
|
||||
registerWebhookTargetWithPluginRouteMock,
|
||||
unregisterHttpMock,
|
||||
} = vi.hoisted(() => ({
|
||||
createLineBotMock: vi.fn(() => ({
|
||||
@@ -19,7 +22,7 @@ const {
|
||||
createLineNodeWebhookHandlerMock: vi.fn<() => LineNodeWebhookHandler>(() =>
|
||||
vi.fn<LineNodeWebhookHandler>(async () => {}),
|
||||
),
|
||||
registerPluginHttpRouteMock: vi.fn(),
|
||||
registerWebhookTargetWithPluginRouteMock: vi.fn(),
|
||||
unregisterHttpMock: vi.fn(),
|
||||
}));
|
||||
|
||||
@@ -53,14 +56,24 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({
|
||||
createChannelReplyPipeline: vi.fn(() => ({})),
|
||||
}));
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/webhook-ingress", () => ({
|
||||
normalizePluginHttpPath: (_path: string | undefined, fallback: string) => fallback,
|
||||
registerPluginHttpRoute: registerPluginHttpRouteMock,
|
||||
}));
|
||||
vi.mock("openclaw/plugin-sdk/webhook-ingress", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/webhook-ingress")>(
|
||||
"openclaw/plugin-sdk/webhook-ingress",
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
normalizePluginHttpPath: (path: string | undefined, fallback: string) => path ?? fallback,
|
||||
registerWebhookTargetWithPluginRoute: registerWebhookTargetWithPluginRouteMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./webhook-node.js", () => ({
|
||||
createLineNodeWebhookHandler: createLineNodeWebhookHandlerMock,
|
||||
}));
|
||||
vi.mock("./webhook-node.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./webhook-node.js")>("./webhook-node.js");
|
||||
return {
|
||||
...actual,
|
||||
createLineNodeWebhookHandler: createLineNodeWebhookHandlerMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./auto-reply-delivery.js", () => ({
|
||||
deliverLineAutoReply: vi.fn(),
|
||||
@@ -101,16 +114,37 @@ describe("monitorLineProvider lifecycle", () => {
|
||||
beforeEach(() => {
|
||||
clearLineRuntimeStateForTests();
|
||||
createLineBotMock.mockReset();
|
||||
createLineBotMock.mockReturnValue({
|
||||
createLineBotMock.mockImplementation(() => ({
|
||||
account: { accountId: "default" },
|
||||
handleWebhook: vi.fn(),
|
||||
});
|
||||
}));
|
||||
innerLineWebhookHandlerMock = vi.fn<LineNodeWebhookHandler>(async () => {});
|
||||
createLineNodeWebhookHandlerMock
|
||||
.mockReset()
|
||||
.mockImplementation(() => innerLineWebhookHandlerMock);
|
||||
unregisterHttpMock.mockReset();
|
||||
registerPluginHttpRouteMock.mockReset().mockReturnValue(unregisterHttpMock);
|
||||
registerWebhookTargetWithPluginRouteMock.mockReset().mockImplementation((params) => {
|
||||
const key = params.target.path.startsWith("/")
|
||||
? params.target.path
|
||||
: `/${params.target.path}`;
|
||||
const normalizedTarget = { ...params.target, path: key };
|
||||
const existing = params.targetsByPath.get(key) ?? [];
|
||||
params.targetsByPath.set(key, [...existing, normalizedTarget]);
|
||||
return {
|
||||
target: normalizedTarget,
|
||||
unregister: () => {
|
||||
unregisterHttpMock();
|
||||
const updated = (params.targetsByPath.get(key) ?? []).filter(
|
||||
(entry: unknown) => entry !== normalizedTarget,
|
||||
);
|
||||
if (updated.length > 0) {
|
||||
params.targetsByPath.set(key, updated);
|
||||
} else {
|
||||
params.targetsByPath.delete(key);
|
||||
}
|
||||
},
|
||||
};
|
||||
});
|
||||
});
|
||||
|
||||
const createRouteResponse = () => {
|
||||
@@ -140,9 +174,9 @@ describe("monitorLineProvider lifecycle", () => {
|
||||
return monitor;
|
||||
});
|
||||
|
||||
expect(registerPluginHttpRouteMock).toHaveBeenCalledTimes(1);
|
||||
expect(registerPluginHttpRouteMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ auth: "plugin" }),
|
||||
expect(registerWebhookTargetWithPluginRouteMock).toHaveBeenCalledTimes(1);
|
||||
expect(registerWebhookTargetWithPluginRouteMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ route: expect.objectContaining({ auth: "plugin" }) }),
|
||||
);
|
||||
expect(resolved).toBe(false);
|
||||
|
||||
@@ -151,6 +185,31 @@ describe("monitorLineProvider lifecycle", () => {
|
||||
expect(unregisterHttpMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("registers an account target without replacing existing route ownership", async () => {
|
||||
const monitor = await monitorLineProvider({
|
||||
channelAccessToken: "token",
|
||||
channelSecret: "secret", // pragma: allowlist secret
|
||||
accountId: "work",
|
||||
config: {} as OpenClawConfig,
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
|
||||
const registration = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0];
|
||||
expect(registration).toEqual(
|
||||
expect.objectContaining({
|
||||
target: expect.objectContaining({ accountId: "work", path: "/line/webhook" }),
|
||||
route: expect.objectContaining({
|
||||
accountId: "work",
|
||||
auth: "plugin",
|
||||
pluginId: "line",
|
||||
}),
|
||||
}),
|
||||
);
|
||||
expect(registration?.route).not.toHaveProperty("path");
|
||||
expect(registration?.route).not.toHaveProperty("replaceExisting");
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("stops immediately when signal is already aborted", async () => {
|
||||
const abort = new AbortController();
|
||||
abort.abort();
|
||||
@@ -210,26 +269,100 @@ describe("monitorLineProvider lifecycle", () => {
|
||||
monitor.stop();
|
||||
});
|
||||
|
||||
it("rejects webhook requests above the shared in-flight limit before body handling", async () => {
|
||||
const limit = WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey;
|
||||
const releaseRequests: Array<() => void> = [];
|
||||
let reachLimit!: () => void;
|
||||
const reachedLimit = new Promise<void>((resolve) => {
|
||||
reachLimit = resolve;
|
||||
it("dispatches shared-path webhook posts to the account matching the signature", async () => {
|
||||
const firstMonitor = await monitorLineProvider({
|
||||
channelAccessToken: "first-token",
|
||||
channelSecret: "first-secret", // pragma: allowlist secret
|
||||
accountId: "first",
|
||||
config: {} as OpenClawConfig,
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
const secondMonitor = await monitorLineProvider({
|
||||
channelAccessToken: "second-token",
|
||||
channelSecret: "second-secret", // pragma: allowlist secret
|
||||
accountId: "second",
|
||||
config: {} as OpenClawConfig,
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
|
||||
innerLineWebhookHandlerMock.mockImplementation(
|
||||
async (_req: IncomingMessage, res: ServerResponse) => {
|
||||
if (releaseRequests.length === limit - 1) {
|
||||
reachLimit();
|
||||
}
|
||||
await new Promise<void>((resolve) => {
|
||||
releaseRequests.push(resolve);
|
||||
});
|
||||
res.statusCode = 200;
|
||||
res.end();
|
||||
},
|
||||
);
|
||||
const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as
|
||||
| { handler: (req: IncomingMessage, res: ServerResponse) => Promise<void> }
|
||||
| undefined;
|
||||
expect(route).toBeDefined();
|
||||
|
||||
const payload = JSON.stringify({ events: [{ type: "message" }] });
|
||||
const signature = crypto.createHmac("SHA256", "second-secret").update(payload).digest("base64");
|
||||
const req = Object.assign(createMockIncomingRequest([payload]), {
|
||||
method: "POST",
|
||||
headers: { "x-line-signature": signature },
|
||||
}) as unknown as IncomingMessage;
|
||||
const res = createRouteResponse();
|
||||
|
||||
await route!.handler(req, res);
|
||||
|
||||
const firstBot = createLineBotMock.mock.results[0]?.value as {
|
||||
handleWebhook: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
const secondBot = createLineBotMock.mock.results[1]?.value as {
|
||||
handleWebhook: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
expect(res.statusCode).toBe(200);
|
||||
expect(firstBot.handleWebhook).not.toHaveBeenCalled();
|
||||
expect(secondBot.handleWebhook).toHaveBeenCalledTimes(1);
|
||||
|
||||
firstMonitor.stop();
|
||||
secondMonitor.stop();
|
||||
});
|
||||
|
||||
it("rejects ambiguous shared-path webhook signatures", async () => {
|
||||
const firstMonitor = await monitorLineProvider({
|
||||
channelAccessToken: "first-token",
|
||||
channelSecret: "shared-secret", // pragma: allowlist secret
|
||||
accountId: "first",
|
||||
config: {} as OpenClawConfig,
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
const secondMonitor = await monitorLineProvider({
|
||||
channelAccessToken: "second-token",
|
||||
channelSecret: "shared-secret", // pragma: allowlist secret
|
||||
accountId: "second",
|
||||
config: {} as OpenClawConfig,
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
|
||||
const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as
|
||||
| { handler: (req: IncomingMessage, res: ServerResponse) => Promise<void> }
|
||||
| undefined;
|
||||
expect(route).toBeDefined();
|
||||
|
||||
const payload = JSON.stringify({ events: [{ type: "message" }] });
|
||||
const signature = crypto.createHmac("SHA256", "shared-secret").update(payload).digest("base64");
|
||||
const req = Object.assign(createMockIncomingRequest([payload]), {
|
||||
method: "POST",
|
||||
headers: { "x-line-signature": signature },
|
||||
}) as unknown as IncomingMessage;
|
||||
const res = createRouteResponse();
|
||||
|
||||
await route!.handler(req, res);
|
||||
|
||||
const firstBot = createLineBotMock.mock.results[0]?.value as {
|
||||
handleWebhook: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
const secondBot = createLineBotMock.mock.results[1]?.value as {
|
||||
handleWebhook: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
expect(res.statusCode).toBe(401);
|
||||
expect(res.end).toHaveBeenCalledWith(JSON.stringify({ error: "Ambiguous webhook target" }));
|
||||
expect(firstBot.handleWebhook).not.toHaveBeenCalled();
|
||||
expect(secondBot.handleWebhook).not.toHaveBeenCalled();
|
||||
|
||||
firstMonitor.stop();
|
||||
secondMonitor.stop();
|
||||
});
|
||||
|
||||
it("rejects webhook requests above the shared in-flight limit before body handling", async () => {
|
||||
const limit = WEBHOOK_IN_FLIGHT_DEFAULTS.maxInFlightPerKey;
|
||||
const heldRequests: Array<EventEmitter & { destroy: () => void }> = [];
|
||||
|
||||
const monitor = await monitorLineProvider({
|
||||
channelAccessToken: "token",
|
||||
@@ -238,30 +371,51 @@ describe("monitorLineProvider lifecycle", () => {
|
||||
runtime: {} as RuntimeEnv,
|
||||
});
|
||||
|
||||
const route = registerPluginHttpRouteMock.mock.calls[0]?.[0] as
|
||||
const route = registerWebhookTargetWithPluginRouteMock.mock.calls[0]?.[0]?.route as
|
||||
| { handler: (req: IncomingMessage, res: ServerResponse) => Promise<void> }
|
||||
| undefined;
|
||||
expect(route).toBeDefined();
|
||||
const createPostRequest = () =>
|
||||
({
|
||||
const createHeldPostRequest = () => {
|
||||
const req = Object.assign(new EventEmitter(), {
|
||||
destroyed: false,
|
||||
destroy(this: EventEmitter & { destroyed: boolean }) {
|
||||
this.destroyed = true;
|
||||
this.emit("close");
|
||||
},
|
||||
});
|
||||
heldRequests.push(req);
|
||||
return Object.assign(req, {
|
||||
method: "POST",
|
||||
headers: {},
|
||||
}) as IncomingMessage;
|
||||
headers: { "x-line-signature": "pending" },
|
||||
}) as unknown as IncomingMessage;
|
||||
};
|
||||
const createSignedPostRequest = () => {
|
||||
const payload = JSON.stringify({ events: [{ type: "message" }] });
|
||||
const signature = crypto.createHmac("SHA256", "secret").update(payload).digest("base64");
|
||||
const req = createMockIncomingRequest([payload]);
|
||||
return Object.assign(req, {
|
||||
method: "POST",
|
||||
headers: { "x-line-signature": signature },
|
||||
}) as unknown as IncomingMessage;
|
||||
};
|
||||
|
||||
const firstRequests = Array.from({ length: limit }, () =>
|
||||
route!.handler(createPostRequest(), createRouteResponse()),
|
||||
route!.handler(createHeldPostRequest(), createRouteResponse()),
|
||||
);
|
||||
await reachedLimit;
|
||||
await new Promise((resolve) => setImmediate(resolve));
|
||||
|
||||
const overflowResponse = createRouteResponse();
|
||||
await route!.handler(createPostRequest(), overflowResponse);
|
||||
await route!.handler(createSignedPostRequest(), overflowResponse);
|
||||
|
||||
expect(innerLineWebhookHandlerMock).toHaveBeenCalledTimes(limit);
|
||||
const bot = createLineBotMock.mock.results[0]?.value as {
|
||||
handleWebhook: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
expect(bot.handleWebhook).not.toHaveBeenCalled();
|
||||
expect(overflowResponse.statusCode).toBe(429);
|
||||
expect(overflowResponse.end).toHaveBeenCalledWith("Too Many Requests");
|
||||
|
||||
releaseRequests.splice(0).forEach((release) => release());
|
||||
await Promise.all(firstRequests);
|
||||
heldRequests.splice(0).forEach((req) => req.destroy());
|
||||
await Promise.allSettled(firstRequests);
|
||||
monitor.stop();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -12,8 +12,11 @@ import {
|
||||
type RuntimeEnv,
|
||||
} from "openclaw/plugin-sdk/runtime-env";
|
||||
import {
|
||||
isRequestBodyLimitError,
|
||||
normalizePluginHttpPath,
|
||||
registerPluginHttpRoute,
|
||||
registerWebhookTargetWithPluginRoute,
|
||||
requestBodyErrorToText,
|
||||
resolveSingleWebhookTarget,
|
||||
} from "openclaw/plugin-sdk/webhook-ingress";
|
||||
import {
|
||||
beginWebhookRequestPipelineOrReject,
|
||||
@@ -39,7 +42,8 @@ import {
|
||||
} from "./send.js";
|
||||
import { buildTemplateMessageFromPayload } from "./template-messages.js";
|
||||
import type { LineChannelData, ResolvedLineAccount } from "./types.js";
|
||||
import { createLineNodeWebhookHandler } from "./webhook-node.js";
|
||||
import { createLineNodeWebhookHandler, readLineWebhookRequestBody } from "./webhook-node.js";
|
||||
import { parseLineWebhookBody, validateLineSignature } from "./webhook-utils.js";
|
||||
|
||||
export interface MonitorLineProviderOptions {
|
||||
channelAccessToken: string;
|
||||
@@ -70,6 +74,18 @@ const runtimeState = new Map<
|
||||
}
|
||||
>();
|
||||
const lineWebhookInFlightLimiter = createWebhookInFlightLimiter();
|
||||
const LINE_WEBHOOK_PREAUTH_MAX_BODY_BYTES = 64 * 1024;
|
||||
const LINE_WEBHOOK_PREAUTH_BODY_TIMEOUT_MS = 5_000;
|
||||
|
||||
type LineWebhookTarget = {
|
||||
accountId: string;
|
||||
bot: ReturnType<typeof createLineBot>;
|
||||
channelSecret: string;
|
||||
path: string;
|
||||
runtime: RuntimeEnv;
|
||||
};
|
||||
|
||||
const lineWebhookTargets = new Map<string, LineWebhookTarget[]>();
|
||||
|
||||
function recordChannelRuntimeState(params: {
|
||||
channel: string;
|
||||
@@ -303,41 +319,130 @@ export async function monitorLineProvider(
|
||||
});
|
||||
|
||||
const normalizedPath = normalizePluginHttpPath(webhookPath, "/line/webhook") ?? "/line/webhook";
|
||||
const createScopedLineWebhookHandler = (onRequestAuthenticated?: () => void) =>
|
||||
const createScopedLineWebhookHandler = (target: LineWebhookTarget) =>
|
||||
createLineNodeWebhookHandler({
|
||||
channelSecret: secret,
|
||||
bot,
|
||||
runtime,
|
||||
onRequestAuthenticated,
|
||||
channelSecret: target.channelSecret,
|
||||
bot: target.bot,
|
||||
runtime: target.runtime,
|
||||
});
|
||||
const unregisterHttp = registerPluginHttpRoute({
|
||||
path: normalizedPath,
|
||||
auth: "plugin",
|
||||
replaceExisting: true,
|
||||
pluginId: "line",
|
||||
accountId: resolvedAccountId,
|
||||
log: (msg) => logVerbose(msg),
|
||||
handler: async (req, res) => {
|
||||
if (req.method !== "POST") {
|
||||
await createScopedLineWebhookHandler()(req, res);
|
||||
return;
|
||||
}
|
||||
const { unregister: unregisterHttp } = registerWebhookTargetWithPluginRoute({
|
||||
targetsByPath: lineWebhookTargets,
|
||||
target: {
|
||||
accountId: resolvedAccountId,
|
||||
bot,
|
||||
channelSecret: secret,
|
||||
path: normalizedPath,
|
||||
runtime,
|
||||
},
|
||||
route: {
|
||||
auth: "plugin",
|
||||
pluginId: "line",
|
||||
accountId: resolvedAccountId,
|
||||
log: (msg) => logVerbose(msg),
|
||||
handler: async (req, res) => {
|
||||
const targets = lineWebhookTargets.get(normalizedPath) ?? [];
|
||||
const firstTarget = targets[0];
|
||||
if (req.method !== "POST") {
|
||||
if (!firstTarget) {
|
||||
res.statusCode = 404;
|
||||
res.end("Not Found");
|
||||
return;
|
||||
}
|
||||
await createScopedLineWebhookHandler(firstTarget)(req, res);
|
||||
return;
|
||||
}
|
||||
|
||||
const requestLifecycle = beginWebhookRequestPipelineOrReject({
|
||||
req,
|
||||
res,
|
||||
inFlightLimiter: lineWebhookInFlightLimiter,
|
||||
inFlightKey: `line:${resolvedAccountId}`,
|
||||
});
|
||||
if (!requestLifecycle.ok) {
|
||||
return;
|
||||
}
|
||||
const requestLifecycle = beginWebhookRequestPipelineOrReject({
|
||||
req,
|
||||
res,
|
||||
inFlightLimiter: lineWebhookInFlightLimiter,
|
||||
inFlightKey: `line:${normalizedPath}`,
|
||||
});
|
||||
if (!requestLifecycle.ok) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await createScopedLineWebhookHandler(requestLifecycle.release)(req, res);
|
||||
} finally {
|
||||
requestLifecycle.release();
|
||||
}
|
||||
try {
|
||||
const signatureHeader = req.headers["x-line-signature"];
|
||||
const signature =
|
||||
typeof signatureHeader === "string"
|
||||
? signatureHeader.trim()
|
||||
: Array.isArray(signatureHeader)
|
||||
? (signatureHeader[0] ?? "").trim()
|
||||
: "";
|
||||
|
||||
if (!signature) {
|
||||
logVerbose("line: webhook missing X-Line-Signature header");
|
||||
res.statusCode = 400;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Missing X-Line-Signature header" }));
|
||||
return;
|
||||
}
|
||||
|
||||
const rawBody = await readLineWebhookRequestBody(
|
||||
req,
|
||||
LINE_WEBHOOK_PREAUTH_MAX_BODY_BYTES,
|
||||
LINE_WEBHOOK_PREAUTH_BODY_TIMEOUT_MS,
|
||||
);
|
||||
const match = resolveSingleWebhookTarget(targets, (target) =>
|
||||
validateLineSignature(rawBody, signature, target.channelSecret),
|
||||
);
|
||||
if (match.kind === "none") {
|
||||
logVerbose("line: webhook signature validation failed");
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Invalid signature" }));
|
||||
return;
|
||||
}
|
||||
if (match.kind === "ambiguous") {
|
||||
logVerbose("line: webhook signature matched multiple accounts");
|
||||
res.statusCode = 401;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Ambiguous webhook target" }));
|
||||
return;
|
||||
}
|
||||
|
||||
const body = parseLineWebhookBody(rawBody);
|
||||
if (!body) {
|
||||
res.statusCode = 400;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Invalid webhook payload" }));
|
||||
return;
|
||||
}
|
||||
|
||||
requestLifecycle.release();
|
||||
|
||||
if (body.events && body.events.length > 0) {
|
||||
logVerbose(`line: received ${body.events.length} webhook events`);
|
||||
await match.target.bot.handleWebhook(body);
|
||||
}
|
||||
|
||||
res.statusCode = 200;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ status: "ok" }));
|
||||
} catch (err) {
|
||||
if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) {
|
||||
res.statusCode = 413;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Payload too large" }));
|
||||
return;
|
||||
}
|
||||
if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) {
|
||||
res.statusCode = 408;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") }));
|
||||
return;
|
||||
}
|
||||
runtime.error?.(danger(`line webhook error: ${String(err)}`));
|
||||
if (!res.headersSent) {
|
||||
res.statusCode = 500;
|
||||
res.setHeader("Content-Type", "application/json");
|
||||
res.end(JSON.stringify({ error: "Internal server error" }));
|
||||
}
|
||||
} finally {
|
||||
requestLifecycle.release();
|
||||
}
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import { chmod, mkdir, writeFile } from "node:fs/promises";
|
||||
import type { ServerResponse } from "node:http";
|
||||
import { join } from "node:path";
|
||||
import {
|
||||
createEmptyPluginRegistry,
|
||||
createRuntimeEnv,
|
||||
setActivePluginRegistry,
|
||||
} from "openclaw/plugin-sdk/plugin-test-runtime";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { afterAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { PluginRuntime } from "../runtime-api.js";
|
||||
import {
|
||||
@@ -28,6 +32,53 @@ vi.mock("./outbound-media.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
import { clearHostedZaloMediaForTest } from "./outbound-media.js";
|
||||
|
||||
const ZALO_OUTBOUND_MEDIA_DIR = join(
|
||||
resolvePreferredOpenClawTmpDir(),
|
||||
"openclaw-zalo-outbound-media",
|
||||
);
|
||||
|
||||
async function writeHostedZaloMediaFixture(params: {
|
||||
id: string;
|
||||
routePath: string;
|
||||
token: string;
|
||||
buffer: Buffer;
|
||||
contentType?: string;
|
||||
}): Promise<void> {
|
||||
await mkdir(ZALO_OUTBOUND_MEDIA_DIR, { recursive: true, mode: 0o700 });
|
||||
await chmod(ZALO_OUTBOUND_MEDIA_DIR, 0o700).catch(() => undefined);
|
||||
await Promise.all([
|
||||
writeFile(
|
||||
join(ZALO_OUTBOUND_MEDIA_DIR, `${params.id}.json`),
|
||||
JSON.stringify({
|
||||
routePath: params.routePath,
|
||||
token: params.token,
|
||||
contentType: params.contentType,
|
||||
expiresAt: Date.now() + 60_000,
|
||||
}),
|
||||
{ encoding: "utf8", mode: 0o600 },
|
||||
),
|
||||
writeFile(join(ZALO_OUTBOUND_MEDIA_DIR, `${params.id}.bin`), params.buffer, { mode: 0o600 }),
|
||||
]);
|
||||
}
|
||||
|
||||
function createHostedMediaResponse() {
|
||||
const headers = new Map<string, string>();
|
||||
const res = {
|
||||
statusCode: 200,
|
||||
headersSent: false,
|
||||
setHeader(name: string, value: string) {
|
||||
headers.set(name, value);
|
||||
},
|
||||
end: vi.fn((body?: unknown) => {
|
||||
res.headersSent = true;
|
||||
return body;
|
||||
}),
|
||||
};
|
||||
return { headers, res: res as unknown as ServerResponse & { end: ReturnType<typeof vi.fn> } };
|
||||
}
|
||||
|
||||
describe("Zalo polling media replies", () => {
|
||||
const finalizeInboundContextMock = vi.fn((ctx: Record<string, unknown>) => ctx);
|
||||
const recordInboundSessionMock = vi.fn(async () => undefined);
|
||||
@@ -43,6 +94,7 @@ describe("Zalo polling media replies", () => {
|
||||
|
||||
beforeEach(async () => {
|
||||
await resetLifecycleTestState();
|
||||
clearHostedZaloMediaForTest();
|
||||
prepareHostedZaloMediaUrlMock.mockReset();
|
||||
prepareHostedZaloMediaUrlMock.mockResolvedValue(
|
||||
"https://example.com/hooks/zalo/media/abc123abc123abc123abc123?token=secret",
|
||||
@@ -79,6 +131,7 @@ describe("Zalo polling media replies", () => {
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
clearHostedZaloMediaForTest();
|
||||
await resetLifecycleTestState();
|
||||
});
|
||||
|
||||
@@ -199,6 +252,123 @@ describe("Zalo polling media replies", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("shares one hosted media route across accounts on the same path", async () => {
|
||||
const registry = createEmptyPluginRegistry();
|
||||
setActivePluginRegistry(registry);
|
||||
getUpdatesMock.mockImplementation(() => new Promise(() => {}));
|
||||
|
||||
const { monitorZaloProvider } = await loadCachedLifecycleMonitorModule(
|
||||
"zalo-polling-media-reply",
|
||||
);
|
||||
const firstAbort = new AbortController();
|
||||
const firstRuntime = createRuntimeEnv();
|
||||
const firstSetup = createLifecycleMonitorSetup({
|
||||
accountId: "acct-zalo-polling-media-one",
|
||||
dmPolicy: "open",
|
||||
webhookUrl: "https://example.com/hooks/zalo",
|
||||
});
|
||||
const firstRun = monitorZaloProvider({
|
||||
token: "zalo-token-one",
|
||||
account: firstSetup.account,
|
||||
config: firstSetup.config,
|
||||
runtime: firstRuntime,
|
||||
abortSignal: firstAbort.signal,
|
||||
});
|
||||
|
||||
const secondAbort = new AbortController();
|
||||
let secondRun: Promise<void> | undefined;
|
||||
|
||||
try {
|
||||
await settleAsyncWork();
|
||||
const firstHostedMediaRoutes = registry.httpRoutes.filter(
|
||||
(route) => route.source === "zalo-hosted-media",
|
||||
);
|
||||
expect(firstHostedMediaRoutes).toHaveLength(1);
|
||||
const hostedMediaRoute = firstHostedMediaRoutes[0];
|
||||
expect(hostedMediaRoute).toEqual(
|
||||
expect.objectContaining({
|
||||
path: "/hooks/zalo/media",
|
||||
pluginId: "zalo",
|
||||
}),
|
||||
);
|
||||
|
||||
const secondRuntime = createRuntimeEnv();
|
||||
const secondSetup = createLifecycleMonitorSetup({
|
||||
accountId: "acct-zalo-polling-media-two",
|
||||
dmPolicy: "open",
|
||||
webhookUrl: "https://example.com/hooks/zalo",
|
||||
});
|
||||
secondRun = monitorZaloProvider({
|
||||
token: "zalo-token-two",
|
||||
account: secondSetup.account,
|
||||
config: secondSetup.config,
|
||||
runtime: secondRuntime,
|
||||
abortSignal: secondAbort.signal,
|
||||
});
|
||||
|
||||
await settleAsyncWork();
|
||||
const hostedMediaRoutes = registry.httpRoutes.filter(
|
||||
(route) => route.source === "zalo-hosted-media",
|
||||
);
|
||||
expect(hostedMediaRoutes).toHaveLength(1);
|
||||
expect(hostedMediaRoutes[0]).toBe(hostedMediaRoute);
|
||||
|
||||
await writeHostedZaloMediaFixture({
|
||||
id: "abc123abc123abc123abc123",
|
||||
routePath: "/hooks/zalo/media/",
|
||||
token: "route-token-one",
|
||||
buffer: Buffer.from("first-image-bytes"),
|
||||
contentType: "image/png",
|
||||
});
|
||||
const firstFetch = createHostedMediaResponse();
|
||||
await hostedMediaRoute.handler(
|
||||
{
|
||||
method: "GET",
|
||||
url: "/hooks/zalo/media/abc123abc123abc123abc123?token=route-token-one",
|
||||
} as never,
|
||||
firstFetch.res as never,
|
||||
);
|
||||
expect(firstFetch.res.statusCode).toBe(200);
|
||||
expect(firstFetch.headers.get("Content-Type")).toBe("image/png");
|
||||
expect(firstFetch.headers.get("Cache-Control")).toBe("no-store");
|
||||
expect(firstFetch.res.end).toHaveBeenCalledWith(Buffer.from("first-image-bytes"));
|
||||
|
||||
firstAbort.abort();
|
||||
await firstRun;
|
||||
expect(registry.httpRoutes.filter((route) => route.source === "zalo-hosted-media")).toEqual([
|
||||
hostedMediaRoute,
|
||||
]);
|
||||
|
||||
await writeHostedZaloMediaFixture({
|
||||
id: "def456def456def456def456",
|
||||
routePath: "/hooks/zalo/media/",
|
||||
token: "route-token-two",
|
||||
buffer: Buffer.from("second-image-bytes"),
|
||||
contentType: "image/jpeg",
|
||||
});
|
||||
const secondFetch = createHostedMediaResponse();
|
||||
await hostedMediaRoute.handler(
|
||||
{
|
||||
method: "GET",
|
||||
url: "/hooks/zalo/media/def456def456def456def456?token=route-token-two",
|
||||
} as never,
|
||||
secondFetch.res as never,
|
||||
);
|
||||
expect(secondFetch.res.statusCode).toBe(200);
|
||||
expect(secondFetch.headers.get("Content-Type")).toBe("image/jpeg");
|
||||
expect(secondFetch.res.end).toHaveBeenCalledWith(Buffer.from("second-image-bytes"));
|
||||
} finally {
|
||||
firstAbort.abort();
|
||||
secondAbort.abort();
|
||||
await firstRun;
|
||||
await secondRun;
|
||||
}
|
||||
|
||||
expect(
|
||||
registry.httpRoutes.filter((route) => route.source === "zalo-hosted-media"),
|
||||
).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("re-registers the hosted media route after the active registry swaps", async () => {
|
||||
const firstRegistry = createEmptyPluginRegistry();
|
||||
setActivePluginRegistry(firstRegistry);
|
||||
|
||||
@@ -108,7 +108,6 @@ function registerSharedHostedMediaRoute(params: {
|
||||
auth: "plugin",
|
||||
match: "prefix",
|
||||
path: params.path,
|
||||
replaceExisting: true,
|
||||
pluginId: "zalo",
|
||||
source: "zalo-hosted-media",
|
||||
accountId: params.accountId,
|
||||
|
||||
Reference in New Issue
Block a user