fix(nostr): keep dm subscriptions alive until abort

This commit is contained in:
IWhatsskill
2026-05-28 02:04:34 +02:00
committed by Peter Steinberger
parent 22515eea44
commit 8a76cc3470
8 changed files with 383 additions and 161 deletions

View File

@@ -94,13 +94,24 @@ async function startGatewayHarness(params: {
const bus = createMockBus();
setNostrRuntime(harness.runtime);
mocks.startNostrBus.mockResolvedValueOnce(bus as never);
const abort = new AbortController();
const cleanup = (await startNostrGatewayAccount(
const task = startNostrGatewayAccount(
createStartAccountContext({
account: params.account,
cfg: params.cfg,
abortSignal: abort.signal,
}),
)) as { stop: () => void };
);
await vi.waitFor(() => {
expect(mocks.startNostrBus).toHaveBeenCalledTimes(1);
});
const cleanup = {
stop: async () => {
abort.abort();
await task;
},
};
return { harness, bus, cleanup };
}
@@ -143,7 +154,7 @@ describe("nostr inbound gateway path", () => {
expect(sendPairingReply).toHaveBeenCalledTimes(1);
expect(mockCallArg(sendPairingReply)).toContain("Pairing code:");
cleanup.stop();
await cleanup.stop();
});
it("routes allowed DMs through the standard reply pipeline", async () => {
@@ -186,6 +197,6 @@ describe("nostr inbound gateway path", () => {
expect(ctx?.CommandAuthorized).toBe(true);
expect(sendReply).toHaveBeenCalledWith("converted:|a|b|");
cleanup.stop();
await cleanup.stop();
});
});

View File

@@ -0,0 +1,90 @@
import {
createStartAccountContext,
expectStopPendingUntilAbort,
startAccountAndTrackLifecycle,
waitForStartedMocks,
} from "openclaw/plugin-sdk/channel-test-helpers";
import { afterEach, describe, expect, it, vi } from "vitest";
import { getActiveNostrBuses, startNostrGatewayAccount } from "./gateway.js";
import { buildResolvedNostrAccount } from "./test-fixtures.js";
const mocks = vi.hoisted(() => ({
startNostrBus: vi.fn(),
}));
vi.mock("./nostr-bus.js", () => ({
DEFAULT_RELAYS: ["wss://relay.example.com"],
startNostrBus: mocks.startNostrBus,
}));
function createMockBus() {
return {
sendDm: vi.fn(async () => {}),
close: vi.fn(),
getMetrics: vi.fn(() => ({ counters: {} })),
publishProfile: vi.fn(),
getProfileState: vi.fn(async () => null),
};
}
describe("nostr gateway lifecycle", () => {
afterEach(() => {
mocks.startNostrBus.mockReset();
});
it("keeps startAccount pending until abort, then closes the bus", async () => {
const bus = createMockBus();
mocks.startNostrBus.mockResolvedValueOnce(bus as never);
const { abort, task, isSettled } = startAccountAndTrackLifecycle({
startAccount: startNostrGatewayAccount,
account: buildResolvedNostrAccount(),
});
await expectStopPendingUntilAbort({
waitForStarted: waitForStartedMocks(mocks.startNostrBus),
isSettled,
abort,
task,
stop: bus.close,
});
});
it("keeps the active bus registered while pending and removes it after abort", async () => {
const bus = createMockBus();
mocks.startNostrBus.mockResolvedValueOnce(bus as never);
const { abort, task, isSettled } = startAccountAndTrackLifecycle({
startAccount: startNostrGatewayAccount,
account: buildResolvedNostrAccount(),
});
await vi.waitFor(() => {
expect(getActiveNostrBuses().get("default")).toBe(bus);
});
expect(isSettled()).toBe(false);
abort.abort();
await task;
expect(bus.close).toHaveBeenCalledOnce();
expect(getActiveNostrBuses().has("default")).toBe(false);
});
it("stops immediately when startAccount receives an already-aborted signal", async () => {
const bus = createMockBus();
mocks.startNostrBus.mockResolvedValueOnce(bus as never);
const abort = new AbortController();
abort.abort();
await startNostrGatewayAccount(
createStartAccountContext({
account: buildResolvedNostrAccount(),
abortSignal: abort.signal,
}),
);
expect(mocks.startNostrBus).toHaveBeenCalledOnce();
expect(bus.close).toHaveBeenCalledOnce();
});
});

View File

@@ -57,12 +57,23 @@ async function startOutboundAccount(accountId?: string) {
getProfileState: vi.fn(async () => null),
};
mocks.startNostrBus.mockResolvedValueOnce(bus as unknown);
const abort = new AbortController();
const cleanup = (await startNostrGatewayAccount(
const task = startNostrGatewayAccount(
createStartAccountContext({
account: buildResolvedNostrAccount(accountId ? { accountId } : undefined),
abortSignal: abort.signal,
}),
)) as { stop: () => void };
);
await vi.waitFor(() => {
expect(mocks.startNostrBus).toHaveBeenCalledTimes(1);
});
const cleanup = {
stop: async () => {
abort.abort();
await task;
},
};
return { cleanup, sendDm };
}
@@ -96,7 +107,7 @@ describe("nostr outbound cfg threading", () => {
expect(mocks.normalizePubkey).toHaveBeenCalledWith("NPUB123");
expect(sendDm).toHaveBeenCalledWith("normalized-npub123", "converted:|a|b|");
cleanup.stop();
await cleanup.stop();
});
it("uses the configured defaultAccount when accountId is omitted", async () => {
@@ -125,7 +136,7 @@ describe("nostr outbound cfg threading", () => {
});
expect(sendDm).toHaveBeenCalledWith("normalized-npub123", "hello");
cleanup.stop();
await cleanup.stop();
});
it("backs declared message adapter capabilities with outbound sends", async () => {
@@ -158,6 +169,6 @@ describe("nostr outbound cfg threading", () => {
},
});
cleanup.stop();
await cleanup.stop();
});
});

View File

@@ -5,6 +5,7 @@ import {
import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing";
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { runStoppablePassiveMonitor } from "openclaw/plugin-sdk/extension-shared";
import { type ChannelOutboundAdapter, type ChannelPlugin } from "./channel-api.js";
import type { MetricEvent, MetricsSnapshot } from "./metrics.js";
import { startNostrBus, type NostrBusHandle } from "./nostr-bus.js";
@@ -143,118 +144,133 @@ export const startNostrGatewayAccount: NostrGatewayStart = async (ctx) => {
return "block";
};
const bus = await startNostrBus({
accountId: account.accountId,
privateKey: account.privateKey,
relays: account.relays,
authorizeSender: async ({ senderPubkey, reply }) =>
await authorizeSender({ senderId: senderPubkey, reply }),
onMessage: async (senderPubkey, text, reply, meta) => {
const resolvedAccess = await resolveInboundAccess(senderPubkey, text);
if (resolvedAccess.senderAccess.decision !== "allow") {
ctx.log?.warn?.(
`[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.senderAccess.reasonCode})`,
);
return;
}
const { dispatchInboundDirectDmWithRuntime } = await import("./inbound-direct-dm-runtime.js");
await dispatchInboundDirectDmWithRuntime({
cfg: ctx.cfg,
runtime,
channel: "nostr",
channelLabel: "Nostr",
await runStoppablePassiveMonitor({
abortSignal: ctx.abortSignal,
start: async () => {
const bus = await startNostrBus({
accountId: account.accountId,
peer: {
kind: "direct",
id: senderPubkey,
},
senderId: senderPubkey,
senderAddress: `nostr:${senderPubkey}`,
recipientAddress: `nostr:${account.publicKey}`,
conversationLabel: senderPubkey,
rawBody: text,
messageId: meta.eventId,
timestamp: meta.createdAt * 1000,
commandAuthorized: resolvedAccess.commandAccess.requested
? resolvedAccess.commandAccess.authorized
: undefined,
deliver: async (payload) => {
const outboundText =
payload && typeof payload === "object" && "text" in payload
? ((payload as { text?: string }).text ?? "")
: "";
if (!outboundText.trim()) {
privateKey: account.privateKey,
relays: account.relays,
authorizeSender: async ({ senderPubkey, reply }) =>
await authorizeSender({ senderId: senderPubkey, reply }),
onMessage: async (senderPubkey, text, reply, meta) => {
const resolvedAccess = await resolveInboundAccess(senderPubkey, text);
if (resolvedAccess.senderAccess.decision !== "allow") {
ctx.log?.warn?.(
`[${account.accountId}] dropping Nostr DM after preflight drift (${senderPubkey}, ${resolvedAccess.senderAccess.reasonCode})`,
);
return;
}
const tableMode = runtime.channel.text.resolveMarkdownTableMode({
const { dispatchInboundDirectDmWithRuntime } =
await import("./inbound-direct-dm-runtime.js");
await dispatchInboundDirectDmWithRuntime({
cfg: ctx.cfg,
runtime,
channel: "nostr",
channelLabel: "Nostr",
accountId: account.accountId,
peer: {
kind: "direct",
id: senderPubkey,
},
senderId: senderPubkey,
senderAddress: `nostr:${senderPubkey}`,
recipientAddress: `nostr:${account.publicKey}`,
conversationLabel: senderPubkey,
rawBody: text,
messageId: meta.eventId,
timestamp: meta.createdAt * 1000,
commandAuthorized: resolvedAccess.commandAccess.requested
? resolvedAccess.commandAccess.authorized
: undefined,
deliver: async (payload) => {
const outboundText =
payload && typeof payload === "object" && "text" in payload
? ((payload as { text?: string }).text ?? "")
: "";
if (!outboundText.trim()) {
return;
}
const tableMode = runtime.channel.text.resolveMarkdownTableMode({
cfg: ctx.cfg,
channel: "nostr",
accountId: account.accountId,
});
await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode));
},
onRecordError: (err) => {
ctx.log?.error?.(
`[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`,
);
},
onDispatchError: (err, info) => {
ctx.log?.error?.(
`[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`,
);
},
});
await reply(runtime.channel.text.convertMarkdownTables(outboundText, tableMode));
},
onRecordError: (err) => {
ctx.log?.error?.(
`[${account.accountId}] failed recording Nostr inbound session: ${String(err)}`,
);
onError: (error, context) => {
ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`);
},
onDispatchError: (err, info) => {
ctx.log?.error?.(
`[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`,
);
onConnect: (relay) => {
ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`);
},
onDisconnect: (relay) => {
ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`);
},
onEose: (relays) => {
ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`);
},
onMetric: (event: MetricEvent) => {
if (event.name.startsWith("event.rejected.")) {
ctx.log?.debug?.(
`[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`,
);
} else if (event.name === "relay.circuit_breaker.open") {
ctx.log?.warn?.(
`[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`,
);
} else if (event.name === "relay.circuit_breaker.close") {
ctx.log?.info?.(
`[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`,
);
} else if (event.name === "relay.error") {
ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`);
}
if (busHandle) {
metricsSnapshots.set(account.accountId, busHandle.getMetrics());
}
},
});
},
onError: (error, context) => {
ctx.log?.error?.(`[${account.accountId}] Nostr error (${context}): ${error.message}`);
},
onConnect: (relay) => {
ctx.log?.debug?.(`[${account.accountId}] Connected to relay: ${relay}`);
},
onDisconnect: (relay) => {
ctx.log?.debug?.(`[${account.accountId}] Disconnected from relay: ${relay}`);
},
onEose: (relays) => {
ctx.log?.debug?.(`[${account.accountId}] EOSE received from relays: ${relays}`);
},
onMetric: (event: MetricEvent) => {
if (event.name.startsWith("event.rejected.")) {
ctx.log?.debug?.(
`[${account.accountId}] Metric: ${event.name} ${JSON.stringify(event.labels)}`,
);
} else if (event.name === "relay.circuit_breaker.open") {
ctx.log?.warn?.(
`[${account.accountId}] Circuit breaker opened for relay: ${event.labels?.relay}`,
);
} else if (event.name === "relay.circuit_breaker.close") {
ctx.log?.info?.(
`[${account.accountId}] Circuit breaker closed for relay: ${event.labels?.relay}`,
);
} else if (event.name === "relay.error") {
ctx.log?.debug?.(`[${account.accountId}] Relay error: ${event.labels?.relay}`);
}
if (busHandle) {
metricsSnapshots.set(account.accountId, busHandle.getMetrics());
}
let stopped = false;
busHandle = bus;
activeBuses.set(account.accountId, bus);
ctx.log?.info?.(
`[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`,
);
return {
stop: () => {
if (stopped) {
return;
}
stopped = true;
bus.close();
if (busHandle === bus) {
busHandle = null;
}
if (activeBuses.get(account.accountId) === bus) {
activeBuses.delete(account.accountId);
}
metricsSnapshots.delete(account.accountId);
ctx.log?.info?.(`[${account.accountId}] Nostr provider stopped`);
},
};
},
});
busHandle = bus;
activeBuses.set(account.accountId, bus);
ctx.log?.info?.(
`[${account.accountId}] Nostr provider started, connected to ${account.relays.length} relay(s)`,
);
return {
stop: () => {
bus.close();
activeBuses.delete(account.accountId);
metricsSnapshots.delete(account.accountId);
ctx.log?.info?.(`[${account.accountId}] Nostr provider stopped`);
},
};
};
export const nostrPairingTextAdapter = {

View File

@@ -10,6 +10,8 @@ const mockState = vi.hoisted(() => ({
oneose?: () => void;
onclose?: (reason: string[]) => void;
} | null,
subscribeMany: vi.fn(),
close: vi.fn(),
verifyEvent: vi.fn(() => true),
decrypt: vi.fn(() => "plaintext"),
publishProfile: vi.fn(async () => ({
@@ -23,14 +25,15 @@ const mockState = vi.hoisted(() => ({
vi.mock("nostr-tools", () => {
class MockSimplePool {
subscribeMany(
_relays: string[],
_filters: unknown,
relays: string[],
filters: unknown,
handlers: {
onevent: (event: Record<string, unknown>) => void | Promise<void>;
oneose?: () => void;
onclose?: (reason: string[]) => void;
},
) {
mockState.subscribeMany(relays, filters, handlers);
mockState.handlers = handlers;
return {
close: vi.fn(),
@@ -38,6 +41,10 @@ vi.mock("nostr-tools", () => {
}
publish = vi.fn(async () => {});
close(relays: string[]) {
mockState.close(relays);
}
}
return {
@@ -91,6 +98,8 @@ async function emitEvent(event: Record<string, unknown>) {
describe("startNostrBus inbound guards", () => {
beforeEach(() => {
mockState.handlers = null;
mockState.subscribeMany.mockClear();
mockState.close.mockClear();
mockState.verifyEvent.mockClear();
mockState.verifyEvent.mockReturnValue(true);
mockState.decrypt.mockClear();
@@ -101,6 +110,40 @@ describe("startNostrBus inbound guards", () => {
mockState.handlers = null;
});
it("subscribes to DMs with a single Nostr filter object", async () => {
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
onMessage: vi.fn(async () => {}),
onMetric: () => {},
});
expect(mockState.subscribeMany).toHaveBeenCalledTimes(1);
const filters = mockState.subscribeMany.mock.calls[0]?.[1];
expect(Array.isArray(filters)).toBe(false);
expect(filters).toMatchObject({
kinds: [4],
"#p": [BOT_PUBKEY],
since: 0,
});
bus.close();
});
it("closes the relay pool when the bus closes", async () => {
const bus = await startNostrBus({
privateKey: TEST_HEX_PRIVATE_KEY,
relays: ["wss://relay.example"],
onMessage: vi.fn(async () => {}),
onMetric: () => {},
});
bus.close();
await vi.waitFor(() => {
expect(mockState.close).toHaveBeenCalledWith(["wss://relay.example"]);
});
});
it("checks sender authorization after verify and before decrypt", async () => {
const onMessage = vi.fn(async () => {});
const authorizeSender = vi.fn(async () => "block" as const);

View File

@@ -614,28 +614,27 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
}
}
const sub = pool.subscribeMany(
relays,
[{ kinds: [4], "#p": [pk], since }] as unknown as Parameters<typeof pool.subscribeMany>[1],
{
onevent: handleEvent,
oneose: () => {
// EOSE handler - called when all stored events have been received
for (const relay of relays) {
metrics.emit("relay.message.eose", 1, { relay });
}
onEose?.(relays.join(", "));
},
onclose: (reason) => {
// Handle subscription close
for (const relay of relays) {
metrics.emit("relay.message.closed", 1, { relay });
options.onDisconnect?.(relay);
}
onError?.(new Error(`Subscription closed: ${reason.join(", ")}`), "subscription");
},
const dmFilter = { kinds: [4], "#p": [pk], since } satisfies Parameters<
typeof pool.subscribeMany
>[1];
const sub = pool.subscribeMany(relays, dmFilter, {
onevent: handleEvent,
oneose: () => {
// EOSE handler - called when all stored events have been received
for (const relay of relays) {
metrics.emit("relay.message.eose", 1, { relay });
}
onEose?.(relays.join(", "));
},
);
onclose: (reason) => {
// Handle subscription close
for (const relay of relays) {
metrics.emit("relay.message.closed", 1, { relay });
options.onDisconnect?.(relay);
}
onError?.(new Error(`Subscription closed: ${reason.join(", ")}`), "subscription");
},
});
// Public sendDm function
const sendDm = async (toPubkey: string, text: string): Promise<void> => {
@@ -694,6 +693,7 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
return {
close: () => {
sub.close();
setTimeout(() => pool.close(relays), 0);
seen.stop();
perSenderRateLimiter.clear();
globalRateLimiter.clear();

View File

@@ -2,14 +2,69 @@
* Tests for Nostr Profile Import
*/
import { describe, it, expect } from "vitest";
import { describe, it, expect, beforeEach, vi } from "vitest";
import type { NostrProfile } from "./config-schema.js";
import { mergeProfiles } from "./nostr-profile-import.js";
import { importProfileFromRelays, mergeProfiles } from "./nostr-profile-import.js";
// Note: importProfileFromRelays requires real network calls or complex mocking
// of nostr-tools SimplePool, so we focus on unit testing mergeProfiles
const mockState = vi.hoisted(() => ({
subscribeMany: vi.fn(),
}));
vi.mock("nostr-tools", () => {
class MockSimplePool {
subscribeMany(
relays: string[],
filters: unknown,
handlers: {
onevent: (event: Record<string, unknown>) => void;
oneose?: () => void;
onclose?: () => void;
},
) {
mockState.subscribeMany(relays, filters, handlers);
queueMicrotask(() => handlers.oneose?.());
return {
close: vi.fn(),
};
}
close = vi.fn();
}
return {
SimplePool: MockSimplePool,
verifyEvent: vi.fn(() => true),
};
});
// Mock SimplePool so importProfileFromRelays can assert the relay subscription shape.
describe("nostr-profile-import", () => {
beforeEach(() => {
mockState.subscribeMany.mockClear();
});
describe("importProfileFromRelays", () => {
it("subscribes to profiles with a single Nostr filter object", async () => {
const pubkey = "a".repeat(64);
await importProfileFromRelays({
pubkey,
relays: ["wss://relay.example"],
timeoutMs: 1,
});
expect(mockState.subscribeMany).toHaveBeenCalledTimes(1);
const filters = mockState.subscribeMany.mock.calls[0]?.[1];
expect(Array.isArray(filters)).toBe(false);
expect(filters).toMatchObject({
kinds: [0],
authors: [pubkey],
limit: 1,
});
});
});
describe("mergeProfiles", () => {
it("returns empty object when both are undefined", () => {
const result = mergeProfiles(undefined, undefined);

View File

@@ -122,33 +122,29 @@ export async function importProfileFromRelays(
for (const relay of relays) {
relaysQueried.push(relay);
const sub = pool.subscribeMany(
[relay],
[
{
kinds: [0],
authors: [pubkey],
limit: 1,
},
] as unknown as Parameters<typeof pool.subscribeMany>[1],
{
onevent(event) {
events.push({ event, relay });
},
oneose() {
completed++;
if (completed >= relays.length) {
resolve();
}
},
onclose() {
completed++;
if (completed >= relays.length) {
resolve();
}
},
const profileFilter = {
kinds: [0],
authors: [pubkey],
limit: 1,
} satisfies Parameters<typeof pool.subscribeMany>[1];
const sub = pool.subscribeMany([relay], profileFilter, {
onevent(event) {
events.push({ event, relay });
},
);
oneose() {
completed++;
if (completed >= relays.length) {
resolve();
}
},
onclose() {
completed++;
if (completed >= relays.length) {
resolve();
}
},
});
// Clean up subscription after timeout
setTimeout(() => {