diff --git a/changelog/fragments/README.md b/changelog/fragments/README.md new file mode 100644 index 00000000000..93bb5b65d70 --- /dev/null +++ b/changelog/fragments/README.md @@ -0,0 +1,13 @@ +# Changelog Fragments + +Use this directory when a PR should not edit `CHANGELOG.md` directly. + +- One fragment file per PR. +- File name recommendation: `pr-.md`. +- Include at least one line with both `#` and `thanks @`. + +Example: + +```md +- Fix LINE monitor lifecycle wait ownership (#27001) (thanks @alice) +``` diff --git a/extensions/line/src/channel.startup.test.ts b/extensions/line/src/channel.startup.test.ts index 11ba80bda12..812636113cb 100644 --- a/extensions/line/src/channel.startup.test.ts +++ b/extensions/line/src/channel.startup.test.ts @@ -129,50 +129,4 @@ describe("linePlugin gateway.startAccount", () => { abort.abort(); await task; }); - - it("stays pending until abort signal fires (no premature exit)", async () => { - const { runtime, monitorLineProvider } = createRuntime(); - setLineRuntime(runtime); - - const abort = new AbortController(); - let resolved = false; - - const task = linePlugin.gateway!.startAccount!( - createStartAccountCtx({ - token: "token", - secret: "secret", - runtime: createRuntimeEnv(), - abortSignal: abort.signal, - }), - ).then(() => { - resolved = true; - }); - - // Allow async internals to flush - await new Promise((r) => setTimeout(r, 50)); - - expect(monitorLineProvider).toHaveBeenCalled(); - expect(resolved).toBe(false); - - abort.abort(); - await task; - expect(resolved).toBe(true); - }); - - it("resolves immediately when abortSignal is already aborted", async () => { - const { runtime } = createRuntime(); - setLineRuntime(runtime); - - const abort = new AbortController(); - abort.abort(); - - await linePlugin.gateway!.startAccount!( - createStartAccountCtx({ - token: "token", - secret: "secret", - runtime: createRuntimeEnv(), - abortSignal: abort.signal, - }), - ); - }); }); diff --git a/extensions/line/src/channel.ts b/extensions/line/src/channel.ts index f37a86aa0c4..1c87ad8e2f3 100644 --- a/extensions/line/src/channel.ts +++ b/extensions/line/src/channel.ts @@ -661,18 +661,6 @@ export const linePlugin: ChannelPlugin = { webhookPath: account.config.webhookPath, }); - // Keep the provider alive until the abort signal fires. Without this, - // the startAccount promise resolves immediately after webhook registration - // and the channel supervisor treats the provider as "exited", triggering an - // auto-restart loop (up to 10 attempts). - await new Promise((resolve) => { - if (ctx.abortSignal.aborted) { - resolve(); - return; - } - ctx.abortSignal.addEventListener("abort", () => resolve(), { once: true }); - }); - return monitor; }, logoutAccount: async ({ accountId, cfg }) => { diff --git a/scripts/pr b/scripts/pr index 90cfe029db0..36ab74972c4 100755 --- a/scripts/pr +++ b/scripts/pr @@ -664,6 +664,61 @@ validate_changelog_entry_for_pr() { echo "changelog validated: found PR #$pr (contributor handle unavailable, skipping thanks check)" } +changed_changelog_fragment_files() { + git diff --name-only origin/main...HEAD -- changelog/fragments | rg '^changelog/fragments/.*\.md$' || true +} + +validate_changelog_fragments_for_pr() { + local pr="$1" + local contrib="$2" + shift 2 + + if [ "$#" -lt 1 ]; then + echo "No changelog fragments provided for validation." + exit 1 + fi + + local pr_pattern + pr_pattern="(#$pr|openclaw#$pr)" + + local added_lines + local file + local all_added_lines="" + for file in "$@"; do + added_lines=$(git diff --unified=0 origin/main...HEAD -- "$file" | awk ' + /^\+\+\+/ { next } + /^\+/ { print substr($0, 2) } + ') + + if [ -z "$added_lines" ]; then + echo "$file is in diff but no added lines were detected." + exit 1 + fi + + all_added_lines=$(printf '%s\n%s\n' "$all_added_lines" "$added_lines") + done + + local with_pr + with_pr=$(printf '%s\n' "$all_added_lines" | rg -in "$pr_pattern" || true) + if [ -z "$with_pr" ]; then + echo "Changelog fragment update must reference PR #$pr (for example, (#$pr))." + exit 1 + fi + + if [ -n "$contrib" ] && [ "$contrib" != "null" ]; then + local with_pr_and_thanks + with_pr_and_thanks=$(printf '%s\n' "$all_added_lines" | rg -in "$pr_pattern" | rg -i "thanks @$contrib" || true) + if [ -z "$with_pr_and_thanks" ]; then + echo "Changelog fragment update must include both PR #$pr and thanks @$contrib on the entry line." + exit 1 + fi + echo "changelog fragments validated: found PR #$pr + thanks @$contrib" + return 0 + fi + + echo "changelog fragments validated: found PR #$pr (contributor handle unavailable, skipping thanks check)" +} + prepare_gates() { local pr="$1" enter_worktree "$pr" false @@ -684,13 +739,30 @@ prepare_gates() { docs_only=true fi - # Enforce workflow policy: every prepared PR must include a changelog update. - if ! printf '%s\n' "$changed_files" | rg -q '^CHANGELOG\.md$'; then - echo "Missing CHANGELOG.md update in PR diff. This workflow requires a changelog entry." + local has_changelog_update=false + if printf '%s\n' "$changed_files" | rg -q '^CHANGELOG\.md$'; then + has_changelog_update=true + fi + local fragment_files + fragment_files=$(changed_changelog_fragment_files) + local has_fragment_update=false + if [ -n "$fragment_files" ]; then + has_fragment_update=true + fi + # Enforce workflow policy: every prepared PR must include either CHANGELOG.md + # or one or more changelog fragments. + if [ "$has_changelog_update" = "false" ] && [ "$has_fragment_update" = "false" ]; then + echo "Missing changelog update. Add CHANGELOG.md changes or changelog/fragments/*.md entry." exit 1 fi local contrib="${PR_AUTHOR:-}" - validate_changelog_entry_for_pr "$pr" "$contrib" + if [ "$has_changelog_update" = "true" ]; then + validate_changelog_entry_for_pr "$pr" "$contrib" + fi + if [ "$has_fragment_update" = "true" ]; then + mapfile -t fragment_file_list <<<"$fragment_files" + validate_changelog_fragments_for_pr "$pr" "$contrib" "${fragment_file_list[@]}" + fi run_quiet_logged "pnpm build" ".local/gates-build.log" pnpm build run_quiet_logged "pnpm check" ".local/gates-check.log" pnpm check diff --git a/src/discord/monitor/gateway-error-guard.test.ts b/src/discord/monitor/gateway-error-guard.test.ts new file mode 100644 index 00000000000..783fcc6a712 --- /dev/null +++ b/src/discord/monitor/gateway-error-guard.test.ts @@ -0,0 +1,33 @@ +import { EventEmitter } from "node:events"; +import { describe, expect, it, vi } from "vitest"; +import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js"; + +describe("attachEarlyGatewayErrorGuard", () => { + it("captures gateway errors until released", () => { + const emitter = new EventEmitter(); + const fallbackErrorListener = vi.fn(); + emitter.on("error", fallbackErrorListener); + const client = { + getPlugin: vi.fn(() => ({ emitter })), + }; + + const guard = attachEarlyGatewayErrorGuard(client as never); + emitter.emit("error", new Error("Fatal Gateway error: 4014")); + expect(guard.pendingErrors).toHaveLength(1); + + guard.release(); + emitter.emit("error", new Error("Fatal Gateway error: 4000")); + expect(guard.pendingErrors).toHaveLength(1); + expect(fallbackErrorListener).toHaveBeenCalledTimes(2); + }); + + it("returns noop guard when gateway emitter is unavailable", () => { + const client = { + getPlugin: vi.fn(() => undefined), + }; + + const guard = attachEarlyGatewayErrorGuard(client as never); + expect(guard.pendingErrors).toEqual([]); + expect(() => guard.release()).not.toThrow(); + }); +}); diff --git a/src/discord/monitor/gateway-error-guard.ts b/src/discord/monitor/gateway-error-guard.ts new file mode 100644 index 00000000000..5cb79753325 --- /dev/null +++ b/src/discord/monitor/gateway-error-guard.ts @@ -0,0 +1,36 @@ +import type { Client } from "@buape/carbon"; +import { getDiscordGatewayEmitter } from "../monitor.gateway.js"; + +export type EarlyGatewayErrorGuard = { + pendingErrors: unknown[]; + release: () => void; +}; + +export function attachEarlyGatewayErrorGuard(client: Client): EarlyGatewayErrorGuard { + const pendingErrors: unknown[] = []; + const gateway = client.getPlugin("gateway"); + const emitter = getDiscordGatewayEmitter(gateway); + if (!emitter) { + return { + pendingErrors, + release: () => {}, + }; + } + + let released = false; + const onGatewayError = (err: unknown) => { + pendingErrors.push(err); + }; + emitter.on("error", onGatewayError); + + return { + pendingErrors, + release: () => { + if (released) { + return; + } + released = true; + emitter.removeListener("error", onGatewayError); + }, + }; +} diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index 2239503a5db..8243da5a246 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -34,7 +34,6 @@ import { createDiscordRetryRunner } from "../../infra/retry-policy.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; import { createNonExitingRuntime, type RuntimeEnv } from "../../runtime.js"; import { resolveDiscordAccount } from "../accounts.js"; -import { getDiscordGatewayEmitter } from "../monitor.gateway.js"; import { fetchDiscordApplicationId } from "../probe.js"; import { normalizeDiscordToken } from "../token.js"; import { createDiscordVoiceCommand } from "../voice/command.js"; @@ -52,6 +51,7 @@ import { } from "./agent-components.js"; import { resolveDiscordSlashCommandConfig } from "./commands.js"; import { createExecApprovalButton, DiscordExecApprovalHandler } from "./exec-approvals.js"; +import { attachEarlyGatewayErrorGuard } from "./gateway-error-guard.js"; import { createDiscordGatewayPlugin } from "./gateway-plugin.js"; import { DiscordMessageListener, @@ -230,33 +230,6 @@ function isDiscordDisallowedIntentsError(err: unknown): boolean { return message.includes(String(DISCORD_DISALLOWED_INTENTS_CODE)); } -type EarlyGatewayErrorGuard = { - pendingErrors: unknown[]; - release: () => void; -}; - -function attachEarlyGatewayErrorGuard(client: Client): EarlyGatewayErrorGuard { - const pendingErrors: unknown[] = []; - const gateway = client.getPlugin("gateway"); - const emitter = getDiscordGatewayEmitter(gateway); - if (!emitter) { - return { - pendingErrors, - release: () => {}, - }; - } - const onGatewayError = (err: unknown) => { - pendingErrors.push(err); - }; - emitter.on("error", onGatewayError); - return { - pendingErrors, - release: () => { - emitter.removeListener("error", onGatewayError); - }, - }; -} - export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const cfg = opts.config ?? loadConfig(); const account = resolveDiscordAccount({ diff --git a/src/infra/abort-signal.test.ts b/src/infra/abort-signal.test.ts new file mode 100644 index 00000000000..be32e0d881a --- /dev/null +++ b/src/infra/abort-signal.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it } from "vitest"; +import { waitForAbortSignal } from "./abort-signal.js"; + +describe("waitForAbortSignal", () => { + it("resolves immediately when signal is missing", async () => { + await expect(waitForAbortSignal(undefined)).resolves.toBeUndefined(); + }); + + it("resolves immediately when signal is already aborted", async () => { + const abort = new AbortController(); + abort.abort(); + await expect(waitForAbortSignal(abort.signal)).resolves.toBeUndefined(); + }); + + it("waits until abort fires", async () => { + const abort = new AbortController(); + let resolved = false; + + const task = waitForAbortSignal(abort.signal).then(() => { + resolved = true; + }); + await Promise.resolve(); + expect(resolved).toBe(false); + + abort.abort(); + await task; + expect(resolved).toBe(true); + }); +}); diff --git a/src/infra/abort-signal.ts b/src/infra/abort-signal.ts new file mode 100644 index 00000000000..77922784eda --- /dev/null +++ b/src/infra/abort-signal.ts @@ -0,0 +1,12 @@ +export async function waitForAbortSignal(signal?: AbortSignal): Promise { + if (!signal || signal.aborted) { + return; + } + await new Promise((resolve) => { + const onAbort = () => { + signal.removeEventListener("abort", onAbort); + resolve(); + }; + signal.addEventListener("abort", onAbort, { once: true }); + }); +} diff --git a/src/line/monitor.lifecycle.test.ts b/src/line/monitor.lifecycle.test.ts new file mode 100644 index 00000000000..635d921e7ad --- /dev/null +++ b/src/line/monitor.lifecycle.test.ts @@ -0,0 +1,92 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../config/config.js"; +import type { RuntimeEnv } from "../runtime.js"; + +const { createLineBotMock, registerPluginHttpRouteMock, unregisterHttpMock } = vi.hoisted(() => ({ + createLineBotMock: vi.fn(() => ({ + account: { accountId: "default" }, + handleWebhook: vi.fn(), + })), + registerPluginHttpRouteMock: vi.fn(), + unregisterHttpMock: vi.fn(), +})); + +vi.mock("./bot.js", () => ({ + createLineBot: createLineBotMock, +})); + +vi.mock("../plugins/http-path.js", () => ({ + normalizePluginHttpPath: (_path: string | undefined, fallback: string) => fallback, +})); + +vi.mock("../plugins/http-registry.js", () => ({ + registerPluginHttpRoute: registerPluginHttpRouteMock, +})); + +vi.mock("./webhook-node.js", () => ({ + createLineNodeWebhookHandler: vi.fn(() => vi.fn()), +})); + +describe("monitorLineProvider lifecycle", () => { + beforeEach(() => { + createLineBotMock.mockClear(); + unregisterHttpMock.mockClear(); + registerPluginHttpRouteMock.mockClear().mockReturnValue(unregisterHttpMock); + }); + + it("waits for abort before resolving", async () => { + const { monitorLineProvider } = await import("./monitor.js"); + const abort = new AbortController(); + let resolved = false; + + const task = monitorLineProvider({ + channelAccessToken: "token", + channelSecret: "secret", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + abortSignal: abort.signal, + }).then((monitor) => { + resolved = true; + return monitor; + }); + + await vi.waitFor(() => expect(registerPluginHttpRouteMock).toHaveBeenCalledTimes(1)); + expect(resolved).toBe(false); + + abort.abort(); + await task; + expect(unregisterHttpMock).toHaveBeenCalledTimes(1); + }); + + it("stops immediately when signal is already aborted", async () => { + const { monitorLineProvider } = await import("./monitor.js"); + const abort = new AbortController(); + abort.abort(); + + await monitorLineProvider({ + channelAccessToken: "token", + channelSecret: "secret", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + abortSignal: abort.signal, + }); + + expect(unregisterHttpMock).toHaveBeenCalledTimes(1); + }); + + it("returns immediately without abort signal and stop is idempotent", async () => { + const { monitorLineProvider } = await import("./monitor.js"); + + const monitor = await monitorLineProvider({ + channelAccessToken: "token", + channelSecret: "secret", + config: {} as OpenClawConfig, + runtime: {} as RuntimeEnv, + }); + + expect(unregisterHttpMock).not.toHaveBeenCalled(); + monitor.stop(); + monitor.stop(); + expect(unregisterHttpMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/line/monitor.ts b/src/line/monitor.ts index 07a995c4eed..49fcc518a3f 100644 --- a/src/line/monitor.ts +++ b/src/line/monitor.ts @@ -4,6 +4,7 @@ import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/pr import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import type { OpenClawConfig } from "../config/config.js"; import { danger, logVerbose } from "../globals.js"; +import { waitForAbortSignal } from "../infra/abort-signal.js"; import { normalizePluginHttpPath } from "../plugins/http-path.js"; import { registerPluginHttpRoute } from "../plugins/http-registry.js"; import type { RuntimeEnv } from "../runtime.js"; @@ -296,7 +297,12 @@ export async function monitorLineProvider( logVerbose(`line: registered webhook handler at ${normalizedPath}`); // Handle abort signal + let stopped = false; const stopHandler = () => { + if (stopped) { + return; + } + stopped = true; logVerbose(`line: stopping provider for account ${resolvedAccountId}`); unregisterHttp(); recordChannelRuntimeState({ @@ -309,7 +315,12 @@ export async function monitorLineProvider( }); }; - abortSignal?.addEventListener("abort", stopHandler); + if (abortSignal?.aborted) { + stopHandler(); + } else if (abortSignal) { + abortSignal.addEventListener("abort", stopHandler, { once: true }); + await waitForAbortSignal(abortSignal); + } return { account: bot.account, diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index 4e59f6c0c6a..5c0df3de6ef 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { monitorTelegramProvider } from "./monitor.js"; type MockCtx = { @@ -160,19 +160,30 @@ vi.mock("../auto-reply/reply.js", () => ({ })); describe("monitorTelegramProvider (grammY)", () => { + let consoleErrorSpy: { mockRestore: () => void } | undefined; + beforeEach(() => { loadConfig.mockReturnValue({ agents: { defaults: { maxConcurrent: 2 } }, channels: { telegram: {} }, }); initSpy.mockClear(); - runSpy.mockClear(); + runSpy.mockReset().mockImplementation(() => + makeRunnerStub({ + task: () => Promise.reject(new Error("runSpy called without explicit test stub")), + }), + ); computeBackoff.mockClear(); sleepWithAbort.mockClear(); startTelegramWebhookSpy.mockClear(); registerUnhandledRejectionHandlerMock.mockClear(); resetUnhandledRejection(); createTelegramBotErrors.length = 0; + consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); + }); + + afterEach(() => { + consoleErrorSpy?.mockRestore(); }); it("processes a DM and sends reply", async () => { diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 579db8ad3a1..06410b74ed1 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -2,6 +2,7 @@ import { type RunOptions, run } from "@grammyjs/runner"; import { resolveAgentMaxConcurrent } from "../config/agent-limits.js"; import type { OpenClawConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; +import { waitForAbortSignal } from "../infra/abort-signal.js"; import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; import { formatErrorMessage } from "../infra/errors.js"; import { formatDurationPrecise } from "../infra/format-time/format-duration.ts"; @@ -172,16 +173,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { abortSignal: opts.abortSignal, publicUrl: opts.webhookUrl, }); - const abortSignal = opts.abortSignal; - if (abortSignal && !abortSignal.aborted) { - await new Promise((resolve) => { - const onAbort = () => { - abortSignal.removeEventListener("abort", onAbort); - resolve(); - }; - abortSignal.addEventListener("abort", onAbort, { once: true }); - }); - } + await waitForAbortSignal(opts.abortSignal); return; }