diff --git a/extensions/zalo/src/api.test.ts b/extensions/zalo/src/api.test.ts new file mode 100644 index 00000000000..00198f5072e --- /dev/null +++ b/extensions/zalo/src/api.test.ts @@ -0,0 +1,63 @@ +import { describe, expect, it, vi } from "vitest"; +import { deleteWebhook, getWebhookInfo, sendChatAction, type ZaloFetch } from "./api.js"; + +describe("Zalo API request methods", () => { + it("uses POST for getWebhookInfo", async () => { + const fetcher = vi.fn( + async () => new Response(JSON.stringify({ ok: true, result: {} })), + ); + + await getWebhookInfo("test-token", fetcher); + + expect(fetcher).toHaveBeenCalledTimes(1); + const [, init] = fetcher.mock.calls[0] ?? []; + expect(init?.method).toBe("POST"); + expect(init?.headers).toEqual({ "Content-Type": "application/json" }); + }); + + it("keeps POST for deleteWebhook", async () => { + const fetcher = vi.fn( + async () => new Response(JSON.stringify({ ok: true, result: {} })), + ); + + await deleteWebhook("test-token", fetcher); + + expect(fetcher).toHaveBeenCalledTimes(1); + const [, init] = fetcher.mock.calls[0] ?? []; + expect(init?.method).toBe("POST"); + expect(init?.headers).toEqual({ "Content-Type": "application/json" }); + }); + + it("aborts sendChatAction when the typing timeout elapses", async () => { + vi.useFakeTimers(); + try { + const fetcher = vi.fn( + (_, init) => + new Promise((_, reject) => { + init?.signal?.addEventListener("abort", () => reject(new Error("aborted")), { + once: true, + }); + }), + ); + + const promise = sendChatAction( + "test-token", + { + chat_id: "chat-123", + action: "typing", + }, + fetcher, + 25, + ); + const rejected = expect(promise).rejects.toThrow("aborted"); + + await vi.advanceTimersByTimeAsync(25); + + await rejected; + const [, init] = fetcher.mock.calls[0] ?? []; + expect(init?.signal?.aborted).toBe(true); + } finally { + vi.useRealTimers(); + } + }); +}); diff --git a/extensions/zalo/src/api.ts b/extensions/zalo/src/api.ts index ad11d5044d5..9bef1ce680e 100644 --- a/extensions/zalo/src/api.ts +++ b/extensions/zalo/src/api.ts @@ -58,11 +58,22 @@ export type ZaloSendPhotoParams = { caption?: string; }; +export type ZaloSendChatActionParams = { + chat_id: string; + action: "typing" | "upload_photo"; +}; + export type ZaloSetWebhookParams = { url: string; secret_token: string; }; +export type ZaloWebhookInfo = { + url?: string; + updated_at?: number; + has_custom_certificate?: boolean; +}; + export type ZaloGetUpdatesParams = { /** Timeout in seconds (passed as string to API) */ timeout?: number; @@ -161,6 +172,21 @@ export async function sendPhoto( return callZaloApi("sendPhoto", token, params, { fetch: fetcher }); } +/** + * Send a temporary chat action such as typing. + */ +export async function sendChatAction( + token: string, + params: ZaloSendChatActionParams, + fetcher?: ZaloFetch, + timeoutMs?: number, +): Promise> { + return callZaloApi("sendChatAction", token, params, { + timeoutMs, + fetch: fetcher, + }); +} + /** * Get updates using long polling (dev/testing only) * Note: Zalo returns a single update per call, not an array like Telegram @@ -183,8 +209,8 @@ export async function setWebhook( token: string, params: ZaloSetWebhookParams, fetcher?: ZaloFetch, -): Promise> { - return callZaloApi("setWebhook", token, params, { fetch: fetcher }); +): Promise> { + return callZaloApi("setWebhook", token, params, { fetch: fetcher }); } /** @@ -193,8 +219,12 @@ export async function setWebhook( export async function deleteWebhook( token: string, fetcher?: ZaloFetch, -): Promise> { - return callZaloApi("deleteWebhook", token, undefined, { fetch: fetcher }); + timeoutMs?: number, +): Promise> { + return callZaloApi("deleteWebhook", token, undefined, { + timeoutMs, + fetch: fetcher, + }); } /** @@ -203,6 +233,6 @@ export async function deleteWebhook( export async function getWebhookInfo( token: string, fetcher?: ZaloFetch, -): Promise> { - return callZaloApi("getWebhookInfo", token, undefined, { fetch: fetcher }); +): Promise> { + return callZaloApi("getWebhookInfo", token, undefined, { fetch: fetcher }); } diff --git a/extensions/zalo/src/channel.startup.test.ts b/extensions/zalo/src/channel.startup.test.ts new file mode 100644 index 00000000000..65e413f0f4f --- /dev/null +++ b/extensions/zalo/src/channel.startup.test.ts @@ -0,0 +1,100 @@ +import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/zalo"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createStartAccountContext } from "../../test-utils/start-account-context.js"; +import type { ResolvedZaloAccount } from "./accounts.js"; + +const hoisted = vi.hoisted(() => ({ + monitorZaloProvider: vi.fn(), + probeZalo: vi.fn(async () => ({ + ok: false as const, + error: "probe failed", + elapsedMs: 1, + })), +})); + +vi.mock("./monitor.js", async () => { + const actual = await vi.importActual("./monitor.js"); + return { + ...actual, + monitorZaloProvider: hoisted.monitorZaloProvider, + }; +}); + +vi.mock("./probe.js", async () => { + const actual = await vi.importActual("./probe.js"); + return { + ...actual, + probeZalo: hoisted.probeZalo, + }; +}); + +import { zaloPlugin } from "./channel.js"; + +function buildAccount(): ResolvedZaloAccount { + return { + accountId: "default", + enabled: true, + token: "test-token", + tokenSource: "config", + config: {}, + }; +} + +describe("zaloPlugin gateway.startAccount", () => { + afterEach(() => { + vi.clearAllMocks(); + }); + + it("keeps startAccount pending until abort", async () => { + hoisted.monitorZaloProvider.mockImplementationOnce( + async ({ abortSignal }: { abortSignal: AbortSignal }) => + await new Promise((resolve) => { + if (abortSignal.aborted) { + resolve(); + return; + } + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }), + ); + + const patches: ChannelAccountSnapshot[] = []; + const abort = new AbortController(); + const task = zaloPlugin.gateway!.startAccount!( + createStartAccountContext({ + account: buildAccount(), + abortSignal: abort.signal, + statusPatchSink: (next) => patches.push({ ...next }), + }), + ); + + let settled = false; + void task.then(() => { + settled = true; + }); + + await vi.waitFor(() => { + expect(hoisted.probeZalo).toHaveBeenCalledOnce(); + expect(hoisted.monitorZaloProvider).toHaveBeenCalledOnce(); + }); + + expect(settled).toBe(false); + expect(patches).toContainEqual( + expect.objectContaining({ + accountId: "default", + }), + ); + + abort.abort(); + await task; + + expect(settled).toBe(true); + expect(hoisted.monitorZaloProvider).toHaveBeenCalledWith( + expect.objectContaining({ + token: "test-token", + account: expect.objectContaining({ accountId: "default" }), + abortSignal: abort.signal, + useWebhook: false, + }), + ); + }); +}); diff --git a/extensions/zalo/src/channel.ts b/extensions/zalo/src/channel.ts index 296f81d765c..e4671bb90c1 100644 --- a/extensions/zalo/src/channel.ts +++ b/extensions/zalo/src/channel.ts @@ -334,6 +334,7 @@ export const zaloPlugin: ChannelPlugin = { startAccount: async (ctx) => { const account = ctx.account; const token = account.token.trim(); + const mode = account.config.webhookUrl ? "webhook" : "polling"; let zaloBotLabel = ""; const fetcher = resolveZaloProxyFetch(account.config.proxy); try { @@ -342,14 +343,21 @@ export const zaloPlugin: ChannelPlugin = { if (name) { zaloBotLabel = ` (${name})`; } + if (!probe.ok) { + ctx.log?.warn?.( + `[${account.accountId}] Zalo probe failed before provider start (${String(probe.elapsedMs)}ms): ${probe.error}`, + ); + } ctx.setStatus({ accountId: account.accountId, bot: probe.bot, }); - } catch { - // ignore probe errors + } catch (err) { + ctx.log?.warn?.( + `[${account.accountId}] Zalo probe threw before provider start: ${err instanceof Error ? (err.stack ?? err.message) : String(err)}`, + ); } - ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel}`); + ctx.log?.info(`[${account.accountId}] starting provider${zaloBotLabel} mode=${mode}`); const { monitorZaloProvider } = await import("./monitor.js"); return monitorZaloProvider({ token, diff --git a/extensions/zalo/src/monitor.lifecycle.test.ts b/extensions/zalo/src/monitor.lifecycle.test.ts new file mode 100644 index 00000000000..6cce789da56 --- /dev/null +++ b/extensions/zalo/src/monitor.lifecycle.test.ts @@ -0,0 +1,213 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk/zalo"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createEmptyPluginRegistry } from "../../../src/plugins/registry.js"; +import { setActivePluginRegistry } from "../../../src/plugins/runtime.js"; +import type { ResolvedZaloAccount } from "./accounts.js"; + +const getWebhookInfoMock = vi.fn(async () => ({ ok: true, result: { url: "" } })); +const deleteWebhookMock = vi.fn(async () => ({ ok: true, result: { url: "" } })); +const getUpdatesMock = vi.fn(() => new Promise(() => {})); +const setWebhookMock = vi.fn(async () => ({ ok: true, result: { url: "" } })); + +vi.mock("./api.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + deleteWebhook: deleteWebhookMock, + getWebhookInfo: getWebhookInfoMock, + getUpdates: getUpdatesMock, + setWebhook: setWebhookMock, + }; +}); + +vi.mock("./runtime.js", () => ({ + getZaloRuntime: () => ({ + logging: { + shouldLogVerbose: () => false, + }, + }), +})); + +async function waitForPollingLoopStart(): Promise { + await vi.waitFor(() => expect(getUpdatesMock).toHaveBeenCalledTimes(1)); +} + +describe("monitorZaloProvider lifecycle", () => { + afterEach(() => { + vi.clearAllMocks(); + setActivePluginRegistry(createEmptyPluginRegistry()); + }); + + it("stays alive in polling mode until abort", async () => { + const { monitorZaloProvider } = await import("./monitor.js"); + const abort = new AbortController(); + const runtime = { + log: vi.fn<(message: string) => void>(), + error: vi.fn<(message: string) => void>(), + }; + const account = { + accountId: "default", + config: {}, + } as unknown as ResolvedZaloAccount; + const config = {} as OpenClawConfig; + + let settled = false; + const run = monitorZaloProvider({ + token: "test-token", + account, + config, + runtime, + abortSignal: abort.signal, + }).then(() => { + settled = true; + }); + + await waitForPollingLoopStart(); + + expect(getWebhookInfoMock).toHaveBeenCalledTimes(1); + expect(deleteWebhookMock).not.toHaveBeenCalled(); + expect(getUpdatesMock).toHaveBeenCalledTimes(1); + expect(settled).toBe(false); + + abort.abort(); + await run; + + expect(settled).toBe(true); + expect(runtime.log).toHaveBeenCalledWith( + expect.stringContaining("Zalo provider stopped mode=polling"), + ); + }); + + it("deletes an existing webhook before polling", async () => { + getWebhookInfoMock.mockResolvedValueOnce({ + ok: true, + result: { url: "https://example.com/hooks/zalo" }, + }); + + const { monitorZaloProvider } = await import("./monitor.js"); + const abort = new AbortController(); + const runtime = { + log: vi.fn<(message: string) => void>(), + error: vi.fn<(message: string) => void>(), + }; + const account = { + accountId: "default", + config: {}, + } as unknown as ResolvedZaloAccount; + const config = {} as OpenClawConfig; + + const run = monitorZaloProvider({ + token: "test-token", + account, + config, + runtime, + abortSignal: abort.signal, + }); + + await waitForPollingLoopStart(); + + expect(getWebhookInfoMock).toHaveBeenCalledTimes(1); + expect(deleteWebhookMock).toHaveBeenCalledTimes(1); + expect(runtime.log).toHaveBeenCalledWith( + expect.stringContaining("Zalo polling mode ready (webhook disabled)"), + ); + + abort.abort(); + await run; + }); + + it("continues polling when webhook inspection returns 404", async () => { + const { ZaloApiError } = await import("./api.js"); + getWebhookInfoMock.mockRejectedValueOnce(new ZaloApiError("Not Found", 404, "Not Found")); + + const { monitorZaloProvider } = await import("./monitor.js"); + const abort = new AbortController(); + const runtime = { + log: vi.fn<(message: string) => void>(), + error: vi.fn<(message: string) => void>(), + }; + const account = { + accountId: "default", + config: {}, + } as unknown as ResolvedZaloAccount; + const config = {} as OpenClawConfig; + + const run = monitorZaloProvider({ + token: "test-token", + account, + config, + runtime, + abortSignal: abort.signal, + }); + + await waitForPollingLoopStart(); + + expect(getWebhookInfoMock).toHaveBeenCalledTimes(1); + expect(deleteWebhookMock).not.toHaveBeenCalled(); + expect(runtime.log).toHaveBeenCalledWith( + expect.stringContaining("webhook inspection unavailable; continuing without webhook cleanup"), + ); + expect(runtime.error).not.toHaveBeenCalled(); + + abort.abort(); + await run; + }); + + it("waits for webhook deletion before finishing webhook shutdown", async () => { + const registry = createEmptyPluginRegistry(); + setActivePluginRegistry(registry); + + let resolveDeleteWebhook: (() => void) | undefined; + deleteWebhookMock.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveDeleteWebhook = () => resolve({ ok: true, result: { url: "" } }); + }), + ); + + const { monitorZaloProvider } = await import("./monitor.js"); + const abort = new AbortController(); + const runtime = { + log: vi.fn<(message: string) => void>(), + error: vi.fn<(message: string) => void>(), + }; + const account = { + accountId: "default", + config: {}, + } as unknown as ResolvedZaloAccount; + const config = {} as OpenClawConfig; + + let settled = false; + const run = monitorZaloProvider({ + token: "test-token", + account, + config, + runtime, + abortSignal: abort.signal, + useWebhook: true, + webhookUrl: "https://example.com/hooks/zalo", + webhookSecret: "supersecret", // pragma: allowlist secret + }).then(() => { + settled = true; + }); + + await vi.waitFor(() => expect(setWebhookMock).toHaveBeenCalledTimes(1)); + expect(registry.httpRoutes).toHaveLength(1); + + abort.abort(); + + await vi.waitFor(() => expect(deleteWebhookMock).toHaveBeenCalledTimes(1)); + expect(deleteWebhookMock).toHaveBeenCalledWith("test-token", undefined, 5000); + expect(settled).toBe(false); + expect(registry.httpRoutes).toHaveLength(1); + + resolveDeleteWebhook?.(); + await run; + + expect(settled).toBe(true); + expect(registry.httpRoutes).toHaveLength(0); + expect(runtime.log).toHaveBeenCalledWith( + expect.stringContaining("Zalo provider stopped mode=webhook"), + ); + }); +}); diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 33692f27bbb..bd1351bd147 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -5,9 +5,11 @@ import type { OutboundReplyPayload, } from "openclaw/plugin-sdk/zalo"; import { + createTypingCallbacks, createScopedPairingAccess, createReplyPrefixOptions, issuePairingChallenge, + logTypingFailure, resolveDirectDmAuthorizationOutcome, resolveSenderCommandAuthorizationWithRuntime, resolveOutboundMediaUrls, @@ -15,13 +17,16 @@ import { resolveInboundRouteEnvelopeBuilderWithRuntime, sendMediaWithLeadingCaption, resolveWebhookPath, + waitForAbortSignal, warnMissingProviderGroupPolicyFallbackOnce, } from "openclaw/plugin-sdk/zalo"; import type { ResolvedZaloAccount } from "./accounts.js"; import { ZaloApiError, deleteWebhook, + getWebhookInfo, getUpdates, + sendChatAction, sendMessage, sendPhoto, setWebhook, @@ -64,15 +69,34 @@ export type ZaloMonitorOptions = { statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; }; -export type ZaloMonitorResult = { - stop: () => void; -}; - const ZALO_TEXT_LIMIT = 2000; const DEFAULT_MEDIA_MAX_MB = 5; +const WEBHOOK_CLEANUP_TIMEOUT_MS = 5_000; +const ZALO_TYPING_TIMEOUT_MS = 5_000; type ZaloCoreRuntime = ReturnType; +function formatZaloError(error: unknown): string { + if (error instanceof Error) { + return error.stack ?? `${error.name}: ${error.message}`; + } + return String(error); +} + +function describeWebhookTarget(rawUrl: string): string { + try { + const parsed = new URL(rawUrl); + return `${parsed.origin}${parsed.pathname}`; + } catch { + return rawUrl; + } +} + +function normalizeWebhookUrl(url: string | undefined): string | undefined { + const trimmed = url?.trim(); + return trimmed ? trimmed : undefined; +} + function logVerbose(core: ZaloCoreRuntime, runtime: ZaloRuntimeEnv, message: string): void { if (core.logging.shouldLogVerbose()) { runtime.log?.(`[zalo] ${message}`); @@ -151,6 +175,8 @@ function startPollingLoop(params: { } = params; const pollTimeout = 30; + runtime.log?.(`[${account.accountId}] Zalo polling loop started timeout=${String(pollTimeout)}s`); + const poll = async () => { if (isStopped() || abortSignal.aborted) { return; @@ -176,7 +202,7 @@ function startPollingLoop(params: { if (err instanceof ZaloApiError && err.isPollingTimeout) { // no updates } else if (!isStopped() && !abortSignal.aborted) { - runtime.error?.(`[${account.accountId}] Zalo polling error: ${String(err)}`); + runtime.error?.(`[${account.accountId}] Zalo polling error: ${formatZaloError(err)}`); await new Promise((resolve) => setTimeout(resolve, 5000)); } } @@ -522,12 +548,35 @@ async function processMessageWithPipeline(params: { channel: "zalo", accountId: account.accountId, }); + const typingCallbacks = createTypingCallbacks({ + start: async () => { + await sendChatAction( + token, + { + chat_id: chatId, + action: "typing", + }, + fetcher, + ZALO_TYPING_TIMEOUT_MS, + ); + }, + onStartError: (err) => { + logTypingFailure({ + log: (message) => logVerbose(core, runtime, message), + channel: "zalo", + action: "start", + target: chatId, + error: err, + }); + }, + }); await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, dispatcherOptions: { ...prefixOptions, + typingCallbacks, deliver: async (payload) => { await deliverZaloReply({ payload, @@ -567,7 +616,6 @@ async function deliverZaloReply(params: { const { payload, token, chatId, runtime, core, config, accountId, statusSink, fetcher } = params; const tableMode = params.tableMode ?? "code"; const text = core.channel.text.convertMarkdownTables(payload.text ?? "", tableMode); - const sentMedia = await sendMediaWithLeadingCaption({ mediaUrls: resolveOutboundMediaUrls(payload), caption: text, @@ -597,7 +645,7 @@ async function deliverZaloReply(params: { } } -export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise { +export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise { const { token, account, @@ -615,78 +663,140 @@ export async function monitorZaloProvider(options: ZaloMonitorOptions): Promise< const core = getZaloRuntime(); const effectiveMediaMaxMb = account.config.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; const fetcher = fetcherOverride ?? resolveZaloProxyFetch(account.config.proxy); + const mode = useWebhook ? "webhook" : "polling"; let stopped = false; const stopHandlers: Array<() => void> = []; + let cleanupWebhook: (() => Promise) | undefined; const stop = () => { + if (stopped) { + return; + } stopped = true; for (const handler of stopHandlers) { handler(); } }; - if (useWebhook) { - if (!webhookUrl || !webhookSecret) { - throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode"); - } - if (!webhookUrl.startsWith("https://")) { - throw new Error("Zalo webhook URL must use HTTPS"); - } - if (webhookSecret.length < 8 || webhookSecret.length > 256) { - throw new Error("Zalo webhook secret must be 8-256 characters"); + runtime.log?.( + `[${account.accountId}] Zalo provider init mode=${mode} mediaMaxMb=${String(effectiveMediaMaxMb)}`, + ); + + try { + if (useWebhook) { + if (!webhookUrl || !webhookSecret) { + throw new Error("Zalo webhookUrl and webhookSecret are required for webhook mode"); + } + if (!webhookUrl.startsWith("https://")) { + throw new Error("Zalo webhook URL must use HTTPS"); + } + if (webhookSecret.length < 8 || webhookSecret.length > 256) { + throw new Error("Zalo webhook secret must be 8-256 characters"); + } + + const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null }); + if (!path) { + throw new Error("Zalo webhookPath could not be derived"); + } + + runtime.log?.( + `[${account.accountId}] Zalo configuring webhook path=${path} target=${describeWebhookTarget(webhookUrl)}`, + ); + await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher); + let webhookCleanupPromise: Promise | undefined; + cleanupWebhook = async () => { + if (!webhookCleanupPromise) { + webhookCleanupPromise = (async () => { + runtime.log?.(`[${account.accountId}] Zalo stopping; deleting webhook`); + try { + await deleteWebhook(token, fetcher, WEBHOOK_CLEANUP_TIMEOUT_MS); + runtime.log?.(`[${account.accountId}] Zalo webhook deleted`); + } catch (err) { + const detail = + err instanceof Error && err.name === "AbortError" + ? `timed out after ${String(WEBHOOK_CLEANUP_TIMEOUT_MS)}ms` + : formatZaloError(err); + runtime.error?.(`[${account.accountId}] Zalo webhook delete failed: ${detail}`); + } + })(); + } + await webhookCleanupPromise; + }; + runtime.log?.(`[${account.accountId}] Zalo webhook registered path=${path}`); + + const unregister = registerZaloWebhookTarget({ + token, + account, + config, + runtime, + core, + path, + secret: webhookSecret, + statusSink: (patch) => statusSink?.(patch), + mediaMaxMb: effectiveMediaMaxMb, + fetcher, + }); + stopHandlers.push(unregister); + await waitForAbortSignal(abortSignal); + return; } - const path = resolveWebhookPath({ webhookPath, webhookUrl, defaultPath: null }); - if (!path) { - throw new Error("Zalo webhookPath could not be derived"); + runtime.log?.(`[${account.accountId}] Zalo polling mode: clearing webhook before startup`); + try { + try { + const currentWebhookUrl = normalizeWebhookUrl( + (await getWebhookInfo(token, fetcher)).result?.url, + ); + if (!currentWebhookUrl) { + runtime.log?.(`[${account.accountId}] Zalo polling mode ready (no webhook configured)`); + } else { + runtime.log?.( + `[${account.accountId}] Zalo polling mode disabling existing webhook ${describeWebhookTarget(currentWebhookUrl)}`, + ); + await deleteWebhook(token, fetcher); + runtime.log?.(`[${account.accountId}] Zalo polling mode ready (webhook disabled)`); + } + } catch (err) { + if (err instanceof ZaloApiError && err.errorCode === 404) { + // Some Zalo environments do not expose webhook inspection for polling bots. + runtime.log?.( + `[${account.accountId}] Zalo polling mode webhook inspection unavailable; continuing without webhook cleanup`, + ); + } else { + throw err; + } + } + } catch (err) { + runtime.error?.( + `[${account.accountId}] Zalo polling startup could not clear webhook: ${formatZaloError(err)}`, + ); } - await setWebhook(token, { url: webhookUrl, secret_token: webhookSecret }, fetcher); - - const unregister = registerZaloWebhookTarget({ + startPollingLoop({ token, account, config, runtime, core, - path, - secret: webhookSecret, - statusSink: (patch) => statusSink?.(patch), + abortSignal, + isStopped: () => stopped, mediaMaxMb: effectiveMediaMaxMb, + statusSink, fetcher, }); - stopHandlers.push(unregister); - abortSignal.addEventListener( - "abort", - () => { - void deleteWebhook(token, fetcher).catch(() => {}); - }, - { once: true }, + + await waitForAbortSignal(abortSignal); + } catch (err) { + runtime.error?.( + `[${account.accountId}] Zalo provider startup failed mode=${mode}: ${formatZaloError(err)}`, ); - return { stop }; + throw err; + } finally { + await cleanupWebhook?.(); + stop(); + runtime.log?.(`[${account.accountId}] Zalo provider stopped mode=${mode}`); } - - try { - await deleteWebhook(token, fetcher); - } catch { - // ignore - } - - startPollingLoop({ - token, - account, - config, - runtime, - core, - abortSignal, - isStopped: () => stopped, - mediaMaxMb: effectiveMediaMaxMb, - statusSink, - fetcher, - }); - - return { stop }; } export const __testing = { diff --git a/src/plugin-sdk/zalo.ts b/src/plugin-sdk/zalo.ts index 82f484b4877..2196493009e 100644 --- a/src/plugin-sdk/zalo.ts +++ b/src/plugin-sdk/zalo.ts @@ -41,6 +41,8 @@ export type { } from "../channels/plugins/types.js"; export type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; export { createReplyPrefixOptions } from "../channels/reply-prefix.js"; +export { logTypingFailure } from "../channels/logging.js"; +export { createTypingCallbacks } from "../channels/typing.js"; export type { OpenClawConfig } from "../config/config.js"; export { resolveDefaultGroupPolicy, @@ -56,6 +58,7 @@ export { } from "../config/types.secrets.js"; export { buildSecretInputSchema } from "./secret-input-schema.js"; export { MarkdownConfigSchema } from "../config/zod-schema.core.js"; +export { waitForAbortSignal } from "../infra/abort-signal.js"; export { createDedupeCache } from "../infra/dedupe.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export type { PluginRuntime } from "../plugins/runtime/types.js";