diff --git a/extensions/nostr/src/channel.inbound.test.ts b/extensions/nostr/src/channel.inbound.test.ts index 7bdbbb17c53..41119d359f0 100644 --- a/extensions/nostr/src/channel.inbound.test.ts +++ b/extensions/nostr/src/channel.inbound.test.ts @@ -94,13 +94,24 @@ async function startGatewayHarness(params: { const bus = createMockBus(); setNostrRuntime(harness.runtime); mocks.startNostrBus.mockResolvedValueOnce(bus as never); + const abort = new AbortController(); - const cleanup = (await startNostrGatewayAccount( + const task = startNostrGatewayAccount( createStartAccountContext({ account: params.account, cfg: params.cfg, + abortSignal: abort.signal, }), - )) as { stop: () => void }; + ); + await vi.waitFor(() => { + expect(mocks.startNostrBus).toHaveBeenCalledTimes(1); + }); + const cleanup = { + stop: async () => { + abort.abort(); + await task; + }, + }; return { harness, bus, cleanup }; } @@ -143,7 +154,7 @@ describe("nostr inbound gateway path", () => { expect(sendPairingReply).toHaveBeenCalledTimes(1); expect(mockCallArg(sendPairingReply)).toContain("Pairing code:"); - cleanup.stop(); + await cleanup.stop(); }); it("routes allowed DMs through the standard reply pipeline", async () => { @@ -186,6 +197,6 @@ describe("nostr inbound gateway path", () => { expect(ctx?.CommandAuthorized).toBe(true); expect(sendReply).toHaveBeenCalledWith("converted:|a|b|"); - cleanup.stop(); + await cleanup.stop(); }); }); diff --git a/extensions/nostr/src/channel.lifecycle.test.ts b/extensions/nostr/src/channel.lifecycle.test.ts new file mode 100644 index 00000000000..124a6dacbdb --- /dev/null +++ b/extensions/nostr/src/channel.lifecycle.test.ts @@ -0,0 +1,90 @@ +import { + createStartAccountContext, + expectStopPendingUntilAbort, + startAccountAndTrackLifecycle, + waitForStartedMocks, +} from "openclaw/plugin-sdk/channel-test-helpers"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { getActiveNostrBuses, startNostrGatewayAccount } from "./gateway.js"; +import { buildResolvedNostrAccount } from "./test-fixtures.js"; + +const mocks = vi.hoisted(() => ({ + startNostrBus: vi.fn(), +})); + +vi.mock("./nostr-bus.js", () => ({ + DEFAULT_RELAYS: ["wss://relay.example.com"], + startNostrBus: mocks.startNostrBus, +})); + +function createMockBus() { + return { + sendDm: vi.fn(async () => {}), + close: vi.fn(), + getMetrics: vi.fn(() => ({ counters: {} })), + publishProfile: vi.fn(), + getProfileState: vi.fn(async () => null), + }; +} + +describe("nostr gateway lifecycle", () => { + afterEach(() => { + mocks.startNostrBus.mockReset(); + }); + + it("keeps startAccount pending until abort, then closes the bus", async () => { + const bus = createMockBus(); + mocks.startNostrBus.mockResolvedValueOnce(bus as never); + + const { abort, task, isSettled } = startAccountAndTrackLifecycle({ + startAccount: startNostrGatewayAccount, + account: buildResolvedNostrAccount(), + }); + + await expectStopPendingUntilAbort({ + waitForStarted: waitForStartedMocks(mocks.startNostrBus), + isSettled, + abort, + task, + stop: bus.close, + }); + }); + + it("keeps the active bus registered while pending and removes it after abort", async () => { + const bus = createMockBus(); + mocks.startNostrBus.mockResolvedValueOnce(bus as never); + + const { abort, task, isSettled } = startAccountAndTrackLifecycle({ + startAccount: startNostrGatewayAccount, + account: buildResolvedNostrAccount(), + }); + + await vi.waitFor(() => { + expect(getActiveNostrBuses().get("default")).toBe(bus); + }); + expect(isSettled()).toBe(false); + + abort.abort(); + await task; + + expect(bus.close).toHaveBeenCalledOnce(); + expect(getActiveNostrBuses().has("default")).toBe(false); + }); + + it("stops immediately when startAccount receives an already-aborted signal", async () => { + const bus = createMockBus(); + mocks.startNostrBus.mockResolvedValueOnce(bus as never); + const abort = new AbortController(); + abort.abort(); + + await startNostrGatewayAccount( + createStartAccountContext({ + account: buildResolvedNostrAccount(), + abortSignal: abort.signal, + }), + ); + + expect(mocks.startNostrBus).toHaveBeenCalledOnce(); + expect(bus.close).toHaveBeenCalledOnce(); + }); +}); diff --git a/extensions/nostr/src/channel.outbound.test.ts b/extensions/nostr/src/channel.outbound.test.ts index 3cfec175488..7a5eb7ff6b4 100644 --- a/extensions/nostr/src/channel.outbound.test.ts +++ b/extensions/nostr/src/channel.outbound.test.ts @@ -57,12 +57,23 @@ async function startOutboundAccount(accountId?: string) { getProfileState: vi.fn(async () => null), }; mocks.startNostrBus.mockResolvedValueOnce(bus as unknown); + const abort = new AbortController(); - const cleanup = (await startNostrGatewayAccount( + const task = startNostrGatewayAccount( createStartAccountContext({ account: buildResolvedNostrAccount(accountId ? { accountId } : undefined), + abortSignal: abort.signal, }), - )) as { stop: () => void }; + ); + await vi.waitFor(() => { + expect(mocks.startNostrBus).toHaveBeenCalledTimes(1); + }); + const cleanup = { + stop: async () => { + abort.abort(); + await task; + }, + }; return { cleanup, sendDm }; } @@ -96,7 +107,7 @@ describe("nostr outbound cfg threading", () => { expect(mocks.normalizePubkey).toHaveBeenCalledWith("NPUB123"); expect(sendDm).toHaveBeenCalledWith("normalized-npub123", "converted:|a|b|"); - cleanup.stop(); + await cleanup.stop(); }); it("uses the configured defaultAccount when accountId is omitted", async () => { @@ -125,7 +136,7 @@ describe("nostr outbound cfg threading", () => { }); expect(sendDm).toHaveBeenCalledWith("normalized-npub123", "hello"); - cleanup.stop(); + await cleanup.stop(); }); it("backs declared message adapter capabilities with outbound sends", async () => { @@ -158,6 +169,6 @@ describe("nostr outbound cfg threading", () => { }, }); - cleanup.stop(); + await cleanup.stop(); }); }); diff --git a/extensions/nostr/src/gateway.ts b/extensions/nostr/src/gateway.ts index feb8b12c0ed..29c6f2a2df0 100644 --- a/extensions/nostr/src/gateway.ts +++ b/extensions/nostr/src/gateway.ts @@ -5,6 +5,7 @@ import { import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing"; import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import { runStoppablePassiveMonitor } from "openclaw/plugin-sdk/extension-shared"; import { type ChannelOutboundAdapter, type ChannelPlugin } from "./channel-api.js"; import type { MetricEvent, MetricsSnapshot } from "./metrics.js"; import { startNostrBus, type NostrBusHandle } from "./nostr-bus.js"; @@ -143,118 +144,133 @@ export const startNostrGatewayAccount: NostrGatewayStart = async (ctx) => { return "block"; }; - const bus = await startNostrBus({ - accountId: account.accountId, - privateKey: account.privateKey, - relays: account.relays, - authorizeSender: async ({ senderPubkey, reply }) => - await authorizeSender({ senderId: senderPubkey, reply }), - onMessage: async (senderPubkey, text, reply, meta) => { - const resolvedAccess = await resolveInboundAccess(senderPubkey, text); - if (resolvedAccess.senderAccess.decision !== "allow") { - ctx.log?.warn?.( - `[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.senderAccess.reasonCode})`, - ); - return; - } - - const { dispatchInboundDirectDmWithRuntime } = await import("./inbound-direct-dm-runtime.js"); - await dispatchInboundDirectDmWithRuntime({ - cfg: ctx.cfg, - runtime, - channel: "nostr", - channelLabel: "Nostr", + await runStoppablePassiveMonitor({ + abortSignal: ctx.abortSignal, + start: async () => { + const bus = await startNostrBus({ accountId: account.accountId, - peer: { - kind: "direct", - id: senderPubkey, - }, - senderId: senderPubkey, - senderAddress: `nostr:${senderPubkey}`, - recipientAddress: `nostr:${account.publicKey}`, - conversationLabel: senderPubkey, - rawBody: text, - messageId: meta.eventId, - timestamp: meta.createdAt * 1000, - commandAuthorized: resolvedAccess.commandAccess.requested - ? resolvedAccess.commandAccess.authorized - : undefined, - deliver: async (payload) => { - const outboundText = - payload && typeof payload === "object" && "text" in payload - ? ((payload as { text?: string }).text ?? "") - : ""; - if (!outboundText.trim()) { + privateKey: account.privateKey, + relays: account.relays, + authorizeSender: async ({ senderPubkey, reply }) => + await authorizeSender({ senderId: senderPubkey, reply }), + onMessage: async (senderPubkey, text, reply, meta) => { + const resolvedAccess = await resolveInboundAccess(senderPubkey, text); + if (resolvedAccess.senderAccess.decision !== "allow") { + ctx.log?.warn?.( + `[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.senderAccess.reasonCode})`, + ); return; } - const tableMode = runtime.channel.text.resolveMarkdownTableMode({ + + const { dispatchInboundDirectDmWithRuntime } = + await import("./inbound-direct-dm-runtime.js"); + await dispatchInboundDirectDmWithRuntime({ cfg: ctx.cfg, + runtime, channel: "nostr", + channelLabel: "Nostr", accountId: account.accountId, + peer: { + kind: "direct", + id: senderPubkey, + }, + senderId: senderPubkey, + senderAddress: `nostr:${senderPubkey}`, + recipientAddress: `nostr:${account.publicKey}`, + conversationLabel: senderPubkey, + rawBody: text, + messageId: meta.eventId, + timestamp: meta.createdAt * 1000, + commandAuthorized: resolvedAccess.commandAccess.requested + ? resolvedAccess.commandAccess.authorized + : undefined, + deliver: async (payload) => { + const outboundText = + payload && typeof payload === "object" && "text" in payload + ? ((payload as { text?: string }).text ?? "") + : ""; + if (!outboundText.trim()) { + return; + } + const tableMode = runtime.channel.text.resolveMarkdownTableMode({ + cfg: ctx.cfg, + channel: "nostr", + accountId: account.accountId, + }); + await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode)); + }, + onRecordError: (err) => { + ctx.log?.error?.( + `[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`, + ); + }, + onDispatchError: (err, info) => { + ctx.log?.error?.( + `[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`, + ); + }, }); - await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode)); }, - onRecordError: (err) => { - ctx.log?.error?.( - `[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`, - ); + onError: (error, context) => { + ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`); }, - onDispatchError: (err, info) => { - ctx.log?.error?.( - `[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`, - ); + onConnect: (relay) => { + ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`); + }, + onDisconnect: (relay) => { + ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`); + }, + onEose: (relays) => { + ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`); + }, + onMetric: (event: MetricEvent) => { + if (event.name.startsWith("event.rejected.")) { + ctx.log?.debug?.( + `[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`, + ); + } else if (event.name === "relay.circuit_breaker.open") { + ctx.log?.warn?.( + `[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`, + ); + } else if (event.name === "relay.circuit_breaker.close") { + ctx.log?.info?.( + `[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`, + ); + } else if (event.name === "relay.error") { + ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`); + } + if (busHandle) { + metricsSnapshots.set(account.accountId, busHandle.getMetrics()); + } }, }); - }, - onError: (error, context) => { - ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`); - }, - onConnect: (relay) => { - ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`); - }, - onDisconnect: (relay) => { - ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`); - }, - onEose: (relays) => { - ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`); - }, - onMetric: (event: MetricEvent) => { - if (event.name.startsWith("event.rejected.")) { - ctx.log?.debug?.( - `[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`, - ); - } else if (event.name === "relay.circuit_breaker.open") { - ctx.log?.warn?.( - `[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`, - ); - } else if (event.name === "relay.circuit_breaker.close") { - ctx.log?.info?.( - `[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`, - ); - } else if (event.name === "relay.error") { - ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`); - } - if (busHandle) { - metricsSnapshots.set(account.accountId, busHandle.getMetrics()); - } + let stopped = false; + busHandle = bus; + activeBuses.set(account.accountId, bus); + + ctx.log?.info?.( + `[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`, + ); + + return { + stop: () => { + if (stopped) { + return; + } + stopped = true; + bus.close(); + if (busHandle === bus) { + busHandle = null; + } + if (activeBuses.get(account.accountId) === bus) { + activeBuses.delete(account.accountId); + } + metricsSnapshots.delete(account.accountId); + ctx.log?.info?.(`[${account.accountId}] Nostr provider stopped`); + }, + }; }, }); - - busHandle = bus; - activeBuses.set(account.accountId, bus); - - ctx.log?.info?.( - `[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`, - ); - - return { - stop: () => { - bus.close(); - activeBuses.delete(account.accountId); - metricsSnapshots.delete(account.accountId); - ctx.log?.info?.(`[${account.accountId}] Nostr provider stopped`); - }, - }; }; export const nostrPairingTextAdapter = { diff --git a/extensions/nostr/src/nostr-bus.inbound.test.ts b/extensions/nostr/src/nostr-bus.inbound.test.ts index 512aa465204..c10d43fcc17 100644 --- a/extensions/nostr/src/nostr-bus.inbound.test.ts +++ b/extensions/nostr/src/nostr-bus.inbound.test.ts @@ -10,6 +10,8 @@ const mockState = vi.hoisted(() => ({ oneose?: () => void; onclose?: (reason: string[]) => void; } | null, + subscribeMany: vi.fn(), + close: vi.fn(), verifyEvent: vi.fn(() => true), decrypt: vi.fn(() => "plaintext"), publishProfile: vi.fn(async () => ({ @@ -23,14 +25,15 @@ const mockState = vi.hoisted(() => ({ vi.mock("nostr-tools", () => { class MockSimplePool { subscribeMany( - _relays: string[], - _filters: unknown, + relays: string[], + filters: unknown, handlers: { onevent: (event: Record) => void | Promise; oneose?: () => void; onclose?: (reason: string[]) => void; }, ) { + mockState.subscribeMany(relays, filters, handlers); mockState.handlers = handlers; return { close: vi.fn(), @@ -38,6 +41,10 @@ vi.mock("nostr-tools", () => { } publish = vi.fn(async () => {}); + + close(relays: string[]) { + mockState.close(relays); + } } return { @@ -91,6 +98,8 @@ async function emitEvent(event: Record) { describe("startNostrBus inbound guards", () => { beforeEach(() => { mockState.handlers = null; + mockState.subscribeMany.mockClear(); + mockState.close.mockClear(); mockState.verifyEvent.mockClear(); mockState.verifyEvent.mockReturnValue(true); mockState.decrypt.mockClear(); @@ -101,6 +110,40 @@ describe("startNostrBus inbound guards", () => { mockState.handlers = null; }); + it("subscribes to DMs with a single Nostr filter object", async () => { + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + onMessage: vi.fn(async () => {}), + onMetric: () => {}, + }); + + expect(mockState.subscribeMany).toHaveBeenCalledTimes(1); + const filters = mockState.subscribeMany.mock.calls[0]?.[1]; + expect(Array.isArray(filters)).toBe(false); + expect(filters).toMatchObject({ + kinds: [4], + "#p": [BOT_PUBKEY], + since: 0, + }); + + bus.close(); + }); + + it("closes the relay pool when the bus closes", async () => { + const bus = await startNostrBus({ + privateKey: TEST_HEX_PRIVATE_KEY, + relays: ["wss://relay.example"], + onMessage: vi.fn(async () => {}), + onMetric: () => {}, + }); + + bus.close(); + + await vi.waitFor(() => { + expect(mockState.close).toHaveBeenCalledWith(["wss://relay.example"]); + }); + }); + it("checks sender authorization after verify and before decrypt", async () => { const onMessage = vi.fn(async () => {}); const authorizeSender = vi.fn(async () => "block" as const); diff --git a/extensions/nostr/src/nostr-bus.ts b/extensions/nostr/src/nostr-bus.ts index 4ff45c66917..b01480aba1a 100644 --- a/extensions/nostr/src/nostr-bus.ts +++ b/extensions/nostr/src/nostr-bus.ts @@ -614,28 +614,27 @@ export async function startNostrBus(options: NostrBusOptions): Promise[1], - { - onevent: handleEvent, - oneose: () => { - // EOSE handler - called when all stored events have been received - for (const relay of relays) { - metrics.emit("relay.message.eose", 1, { relay }); - } - onEose?.(relays.join(", ")); - }, - onclose: (reason) => { - // Handle subscription close - for (const relay of relays) { - metrics.emit("relay.message.closed", 1, { relay }); - options.onDisconnect?.(relay); - } - onError?.(new Error(`Subscription closed: ${reason.join(", ")}`), "subscription"); - }, + const dmFilter = { kinds: [4], "#p": [pk], since } satisfies Parameters< + typeof pool.subscribeMany + >[1]; + const sub = pool.subscribeMany(relays, dmFilter, { + onevent: handleEvent, + oneose: () => { + // EOSE handler - called when all stored events have been received + for (const relay of relays) { + metrics.emit("relay.message.eose", 1, { relay }); + } + onEose?.(relays.join(", ")); }, - ); + onclose: (reason) => { + // Handle subscription close + for (const relay of relays) { + metrics.emit("relay.message.closed", 1, { relay }); + options.onDisconnect?.(relay); + } + onError?.(new Error(`Subscription closed: ${reason.join(", ")}`), "subscription"); + }, + }); // Public sendDm function const sendDm = async (toPubkey: string, text: string): Promise => { @@ -694,6 +693,7 @@ export async function startNostrBus(options: NostrBusOptions): Promise { sub.close(); + setTimeout(() => pool.close(relays), 0); seen.stop(); perSenderRateLimiter.clear(); globalRateLimiter.clear(); diff --git a/extensions/nostr/src/nostr-profile-import.test.ts b/extensions/nostr/src/nostr-profile-import.test.ts index 44ba43394ba..965e1717363 100644 --- a/extensions/nostr/src/nostr-profile-import.test.ts +++ b/extensions/nostr/src/nostr-profile-import.test.ts @@ -2,14 +2,69 @@ * Tests for Nostr Profile Import */ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, beforeEach, vi } from "vitest"; import type { NostrProfile } from "./config-schema.js"; -import { mergeProfiles } from "./nostr-profile-import.js"; +import { importProfileFromRelays, mergeProfiles } from "./nostr-profile-import.js"; -// Note: importProfileFromRelays requires real network calls or complex mocking -// of nostr-tools SimplePool, so we focus on unit testing mergeProfiles +const mockState = vi.hoisted(() => ({ + subscribeMany: vi.fn(), +})); + +vi.mock("nostr-tools", () => { + class MockSimplePool { + subscribeMany( + relays: string[], + filters: unknown, + handlers: { + onevent: (event: Record) => void; + oneose?: () => void; + onclose?: () => void; + }, + ) { + mockState.subscribeMany(relays, filters, handlers); + queueMicrotask(() => handlers.oneose?.()); + return { + close: vi.fn(), + }; + } + + close = vi.fn(); + } + + return { + SimplePool: MockSimplePool, + verifyEvent: vi.fn(() => true), + }; +}); + +// Mock SimplePool so importProfileFromRelays can assert the relay subscription shape. describe("nostr-profile-import", () => { + beforeEach(() => { + mockState.subscribeMany.mockClear(); + }); + + describe("importProfileFromRelays", () => { + it("subscribes to profiles with a single Nostr filter object", async () => { + const pubkey = "a".repeat(64); + + await importProfileFromRelays({ + pubkey, + relays: ["wss://relay.example"], + timeoutMs: 1, + }); + + expect(mockState.subscribeMany).toHaveBeenCalledTimes(1); + const filters = mockState.subscribeMany.mock.calls[0]?.[1]; + expect(Array.isArray(filters)).toBe(false); + expect(filters).toMatchObject({ + kinds: [0], + authors: [pubkey], + limit: 1, + }); + }); + }); + describe("mergeProfiles", () => { it("returns empty object when both are undefined", () => { const result = mergeProfiles(undefined, undefined); diff --git a/extensions/nostr/src/nostr-profile-import.ts b/extensions/nostr/src/nostr-profile-import.ts index 4600f61d38c..bb4897b0279 100644 --- a/extensions/nostr/src/nostr-profile-import.ts +++ b/extensions/nostr/src/nostr-profile-import.ts @@ -122,33 +122,29 @@ export async function importProfileFromRelays( for (const relay of relays) { relaysQueried.push(relay); - const sub = pool.subscribeMany( - [relay], - [ - { - kinds: [0], - authors: [pubkey], - limit: 1, - }, - ] as unknown as Parameters[1], - { - onevent(event) { - events.push({ event, relay }); - }, - oneose() { - completed++; - if (completed >= relays.length) { - resolve(); - } - }, - onclose() { - completed++; - if (completed >= relays.length) { - resolve(); - } - }, + const profileFilter = { + kinds: [0], + authors: [pubkey], + limit: 1, + } satisfies Parameters[1]; + + const sub = pool.subscribeMany([relay], profileFilter, { + onevent(event) { + events.push({ event, relay }); }, - ); + oneose() { + completed++; + if (completed >= relays.length) { + resolve(); + } + }, + onclose() { + completed++; + if (completed >= relays.length) { + resolve(); + } + }, + }); // Clean up subscription after timeout setTimeout(() => {