mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
Zalo: fix provider lifecycle restarts (#39892)
* Zalo: fix provider lifecycle restarts * Zalo: add typing indicators, smart webhook cleanup, and API type fixes * fix review * add allow list test secrect * Zalo: bound webhook cleanup during shutdown * Zalo: bound typing chat action timeout * Zalo: use plugin-safe abort helper import
This commit is contained in:
63
extensions/zalo/src/api.test.ts
Normal file
63
extensions/zalo/src/api.test.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { deleteWebhook, getWebhookInfo, sendChatAction, type ZaloFetch } from "./api.js";
|
||||
|
||||
describe("Zalo API request methods", () => {
|
||||
it("uses POST for getWebhookInfo", async () => {
|
||||
const fetcher = vi.fn<ZaloFetch>(
|
||||
async () => new Response(JSON.stringify({ ok: true, result: {} })),
|
||||
);
|
||||
|
||||
await getWebhookInfo("test-token", fetcher);
|
||||
|
||||
expect(fetcher).toHaveBeenCalledTimes(1);
|
||||
const [, init] = fetcher.mock.calls[0] ?? [];
|
||||
expect(init?.method).toBe("POST");
|
||||
expect(init?.headers).toEqual({ "Content-Type": "application/json" });
|
||||
});
|
||||
|
||||
it("keeps POST for deleteWebhook", async () => {
|
||||
const fetcher = vi.fn<ZaloFetch>(
|
||||
async () => new Response(JSON.stringify({ ok: true, result: {} })),
|
||||
);
|
||||
|
||||
await deleteWebhook("test-token", fetcher);
|
||||
|
||||
expect(fetcher).toHaveBeenCalledTimes(1);
|
||||
const [, init] = fetcher.mock.calls[0] ?? [];
|
||||
expect(init?.method).toBe("POST");
|
||||
expect(init?.headers).toEqual({ "Content-Type": "application/json" });
|
||||
});
|
||||
|
||||
it("aborts sendChatAction when the typing timeout elapses", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const fetcher = vi.fn<ZaloFetch>(
|
||||
(_, init) =>
|
||||
new Promise<Response>((_, reject) => {
|
||||
init?.signal?.addEventListener("abort", () => reject(new Error("aborted")), {
|
||||
once: true,
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
const promise = sendChatAction(
|
||||
"test-token",
|
||||
{
|
||||
chat_id: "chat-123",
|
||||
action: "typing",
|
||||
},
|
||||
fetcher,
|
||||
25,
|
||||
);
|
||||
const rejected = expect(promise).rejects.toThrow("aborted");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(25);
|
||||
|
||||
await rejected;
|
||||
const [, init] = fetcher.mock.calls[0] ?? [];
|
||||
expect(init?.signal?.aborted).toBe(true);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -58,11 +58,22 @@ export type ZaloSendPhotoParams = {
|
||||
caption?: string;
|
||||
};
|
||||
|
||||
export type ZaloSendChatActionParams = {
|
||||
chat_id: string;
|
||||
action: "typing" | "upload_photo";
|
||||
};
|
||||
|
||||
export type ZaloSetWebhookParams = {
|
||||
url: string;
|
||||
secret_token: string;
|
||||
};
|
||||
|
||||
export type ZaloWebhookInfo = {
|
||||
url?: string;
|
||||
updated_at?: number;
|
||||
has_custom_certificate?: boolean;
|
||||
};
|
||||
|
||||
export type ZaloGetUpdatesParams = {
|
||||
/** Timeout in seconds (passed as string to API) */
|
||||
timeout?: number;
|
||||
@@ -161,6 +172,21 @@ export async function sendPhoto(
|
||||
return callZaloApi<ZaloMessage>("sendPhoto", token, params, { fetch: fetcher });
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a temporary chat action such as typing.
|
||||
*/
|
||||
export async function sendChatAction(
|
||||
token: string,
|
||||
params: ZaloSendChatActionParams,
|
||||
fetcher?: ZaloFetch,
|
||||
timeoutMs?: number,
|
||||
): Promise<ZaloApiResponse<boolean>> {
|
||||
return callZaloApi<boolean>("sendChatAction", token, params, {
|
||||
timeoutMs,
|
||||
fetch: fetcher,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get updates using long polling (dev/testing only)
|
||||
* Note: Zalo returns a single update per call, not an array like Telegram
|
||||
@@ -183,8 +209,8 @@ export async function setWebhook(
|
||||
token: string,
|
||||
params: ZaloSetWebhookParams,
|
||||
fetcher?: ZaloFetch,
|
||||
): Promise<ZaloApiResponse<boolean>> {
|
||||
return callZaloApi<boolean>("setWebhook", token, params, { fetch: fetcher });
|
||||
): Promise<ZaloApiResponse<ZaloWebhookInfo>> {
|
||||
return callZaloApi<ZaloWebhookInfo>("setWebhook", token, params, { fetch: fetcher });
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -193,8 +219,12 @@ export async function setWebhook(
|
||||
export async function deleteWebhook(
|
||||
token: string,
|
||||
fetcher?: ZaloFetch,
|
||||
): Promise<ZaloApiResponse<boolean>> {
|
||||
return callZaloApi<boolean>("deleteWebhook", token, undefined, { fetch: fetcher });
|
||||
timeoutMs?: number,
|
||||
): Promise<ZaloApiResponse<ZaloWebhookInfo>> {
|
||||
return callZaloApi<ZaloWebhookInfo>("deleteWebhook", token, undefined, {
|
||||
timeoutMs,
|
||||
fetch: fetcher,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -203,6 +233,6 @@ export async function deleteWebhook(
|
||||
export async function getWebhookInfo(
|
||||
token: string,
|
||||
fetcher?: ZaloFetch,
|
||||
): Promise<ZaloApiResponse<{ url?: string; has_custom_certificate?: boolean }>> {
|
||||
return callZaloApi("getWebhookInfo", token, undefined, { fetch: fetcher });
|
||||
): Promise<ZaloApiResponse<ZaloWebhookInfo>> {
|
||||
return callZaloApi<ZaloWebhookInfo>("getWebhookInfo", token, undefined, { fetch: fetcher });
|
||||
}
|
||||
|
||||
100
extensions/zalo/src/channel.startup.test.ts
Normal file
100
extensions/zalo/src/channel.startup.test.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/zalo";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createStartAccountContext } from "../../test-utils/start-account-context.js";
|
||||
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
monitorZaloProvider: vi.fn(),
|
||||
probeZalo: vi.fn(async () => ({
|
||||
ok: false as const,
|
||||
error: "probe failed",
|
||||
elapsedMs: 1,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("./monitor.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./monitor.js")>("./monitor.js");
|
||||
return {
|
||||
...actual,
|
||||
monitorZaloProvider: hoisted.monitorZaloProvider,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./probe.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("./probe.js")>("./probe.js");
|
||||
return {
|
||||
...actual,
|
||||
probeZalo: hoisted.probeZalo,
|
||||
};
|
||||
});
|
||||
|
||||
import { zaloPlugin } from "./channel.js";
|
||||
|
||||
function buildAccount(): ResolvedZaloAccount {
|
||||
return {
|
||||
accountId: "default",
|
||||
enabled: true,
|
||||
token: "test-token",
|
||||
tokenSource: "config",
|
||||
config: {},
|
||||
};
|
||||
}
|
||||
|
||||
describe("zaloPlugin gateway.startAccount", () => {
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("keeps startAccount pending until abort", async () => {
|
||||
hoisted.monitorZaloProvider.mockImplementationOnce(
|
||||
async ({ abortSignal }: { abortSignal: AbortSignal }) =>
|
||||
await new Promise<void>((resolve) => {
|
||||
if (abortSignal.aborted) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
abortSignal.addEventListener("abort", () => resolve(), { once: true });
|
||||
}),
|
||||
);
|
||||
|
||||
const patches: ChannelAccountSnapshot[] = [];
|
||||
const abort = new AbortController();
|
||||
const task = zaloPlugin.gateway!.startAccount!(
|
||||
createStartAccountContext({
|
||||
account: buildAccount(),
|
||||
abortSignal: abort.signal,
|
||||
statusPatchSink: (next) => patches.push({ ...next }),
|
||||
}),
|
||||
);
|
||||
|
||||
let settled = false;
|
||||
void task.then(() => {
|
||||
settled = true;
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.probeZalo).toHaveBeenCalledOnce();
|
||||
expect(hoisted.monitorZaloProvider).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
expect(settled).toBe(false);
|
||||
expect(patches).toContainEqual(
|
||||
expect.objectContaining({
|
||||
accountId: "default",
|
||||
}),
|
||||
);
|
||||
|
||||
abort.abort();
|
||||
await task;
|
||||
|
||||
expect(settled).toBe(true);
|
||||
expect(hoisted.monitorZaloProvider).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
token: "test-token",
|
||||
account: expect.objectContaining({ accountId: "default" }),
|
||||
abortSignal: abort.signal,
|
||||
useWebhook: false,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -334,6 +334,7 @@ export const zaloPlugin: ChannelPlugin<ResolvedZaloAccount> = {
|
||||
startAccount: async (ctx) => {
|
||||
const account = ctx.account;
|
||||
const token = account.token.trim();
|
||||
const mode = account.config.webhookUrl ? "webhook" : "polling";
|
||||
let zaloBotLabel = "";
|
||||
const fetcher = resolveZaloProxyFetch(account.config.proxy);
|
||||
try {
|
||||
@@ -342,14 +343,21 @@ export const zaloPlugin: ChannelPlugin<ResolvedZaloAccount> = {
|
||||
if (name) {
|
||||
zaloBotLabel = ` (${name})`;
|
||||
}
|
||||
if (!probe.ok) {
|
||||
ctx.log?.warn?.(
|
||||
`[${account.accountId}] Zalo probe failed before provider start (${String(probe.elapsedMs)}ms): ${probe.error}`,
|
||||
);
|
||||
}
|
||||
ctx.setStatus({
|
||||
accountId: account.accountId,
|
||||
bot: probe.bot,
|
||||
});
|
||||
} catch {
|
||||
// ignore probe errors
|
||||
} catch (err) {
|
||||
ctx.log?.warn?.(
|
||||
`[${account.accountId}] Zalo probe threw before provider start: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`,
|
||||
);
|
||||
}
|
||||
ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel}`);
|
||||
ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel} mode=${mode}`);
|
||||
const { monitorZaloProvider } = await import("./monitor.js");
|
||||
return monitorZaloProvider({
|
||||
token,
|
||||
|
||||
213
extensions/zalo/src/monitor.lifecycle.test.ts
Normal file
213
extensions/zalo/src/monitor.lifecycle.test.ts
Normal file
@@ -0,0 +1,213 @@
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/zalo";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js";
|
||||
import { setActivePluginRegistry } from "../../../src/plugins/runtime.js";
|
||||
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||
|
||||
const getWebhookInfoMock = vi.fn(async () => ({ ok: true, result: { url: "" } }));
|
||||
const deleteWebhookMock = vi.fn(async () => ({ ok: true, result: { url: "" } }));
|
||||
const getUpdatesMock = vi.fn(() => new Promise(() => {}));
|
||||
const setWebhookMock = vi.fn(async () => ({ ok: true, result: { url: "" } }));
|
||||
|
||||
vi.mock("./api.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("./api.js")>();
|
||||
return {
|
||||
...actual,
|
||||
deleteWebhook: deleteWebhookMock,
|
||||
getWebhookInfo: getWebhookInfoMock,
|
||||
getUpdates: getUpdatesMock,
|
||||
setWebhook: setWebhookMock,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./runtime.js", () => ({
|
||||
getZaloRuntime: () => ({
|
||||
logging: {
|
||||
shouldLogVerbose: () => false,
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
async function waitForPollingLoopStart(): Promise<void> {
|
||||
await vi.waitFor(() => expect(getUpdatesMock).toHaveBeenCalledTimes(1));
|
||||
}
|
||||
|
||||
describe("monitorZaloProvider lifecycle", () => {
|
||||
afterEach(() => {
|
||||
vi.clearAllMocks();
|
||||
setActivePluginRegistry(createEmptyPluginRegistry());
|
||||
});
|
||||
|
||||
it("stays alive in polling mode until abort", async () => {
|
||||
const { monitorZaloProvider } = await import("./monitor.js");
|
||||
const abort = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn<(message: string) => void>(),
|
||||
error: vi.fn<(message: string) => void>(),
|
||||
};
|
||||
const account = {
|
||||
accountId: "default",
|
||||
config: {},
|
||||
} as unknown as ResolvedZaloAccount;
|
||||
const config = {} as OpenClawConfig;
|
||||
|
||||
let settled = false;
|
||||
const run = monitorZaloProvider({
|
||||
token: "test-token",
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
abortSignal: abort.signal,
|
||||
}).then(() => {
|
||||
settled = true;
|
||||
});
|
||||
|
||||
await waitForPollingLoopStart();
|
||||
|
||||
expect(getWebhookInfoMock).toHaveBeenCalledTimes(1);
|
||||
expect(deleteWebhookMock).not.toHaveBeenCalled();
|
||||
expect(getUpdatesMock).toHaveBeenCalledTimes(1);
|
||||
expect(settled).toBe(false);
|
||||
|
||||
abort.abort();
|
||||
await run;
|
||||
|
||||
expect(settled).toBe(true);
|
||||
expect(runtime.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Zalo provider stopped mode=polling"),
|
||||
);
|
||||
});
|
||||
|
||||
it("deletes an existing webhook before polling", async () => {
|
||||
getWebhookInfoMock.mockResolvedValueOnce({
|
||||
ok: true,
|
||||
result: { url: "https://example.com/hooks/zalo" },
|
||||
});
|
||||
|
||||
const { monitorZaloProvider } = await import("./monitor.js");
|
||||
const abort = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn<(message: string) => void>(),
|
||||
error: vi.fn<(message: string) => void>(),
|
||||
};
|
||||
const account = {
|
||||
accountId: "default",
|
||||
config: {},
|
||||
} as unknown as ResolvedZaloAccount;
|
||||
const config = {} as OpenClawConfig;
|
||||
|
||||
const run = monitorZaloProvider({
|
||||
token: "test-token",
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
abortSignal: abort.signal,
|
||||
});
|
||||
|
||||
await waitForPollingLoopStart();
|
||||
|
||||
expect(getWebhookInfoMock).toHaveBeenCalledTimes(1);
|
||||
expect(deleteWebhookMock).toHaveBeenCalledTimes(1);
|
||||
expect(runtime.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Zalo polling mode ready (webhook disabled)"),
|
||||
);
|
||||
|
||||
abort.abort();
|
||||
await run;
|
||||
});
|
||||
|
||||
it("continues polling when webhook inspection returns 404", async () => {
|
||||
const { ZaloApiError } = await import("./api.js");
|
||||
getWebhookInfoMock.mockRejectedValueOnce(new ZaloApiError("Not Found", 404, "Not Found"));
|
||||
|
||||
const { monitorZaloProvider } = await import("./monitor.js");
|
||||
const abort = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn<(message: string) => void>(),
|
||||
error: vi.fn<(message: string) => void>(),
|
||||
};
|
||||
const account = {
|
||||
accountId: "default",
|
||||
config: {},
|
||||
} as unknown as ResolvedZaloAccount;
|
||||
const config = {} as OpenClawConfig;
|
||||
|
||||
const run = monitorZaloProvider({
|
||||
token: "test-token",
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
abortSignal: abort.signal,
|
||||
});
|
||||
|
||||
await waitForPollingLoopStart();
|
||||
|
||||
expect(getWebhookInfoMock).toHaveBeenCalledTimes(1);
|
||||
expect(deleteWebhookMock).not.toHaveBeenCalled();
|
||||
expect(runtime.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("webhook inspection unavailable; continuing without webhook cleanup"),
|
||||
);
|
||||
expect(runtime.error).not.toHaveBeenCalled();
|
||||
|
||||
abort.abort();
|
||||
await run;
|
||||
});
|
||||
|
||||
it("waits for webhook deletion before finishing webhook shutdown", async () => {
|
||||
const registry = createEmptyPluginRegistry();
|
||||
setActivePluginRegistry(registry);
|
||||
|
||||
let resolveDeleteWebhook: (() => void) | undefined;
|
||||
deleteWebhookMock.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise((resolve) => {
|
||||
resolveDeleteWebhook = () => resolve({ ok: true, result: { url: "" } });
|
||||
}),
|
||||
);
|
||||
|
||||
const { monitorZaloProvider } = await import("./monitor.js");
|
||||
const abort = new AbortController();
|
||||
const runtime = {
|
||||
log: vi.fn<(message: string) => void>(),
|
||||
error: vi.fn<(message: string) => void>(),
|
||||
};
|
||||
const account = {
|
||||
accountId: "default",
|
||||
config: {},
|
||||
} as unknown as ResolvedZaloAccount;
|
||||
const config = {} as OpenClawConfig;
|
||||
|
||||
let settled = false;
|
||||
const run = monitorZaloProvider({
|
||||
token: "test-token",
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
abortSignal: abort.signal,
|
||||
useWebhook: true,
|
||||
webhookUrl: "https://example.com/hooks/zalo",
|
||||
webhookSecret: "supersecret", // pragma: allowlist secret
|
||||
}).then(() => {
|
||||
settled = true;
|
||||
});
|
||||
|
||||
await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1));
|
||||
expect(registry.httpRoutes).toHaveLength(1);
|
||||
|
||||
abort.abort();
|
||||
|
||||
await vi.waitFor(() => expect(deleteWebhookMock).toHaveBeenCalledTimes(1));
|
||||
expect(deleteWebhookMock).toHaveBeenCalledWith("test-token", undefined, 5000);
|
||||
expect(settled).toBe(false);
|
||||
expect(registry.httpRoutes).toHaveLength(1);
|
||||
|
||||
resolveDeleteWebhook?.();
|
||||
await run;
|
||||
|
||||
expect(settled).toBe(true);
|
||||
expect(registry.httpRoutes).toHaveLength(0);
|
||||
expect(runtime.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Zalo provider stopped mode=webhook"),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -5,9 +5,11 @@ import type {
|
||||
OutboundReplyPayload,
|
||||
} from "openclaw/plugin-sdk/zalo";
|
||||
import {
|
||||
createTypingCallbacks,
|
||||
createScopedPairingAccess,
|
||||
createReplyPrefixOptions,
|
||||
issuePairingChallenge,
|
||||
logTypingFailure,
|
||||
resolveDirectDmAuthorizationOutcome,
|
||||
resolveSenderCommandAuthorizationWithRuntime,
|
||||
resolveOutboundMediaUrls,
|
||||
@@ -15,13 +17,16 @@ import {
|
||||
resolveInboundRouteEnvelopeBuilderWithRuntime,
|
||||
sendMediaWithLeadingCaption,
|
||||
resolveWebhookPath,
|
||||
waitForAbortSignal,
|
||||
warnMissingProviderGroupPolicyFallbackOnce,
|
||||
} from "openclaw/plugin-sdk/zalo";
|
||||
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||
import {
|
||||
ZaloApiError,
|
||||
deleteWebhook,
|
||||
getWebhookInfo,
|
||||
getUpdates,
|
||||
sendChatAction,
|
||||
sendMessage,
|
||||
sendPhoto,
|
||||
setWebhook,
|
||||
@@ -64,15 +69,34 @@ export type ZaloMonitorOptions = {
|
||||
statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void;
|
||||
};
|
||||
|
||||
export type ZaloMonitorResult = {
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
const ZALO_TEXT_LIMIT = 2000;
|
||||
const DEFAULT_MEDIA_MAX_MB = 5;
|
||||
const WEBHOOK_CLEANUP_TIMEOUT_MS = 5_000;
|
||||
const ZALO_TYPING_TIMEOUT_MS = 5_000;
|
||||
|
||||
type ZaloCoreRuntime = ReturnType<typeof getZaloRuntime>;
|
||||
|
||||
function formatZaloError(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.stack ?? `${error.name}: ${error.message}`;
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function describeWebhookTarget(rawUrl: string): string {
|
||||
try {
|
||||
const parsed = new URL(rawUrl);
|
||||
return `${parsed.origin}${parsed.pathname}`;
|
||||
} catch {
|
||||
return rawUrl;
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeWebhookUrl(url: string | undefined): string | undefined {
|
||||
const trimmed = url?.trim();
|
||||
return trimmed ? trimmed : undefined;
|
||||
}
|
||||
|
||||
function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void {
|
||||
if (core.logging.shouldLogVerbose()) {
|
||||
runtime.log?.(`[zalo] ${message}`);
|
||||
@@ -151,6 +175,8 @@ function startPollingLoop(params: {
|
||||
} = params;
|
||||
const pollTimeout = 30;
|
||||
|
||||
runtime.log?.(`[${account.accountId}] Zalo polling loop started timeout=${String(pollTimeout)}s`);
|
||||
|
||||
const poll = async () => {
|
||||
if (isStopped() || abortSignal.aborted) {
|
||||
return;
|
||||
@@ -176,7 +202,7 @@ function startPollingLoop(params: {
|
||||
if (err instanceof ZaloApiError && err.isPollingTimeout) {
|
||||
// no updates
|
||||
} else if (!isStopped() && !abortSignal.aborted) {
|
||||
runtime.error?.(`[${account.accountId}] Zalo polling error: ${String(err)}`);
|
||||
runtime.error?.(`[${account.accountId}] Zalo polling error: ${formatZaloError(err)}`);
|
||||
await new Promise((resolve) => setTimeout(resolve, 5000));
|
||||
}
|
||||
}
|
||||
@@ -522,12 +548,35 @@ async function processMessageWithPipeline(params: {
|
||||
channel: "zalo",
|
||||
accountId: account.accountId,
|
||||
});
|
||||
const typingCallbacks = createTypingCallbacks({
|
||||
start: async () => {
|
||||
await sendChatAction(
|
||||
token,
|
||||
{
|
||||
chat_id: chatId,
|
||||
action: "typing",
|
||||
},
|
||||
fetcher,
|
||||
ZALO_TYPING_TIMEOUT_MS,
|
||||
);
|
||||
},
|
||||
onStartError: (err) => {
|
||||
logTypingFailure({
|
||||
log: (message) => logVerbose(core, runtime, message),
|
||||
channel: "zalo",
|
||||
action: "start",
|
||||
target: chatId,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
cfg: config,
|
||||
dispatcherOptions: {
|
||||
...prefixOptions,
|
||||
typingCallbacks,
|
||||
deliver: async (payload) => {
|
||||
await deliverZaloReply({
|
||||
payload,
|
||||
@@ -567,7 +616,6 @@ async function deliverZaloReply(params: {
|
||||
const { payload, token, chatId, runtime, core, config, accountId, statusSink, fetcher } = params;
|
||||
const tableMode = params.tableMode ?? "code";
|
||||
const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode);
|
||||
|
||||
const sentMedia = await sendMediaWithLeadingCaption({
|
||||
mediaUrls: resolveOutboundMediaUrls(payload),
|
||||
caption: text,
|
||||
@@ -597,7 +645,7 @@ async function deliverZaloReply(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise<ZaloMonitorResult> {
|
||||
export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise<void> {
|
||||
const {
|
||||
token,
|
||||
account,
|
||||
@@ -615,78 +663,140 @@ export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise<
|
||||
const core = getZaloRuntime();
|
||||
const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
|
||||
const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy);
|
||||
const mode = useWebhook ? "webhook" : "polling";
|
||||
|
||||
let stopped = false;
|
||||
const stopHandlers: Array<() => void> = [];
|
||||
let cleanupWebhook: (() => Promise<void>) | undefined;
|
||||
|
||||
const stop = () => {
|
||||
if (stopped) {
|
||||
return;
|
||||
}
|
||||
stopped = true;
|
||||
for (const handler of stopHandlers) {
|
||||
handler();
|
||||
}
|
||||
};
|
||||
|
||||
if (useWebhook) {
|
||||
if (!webhookUrl || !webhookSecret) {
|
||||
throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode");
|
||||
}
|
||||
if (!webhookUrl.startsWith("https://")) {
|
||||
throw new Error("Zalo webhook URL must use HTTPS");
|
||||
}
|
||||
if (webhookSecret.length < 8 || webhookSecret.length > 256) {
|
||||
throw new Error("Zalo webhook secret must be 8-256 characters");
|
||||
runtime.log?.(
|
||||
`[${account.accountId}] Zalo provider init mode=${mode} mediaMaxMb=${String(effectiveMediaMaxMb)}`,
|
||||
);
|
||||
|
||||
try {
|
||||
if (useWebhook) {
|
||||
if (!webhookUrl || !webhookSecret) {
|
||||
throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode");
|
||||
}
|
||||
if (!webhookUrl.startsWith("https://")) {
|
||||
throw new Error("Zalo webhook URL must use HTTPS");
|
||||
}
|
||||
if (webhookSecret.length < 8 || webhookSecret.length > 256) {
|
||||
throw new Error("Zalo webhook secret must be 8-256 characters");
|
||||
}
|
||||
|
||||
const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null });
|
||||
if (!path) {
|
||||
throw new Error("Zalo webhookPath could not be derived");
|
||||
}
|
||||
|
||||
runtime.log?.(
|
||||
`[${account.accountId}] Zalo configuring webhook path=${path} target=${describeWebhookTarget(webhookUrl)}`,
|
||||
);
|
||||
await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher);
|
||||
let webhookCleanupPromise: Promise<void> | undefined;
|
||||
cleanupWebhook = async () => {
|
||||
if (!webhookCleanupPromise) {
|
||||
webhookCleanupPromise = (async () => {
|
||||
runtime.log?.(`[${account.accountId}] Zalo stopping; deleting webhook`);
|
||||
try {
|
||||
await deleteWebhook(token, fetcher, WEBHOOK_CLEANUP_TIMEOUT_MS);
|
||||
runtime.log?.(`[${account.accountId}] Zalo webhook deleted`);
|
||||
} catch (err) {
|
||||
const detail =
|
||||
err instanceof Error && err.name === "AbortError"
|
||||
? `timed out after ${String(WEBHOOK_CLEANUP_TIMEOUT_MS)}ms`
|
||||
: formatZaloError(err);
|
||||
runtime.error?.(`[${account.accountId}] Zalo webhook delete failed: ${detail}`);
|
||||
}
|
||||
})();
|
||||
}
|
||||
await webhookCleanupPromise;
|
||||
};
|
||||
runtime.log?.(`[${account.accountId}] Zalo webhook registered path=${path}`);
|
||||
|
||||
const unregister = registerZaloWebhookTarget({
|
||||
token,
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
core,
|
||||
path,
|
||||
secret: webhookSecret,
|
||||
statusSink: (patch) => statusSink?.(patch),
|
||||
mediaMaxMb: effectiveMediaMaxMb,
|
||||
fetcher,
|
||||
});
|
||||
stopHandlers.push(unregister);
|
||||
await waitForAbortSignal(abortSignal);
|
||||
return;
|
||||
}
|
||||
|
||||
const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null });
|
||||
if (!path) {
|
||||
throw new Error("Zalo webhookPath could not be derived");
|
||||
runtime.log?.(`[${account.accountId}] Zalo polling mode: clearing webhook before startup`);
|
||||
try {
|
||||
try {
|
||||
const currentWebhookUrl = normalizeWebhookUrl(
|
||||
(await getWebhookInfo(token, fetcher)).result?.url,
|
||||
);
|
||||
if (!currentWebhookUrl) {
|
||||
runtime.log?.(`[${account.accountId}] Zalo polling mode ready (no webhook configured)`);
|
||||
} else {
|
||||
runtime.log?.(
|
||||
`[${account.accountId}] Zalo polling mode disabling existing webhook ${describeWebhookTarget(currentWebhookUrl)}`,
|
||||
);
|
||||
await deleteWebhook(token, fetcher);
|
||||
runtime.log?.(`[${account.accountId}] Zalo polling mode ready (webhook disabled)`);
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof ZaloApiError && err.errorCode === 404) {
|
||||
// Some Zalo environments do not expose webhook inspection for polling bots.
|
||||
runtime.log?.(
|
||||
`[${account.accountId}] Zalo polling mode webhook inspection unavailable; continuing without webhook cleanup`,
|
||||
);
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
runtime.error?.(
|
||||
`[${account.accountId}] Zalo polling startup could not clear webhook: ${formatZaloError(err)}`,
|
||||
);
|
||||
}
|
||||
|
||||
await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher);
|
||||
|
||||
const unregister = registerZaloWebhookTarget({
|
||||
startPollingLoop({
|
||||
token,
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
core,
|
||||
path,
|
||||
secret: webhookSecret,
|
||||
statusSink: (patch) => statusSink?.(patch),
|
||||
abortSignal,
|
||||
isStopped: () => stopped,
|
||||
mediaMaxMb: effectiveMediaMaxMb,
|
||||
statusSink,
|
||||
fetcher,
|
||||
});
|
||||
stopHandlers.push(unregister);
|
||||
abortSignal.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
void deleteWebhook(token, fetcher).catch(() => {});
|
||||
},
|
||||
{ once: true },
|
||||
|
||||
await waitForAbortSignal(abortSignal);
|
||||
} catch (err) {
|
||||
runtime.error?.(
|
||||
`[${account.accountId}] Zalo provider startup failed mode=${mode}: ${formatZaloError(err)}`,
|
||||
);
|
||||
return { stop };
|
||||
throw err;
|
||||
} finally {
|
||||
await cleanupWebhook?.();
|
||||
stop();
|
||||
runtime.log?.(`[${account.accountId}] Zalo provider stopped mode=${mode}`);
|
||||
}
|
||||
|
||||
try {
|
||||
await deleteWebhook(token, fetcher);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
startPollingLoop({
|
||||
token,
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
core,
|
||||
abortSignal,
|
||||
isStopped: () => stopped,
|
||||
mediaMaxMb: effectiveMediaMaxMb,
|
||||
statusSink,
|
||||
fetcher,
|
||||
});
|
||||
|
||||
return { stop };
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
|
||||
@@ -41,6 +41,8 @@ export type {
|
||||
} from "../channels/plugins/types.js";
|
||||
export type { ChannelPlugin } from "../channels/plugins/types.plugin.js";
|
||||
export { createReplyPrefixOptions } from "../channels/reply-prefix.js";
|
||||
export { logTypingFailure } from "../channels/logging.js";
|
||||
export { createTypingCallbacks } from "../channels/typing.js";
|
||||
export type { OpenClawConfig } from "../config/config.js";
|
||||
export {
|
||||
resolveDefaultGroupPolicy,
|
||||
@@ -56,6 +58,7 @@ export {
|
||||
} from "../config/types.secrets.js";
|
||||
export { buildSecretInputSchema } from "./secret-input-schema.js";
|
||||
export { MarkdownConfigSchema } from "../config/zod-schema.core.js";
|
||||
export { waitForAbortSignal } from "../infra/abort-signal.js";
|
||||
export { createDedupeCache } from "../infra/dedupe.js";
|
||||
export { emptyPluginConfigSchema } from "../plugins/config-schema.js";
|
||||
export type { PluginRuntime } from "../plugins/runtime/types.js";
|
||||
|
||||
Reference in New Issue
Block a user