mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:00:54 +00:00
fix(nostr): retry inbound events after handler failures
This commit is contained in:
@@ -235,6 +235,32 @@ describe("startNostrBus inbound guards", () => {
|
|||||||
bus.close();
|
bus.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("retries a replayed event after the message handler fails", async () => {
|
||||||
|
const onMessage = vi
|
||||||
|
.fn<(sender: string, plaintext: string) => Promise<void>>()
|
||||||
|
.mockRejectedValueOnce(new Error("boom"))
|
||||||
|
.mockResolvedValueOnce(undefined);
|
||||||
|
const bus = await startNostrBus({
|
||||||
|
privateKey: TEST_HEX_PRIVATE_KEY,
|
||||||
|
onMessage,
|
||||||
|
onMetric: () => {},
|
||||||
|
});
|
||||||
|
|
||||||
|
const event = createEvent({
|
||||||
|
id: "retry-after-handler-failure",
|
||||||
|
});
|
||||||
|
|
||||||
|
await emitEvent(event);
|
||||||
|
await emitEvent(event);
|
||||||
|
|
||||||
|
expect(mockState.verifyEvent).toHaveBeenCalledTimes(2);
|
||||||
|
expect(mockState.decrypt).toHaveBeenCalledTimes(2);
|
||||||
|
expect(onMessage).toHaveBeenCalledTimes(2);
|
||||||
|
expect(bus.getMetrics().eventsProcessed).toBe(1);
|
||||||
|
|
||||||
|
bus.close();
|
||||||
|
});
|
||||||
|
|
||||||
it("does not rate limit an allowed sender while another authorization is still pending", async () => {
|
it("does not rate limit an allowed sender while another authorization is still pending", async () => {
|
||||||
const onMessage = vi.fn(async () => {});
|
const onMessage = vi.fn(async () => {});
|
||||||
let resolveBlocked: ((value: "block") => void) | undefined;
|
let resolveBlocked: ((value: "block") => void) | undefined;
|
||||||
|
|||||||
@@ -617,15 +617,13 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark seen AFTER verify (don't cache invalid IDs)
|
|
||||||
markSeen();
|
|
||||||
|
|
||||||
// Decrypt the message
|
// Decrypt the message
|
||||||
let plaintext: string;
|
let plaintext: string;
|
||||||
try {
|
try {
|
||||||
plaintext = decrypt(sk, event.pubkey, event.content);
|
plaintext = decrypt(sk, event.pubkey, event.content);
|
||||||
metrics.emit("decrypt.success");
|
metrics.emit("decrypt.success");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
markSeen();
|
||||||
metrics.emit("decrypt.failure");
|
metrics.emit("decrypt.failure");
|
||||||
metrics.emit("event.rejected.decrypt_failed");
|
metrics.emit("event.rejected.decrypt_failed");
|
||||||
onError?.(err as Error, `decrypt from ${event.pubkey}`);
|
onError?.(err as Error, `decrypt from ${event.pubkey}`);
|
||||||
@@ -633,6 +631,7 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (Buffer.byteLength(plaintext, "utf8") > guardPolicy.maxPlaintextBytes) {
|
if (Buffer.byteLength(plaintext, "utf8") > guardPolicy.maxPlaintextBytes) {
|
||||||
|
markSeen();
|
||||||
metrics.emit("event.rejected.oversized_plaintext");
|
metrics.emit("event.rejected.oversized_plaintext");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -643,6 +642,9 @@ export async function startNostrBus(options: NostrBusOptions): Promise<NostrBusH
|
|||||||
createdAt: event.created_at,
|
createdAt: event.created_at,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Only cache successful deliveries so handler failures can retry.
|
||||||
|
markSeen();
|
||||||
|
|
||||||
// Mark as processed
|
// Mark as processed
|
||||||
metrics.emit("event.processed");
|
metrics.emit("event.processed");
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user