diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c7ceb4773d..12f087a5f0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Diagnostics/OTEL: add bounded outbound message delivery lifecycle diagnostics and export them as low-cardinality delivery spans/metrics without message body, recipient, room, or media-path data. Thanks @vincentkoc. - Diagnostics/OTEL: emit bounded exec-process diagnostics and export them as `openclaw.exec` spans without exposing command text, working directories, or container identifiers. (#71451) Thanks @vincentkoc and @jlapenna. - Diagnostics/OTEL: support `OPENCLAW_OTEL_PRELOADED=1` so the plugin can reuse an already-registered OpenTelemetry SDK while keeping OpenClaw diagnostic listeners wired. (#71450) Thanks @vincentkoc and @jlapenna. - Control UI: refine the agent Tool Access panel with compact live-tool chips, collapsible tool groups, direct per-tool toggles, and clearer runtime/source provenance. (#71405) Thanks @BunsDev. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 97846da94cc..2872651ff84 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -56ccee3ef8ff3b0ba7e2e765ae631b59254464585d5fef9db7e905f2c4c34ded plugin-sdk-api-baseline.json -39184cf8afaec691f0352d1a113e30a7099b87c0748237a3c7307e903ba24eee plugin-sdk-api-baseline.jsonl +1c8faa44e6ad80aeca7add9793d1dee1b7c552a0220c3dcebd8475b7ecd69342 plugin-sdk-api-baseline.json +6ae517ad38d843fb3453cff8c9a081f1f9b7fa54ee563dcef69524ed7013b57f plugin-sdk-api-baseline.jsonl diff --git a/docs/logging.md b/docs/logging.md index 3680eb9c3ee..6eb5c6d7e88 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -206,6 +206,9 @@ Message flow: - `webhook.error`: webhook handler errors. - `message.queued`: message enqueued for processing. - `message.processed`: outcome + duration + optional error. +- `message.delivery.started`: outbound delivery attempt started. +- `message.delivery.completed`: outbound delivery attempt finished + duration/result count. +- `message.delivery.error`: outbound delivery attempt failed + duration/bounded error category. Queue + session: @@ -345,6 +348,11 @@ Message flow: `openclaw.outcome`) - `openclaw.message.duration_ms` (histogram, attrs: `openclaw.channel`, `openclaw.outcome`) +- `openclaw.message.delivery.started` (counter, attrs: `openclaw.channel`, + `openclaw.delivery.kind`) +- `openclaw.message.delivery.duration_ms` (histogram, attrs: + `openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`, + `openclaw.errorCategory`) Queues + sessions: @@ -390,6 +398,9 @@ Exec: - `openclaw.message.processed` - `openclaw.channel`, `openclaw.outcome`, `openclaw.chatId`, `openclaw.messageId`, `openclaw.reason` +- `openclaw.message.delivery` + - `openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`, + `openclaw.errorCategory`, `openclaw.delivery.result_count` - `openclaw.session.stuck` - `openclaw.state`, `openclaw.ageMs`, `openclaw.queueDepth` diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index 976f64b23bb..30378d9e80f 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -878,6 +878,107 @@ describe("diagnostics-otel service", () => { await service.stop?.(ctx); }); + test("exports message delivery spans and metrics with low-cardinality attributes", async () => { + const service = createDiagnosticsOtelService(); + const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true }); + await service.start(ctx); + + emitDiagnosticEvent({ + type: "message.delivery.started", + channel: "matrix", + deliveryKind: "text", + sessionKey: "session-secret", + }); + emitDiagnosticEvent({ + type: "message.delivery.completed", + channel: "matrix", + deliveryKind: "text", + durationMs: 25, + resultCount: 1, + sessionKey: "session-secret", + }); + emitDiagnosticEvent({ + type: "message.delivery.error", + channel: "discord", + deliveryKind: "media", + durationMs: 40, + errorCategory: "TypeError", + sessionKey: "session-secret", + }); + await flushDiagnosticEvents(); + + expect( + telemetryState.counters.get("openclaw.message.delivery.started")?.add, + ).toHaveBeenCalledWith(1, { + "openclaw.channel": "matrix", + "openclaw.delivery.kind": "text", + }); + expect( + telemetryState.histograms.get("openclaw.message.delivery.duration_ms")?.record, + ).toHaveBeenCalledWith( + 25, + expect.objectContaining({ + "openclaw.channel": "matrix", + "openclaw.delivery.kind": "text", + "openclaw.outcome": "completed", + }), + ); + expect( + telemetryState.histograms.get("openclaw.message.delivery.duration_ms")?.record, + ).toHaveBeenCalledWith( + 40, + expect.objectContaining({ + "openclaw.channel": "discord", + "openclaw.delivery.kind": "media", + "openclaw.outcome": "error", + "openclaw.errorCategory": "TypeError", + }), + ); + + const deliverySpanCalls = telemetryState.tracer.startSpan.mock.calls.filter( + (call) => call[0] === "openclaw.message.delivery", + ); + expect(deliverySpanCalls).toHaveLength(2); + expect(deliverySpanCalls[0]?.[1]).toMatchObject({ + attributes: { + "openclaw.channel": "matrix", + "openclaw.delivery.kind": "text", + "openclaw.outcome": "completed", + "openclaw.delivery.result_count": 1, + }, + startTime: expect.any(Number), + }); + expect(deliverySpanCalls[1]?.[1]).toMatchObject({ + attributes: { + "openclaw.channel": "discord", + "openclaw.delivery.kind": "media", + "openclaw.outcome": "error", + "openclaw.errorCategory": "TypeError", + }, + startTime: expect.any(Number), + }); + for (const call of deliverySpanCalls) { + expect(call[1]).toEqual({ + attributes: expect.not.objectContaining({ + "openclaw.sessionKey": expect.anything(), + "openclaw.messageId": expect.anything(), + "openclaw.conversationId": expect.anything(), + "openclaw.content": expect.anything(), + "openclaw.to": expect.anything(), + }), + startTime: expect.any(Number), + }); + } + const errorSpan = telemetryState.spans.find( + (span) => span.name === "openclaw.message.delivery" && span.setStatus.mock.calls.length > 0, + ); + expect(errorSpan?.setStatus).toHaveBeenCalledWith({ + code: 2, + message: "TypeError", + }); + await service.stop?.(ctx); + }); + test("does not export model or tool content unless capture is explicitly enabled", async () => { const service = createDiagnosticsOtelService(); const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true }); diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index 4a63637f1e2..58d1ce0201c 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -59,6 +59,13 @@ type OtelContentCapturePolicy = { systemPrompt: boolean; }; +type MessageDeliveryDiagnosticEvent = Extract< + DiagnosticEventPayload, + { + type: "message.delivery.started" | "message.delivery.completed" | "message.delivery.error"; + } +>; + const NO_CONTENT_CAPTURE: OtelContentCapturePolicy = { inputMessages: false, outputMessages: false, @@ -514,6 +521,20 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { unit: "ms", description: "Message processing duration", }); + const messageDeliveryStartedCounter = meter.createCounter( + "openclaw.message.delivery.started", + { + unit: "1", + description: "Outbound message delivery attempts started", + }, + ); + const messageDeliveryDurationHistogram = meter.createHistogram( + "openclaw.message.delivery.duration_ms", + { + unit: "ms", + description: "Outbound message delivery duration", + }, + ); const queueDepthHistogram = meter.createHistogram("openclaw.queue.depth", { unit: "1", description: "Queue depth on enqueue/dequeue", @@ -861,6 +882,64 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { span.end(); }; + const messageDeliveryAttrs = ( + evt: MessageDeliveryDiagnosticEvent, + ): Record => ({ + "openclaw.channel": evt.channel, + "openclaw.delivery.kind": evt.deliveryKind, + }); + + const recordMessageDeliveryStarted = ( + evt: Extract, + ) => { + messageDeliveryStartedCounter.add(1, messageDeliveryAttrs(evt)); + }; + + const recordMessageDeliveryCompleted = ( + evt: Extract, + ) => { + const attrs = { + ...messageDeliveryAttrs(evt), + "openclaw.outcome": "completed", + }; + messageDeliveryDurationHistogram.record(evt.durationMs, attrs); + if (!tracesEnabled) { + return; + } + const span = spanWithDuration( + "openclaw.message.delivery", + { + ...attrs, + "openclaw.delivery.result_count": evt.resultCount, + }, + evt.durationMs, + { endTimeMs: evt.ts }, + ); + span.end(evt.ts); + }; + + const recordMessageDeliveryError = ( + evt: Extract, + ) => { + const attrs = { + ...messageDeliveryAttrs(evt), + "openclaw.outcome": "error", + "openclaw.errorCategory": lowCardinalityAttr(evt.errorCategory, "other"), + }; + messageDeliveryDurationHistogram.record(evt.durationMs, attrs); + if (!tracesEnabled) { + return; + } + const span = spanWithDuration("openclaw.message.delivery", attrs, evt.durationMs, { + endTimeMs: evt.ts, + }); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: redactSensitiveText(evt.errorCategory), + }); + span.end(evt.ts); + }; + const recordLaneEnqueue = ( evt: Extract, ) => { @@ -1160,6 +1239,15 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { case "message.processed": recordMessageProcessed(evt); return; + case "message.delivery.started": + recordMessageDeliveryStarted(evt); + return; + case "message.delivery.completed": + recordMessageDeliveryCompleted(evt); + return; + case "message.delivery.error": + recordMessageDeliveryError(evt); + return; case "queue.lane.enqueue": recordLaneEnqueue(evt); return; diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index ba0635199ca..cf94d725820 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -84,6 +84,30 @@ export type DiagnosticMessageProcessedEvent = DiagnosticBaseEvent & { error?: string; }; +export type DiagnosticMessageDeliveryKind = "text" | "media" | "edit" | "reaction" | "other"; + +type DiagnosticMessageDeliveryBaseEvent = DiagnosticBaseEvent & { + channel: string; + sessionKey?: string; + deliveryKind: DiagnosticMessageDeliveryKind; +}; + +export type DiagnosticMessageDeliveryStartedEvent = DiagnosticMessageDeliveryBaseEvent & { + type: "message.delivery.started"; +}; + +export type DiagnosticMessageDeliveryCompletedEvent = DiagnosticMessageDeliveryBaseEvent & { + type: "message.delivery.completed"; + durationMs: number; + resultCount: number; +}; + +export type DiagnosticMessageDeliveryErrorEvent = DiagnosticMessageDeliveryBaseEvent & { + type: "message.delivery.error"; + durationMs: number; + errorCategory: string; +}; + export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & { type: "session.state"; sessionKey?: string; @@ -310,6 +334,9 @@ export type DiagnosticEventPayload = | DiagnosticWebhookErrorEvent | DiagnosticMessageQueuedEvent | DiagnosticMessageProcessedEvent + | DiagnosticMessageDeliveryStartedEvent + | DiagnosticMessageDeliveryCompletedEvent + | DiagnosticMessageDeliveryErrorEvent | DiagnosticSessionStateEvent | DiagnosticSessionStuckEvent | DiagnosticLaneEnqueueEvent @@ -352,6 +379,9 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set([ "tool.execution.completed", "tool.execution.error", "exec.process.completed", + "message.delivery.started", + "message.delivery.completed", + "message.delivery.error", "model.call.started", "model.call.completed", "model.call.error", diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 990539e656e..8bc2e980f56 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -14,6 +14,11 @@ import { import type { PluginHookRegistration } from "../../plugins/types.js"; import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js"; import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js"; +import { + onInternalDiagnosticEvent, + resetDiagnosticEventsForTest, + type DiagnosticEventPayload, +} from "../diagnostic-events.js"; import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js"; const mocks = vi.hoisted(() => ({ @@ -209,6 +214,10 @@ async function deliverSingleMatrixForHookTest(params?: { sessionKey?: string }) }); } +function flushDiagnosticEvents() { + return new Promise((resolve) => setImmediate(resolve)); +} + async function runBestEffortPartialFailureDelivery() { const sendMatrix = vi .fn() @@ -251,6 +260,7 @@ describe("deliverOutboundPayloads", () => { }); beforeEach(() => { + resetDiagnosticEventsForTest(); releasePinnedPluginChannelRegistry(); setActivePluginRegistry(defaultRegistry); mocks.appendAssistantMessageToSessionTranscript.mockClear(); @@ -278,10 +288,89 @@ describe("deliverOutboundPayloads", () => { }); afterEach(() => { + resetDiagnosticEventsForTest(); releasePinnedPluginChannelRegistry(); setActivePluginRegistry(emptyRegistry); }); + it("emits bounded delivery diagnostics for successful outbound sends", async () => { + const events: DiagnosticEventPayload[] = []; + const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event)); + const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); + + try { + await deliverOutboundPayloads({ + cfg: matrixChunkConfig, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "secret delivery body" }], + deps: { matrix: sendMatrix }, + session: { key: "session-1" }, + }); + await flushDiagnosticEvents(); + } finally { + unsubscribe(); + } + + const deliveryEvents = events.filter((event) => event.type.startsWith("message.delivery.")); + expect(deliveryEvents).toEqual([ + expect.objectContaining({ + type: "message.delivery.started", + channel: "matrix", + deliveryKind: "text", + sessionKey: "session-1", + }), + expect.objectContaining({ + type: "message.delivery.completed", + channel: "matrix", + deliveryKind: "text", + durationMs: expect.any(Number), + resultCount: 1, + sessionKey: "session-1", + }), + ]); + expect(JSON.stringify(deliveryEvents)).not.toContain("secret delivery body"); + expect(JSON.stringify(deliveryEvents)).not.toContain("!room:example"); + }); + + it("emits bounded delivery diagnostics for outbound send failures", async () => { + const events: DiagnosticEventPayload[] = []; + const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event)); + const sendMatrix = vi + .fn() + .mockRejectedValue(new TypeError("secret delivery body could not send")); + + try { + await deliverOutboundPayloads({ + cfg: matrixChunkConfig, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "secret delivery body" }], + deps: { matrix: sendMatrix }, + bestEffort: true, + session: { key: "session-1" }, + }); + await flushDiagnosticEvents(); + } finally { + unsubscribe(); + } + + const errorEvent = events.find((event) => event.type === "message.delivery.error"); + expect(errorEvent).toEqual( + expect.objectContaining({ + type: "message.delivery.error", + channel: "matrix", + deliveryKind: "text", + durationMs: expect.any(Number), + errorCategory: "TypeError", + sessionKey: "session-1", + }), + ); + expect( + JSON.stringify(events.filter((event) => event.type.startsWith("message.delivery."))), + ).not.toContain("secret delivery body"); + }); + it("keeps requester session channel authoritative for delivery media policy", async () => { const resolveMediaAccessSpy = vi.spyOn( mediaCapabilityModule, diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 5049d8afafe..0724ee5390c 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -28,6 +28,8 @@ import { createSubsystemLogger } from "../../logging/subsystem.js"; import type { OutboundMediaAccess } from "../../media/load-options.js"; import { resolveAgentScopedOutboundMediaAccess } from "../../media/read-capability.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import { diagnosticErrorCategory } from "../diagnostic-error-metadata.js"; +import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diagnostic-events.js"; import { formatErrorMessage } from "../errors.js"; import { throwIfAborted } from "./abort.js"; import type { OutboundDeliveryResult } from "./deliver-types.js"; @@ -369,6 +371,73 @@ type MessageSentEvent = { messageId?: string; }; +function sessionKeyForDeliveryDiagnostics(params: { + mirror?: DeliveryMirror; + session?: OutboundSessionContext; +}): string | undefined { + return params.mirror?.sessionKey ?? params.session?.key ?? params.session?.policyKey; +} + +function deliveryKindForPayload( + payload: ReplyPayload, + payloadSummary: NormalizedOutboundPayload, +): DiagnosticMessageDeliveryKind { + if (payloadSummary.mediaUrls.length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { + return "media"; + } + if (payload.presentation || payload.interactive || payload.channelData || payload.audioAsVoice) { + return "other"; + } + return "text"; +} + +function emitMessageDeliveryStarted(params: { + channel: Exclude; + deliveryKind: DiagnosticMessageDeliveryKind; + sessionKey?: string; +}): void { + emitDiagnosticEvent({ + type: "message.delivery.started", + channel: params.channel, + deliveryKind: params.deliveryKind, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + }); +} + +function emitMessageDeliveryCompleted(params: { + channel: Exclude; + deliveryKind: DiagnosticMessageDeliveryKind; + durationMs: number; + resultCount: number; + sessionKey?: string; +}): void { + emitDiagnosticEvent({ + type: "message.delivery.completed", + channel: params.channel, + deliveryKind: params.deliveryKind, + durationMs: params.durationMs, + resultCount: params.resultCount, + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + }); +} + +function emitMessageDeliveryError(params: { + channel: Exclude; + deliveryKind: DiagnosticMessageDeliveryKind; + durationMs: number; + error: unknown; + sessionKey?: string; +}): void { + emitDiagnosticEvent({ + type: "message.delivery.error", + channel: params.channel, + deliveryKind: params.deliveryKind, + durationMs: params.durationMs, + errorCategory: diagnosticErrorCategory(params.error), + ...(params.sessionKey ? { sessionKey: params.sessionKey } : {}), + }); +} + function normalizeEmptyPayloadForDelivery(payload: ReplyPayload): ReplyPayload | null { const text = typeof payload.text === "string" ? payload.text : ""; if (!text.trim()) { @@ -871,6 +940,7 @@ async function deliverOutboundPayloadsCore( mirrorGroupId, }); const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false; + const diagnosticSessionKey = sessionKeyForDeliveryDiagnostics(params); if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) { log.warn( "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", @@ -883,6 +953,47 @@ async function deliverOutboundPayloadsCore( } for (const payload of normalizedPayloads) { let payloadSummary = buildPayloadSummary(payload); + let deliveryKind: DiagnosticMessageDeliveryKind = "other"; + let deliveryStartedAt = 0; + let deliveryStarted = false; + let deliveryFinished = false; + const startDeliveryDiagnostics = (kind: DiagnosticMessageDeliveryKind) => { + deliveryKind = kind; + deliveryStartedAt = Date.now(); + deliveryStarted = true; + deliveryFinished = false; + emitMessageDeliveryStarted({ + channel, + deliveryKind, + sessionKey: diagnosticSessionKey, + }); + }; + const completeDeliveryDiagnostics = (resultCount: number) => { + if (!deliveryStarted) { + return; + } + deliveryFinished = true; + emitMessageDeliveryCompleted({ + channel, + deliveryKind, + durationMs: Date.now() - deliveryStartedAt, + resultCount, + sessionKey: diagnosticSessionKey, + }); + }; + const errorDeliveryDiagnostics = (err: unknown) => { + if (!deliveryStarted || deliveryFinished) { + return; + } + deliveryFinished = true; + emitMessageDeliveryError({ + channel, + deliveryKind, + durationMs: Date.now() - deliveryStartedAt, + error: err, + sessionKey: diagnosticSessionKey, + }); + }; try { throwIfAborted(abortSignal); @@ -912,6 +1023,7 @@ async function deliverOutboundPayloadsCore( continue; } payloadSummary = buildPayloadSummary(effectivePayload); + startDeliveryDiagnostics(deliveryKindForPayload(effectivePayload, payloadSummary)); params.onPayload?.(payloadSummary); const replyToResolution = resolveCurrentReplyTo(effectivePayload); @@ -955,6 +1067,7 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, results: [delivery], }); + completeDeliveryDiagnostics(1); emitMessageSent({ success: true, content: payloadSummary.hookContent ?? payloadSummary.text, @@ -989,6 +1102,7 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, results: deliveredResults, }); + completeDeliveryDiagnostics(deliveredResults.length); emitMessageSent({ success: results.length > beforeCount, content: payloadSummary.hookContent ?? payloadSummary.text, @@ -1029,6 +1143,7 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, results: deliveredResults, }); + completeDeliveryDiagnostics(deliveredResults.length); emitMessageSent({ success: results.length > beforeCount, content: payloadSummary.hookContent ?? payloadSummary.text, @@ -1070,12 +1185,14 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, results: results.slice(beforeCount), }); + completeDeliveryDiagnostics(results.length - beforeCount); emitMessageSent({ success: true, content: payloadSummary.hookContent ?? payloadSummary.text, messageId: lastMessageId, }); } catch (err) { + errorDeliveryDiagnostics(err); emitMessageSent({ success: false, content: payloadSummary.hookContent ?? payloadSummary.text, diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index c3f3f1e840d..ea209f62261 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -25,11 +25,13 @@ export type DiagnosticStabilityEventRecord = { mode?: string; level?: string; detector?: string; + deliveryKind?: string; toolName?: string; pairedToolName?: string; provider?: string; model?: string; durationMs?: number; + resultCount?: number; commandLength?: number; exitCode?: number; timedOut?: boolean; @@ -204,6 +206,24 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.outcome = event.outcome; assignReasonCode(record, event.reason); break; + case "message.delivery.started": + record.channel = event.channel; + record.deliveryKind = event.deliveryKind; + break; + case "message.delivery.completed": + record.channel = event.channel; + record.deliveryKind = event.deliveryKind; + record.durationMs = event.durationMs; + record.resultCount = event.resultCount; + record.outcome = "completed"; + break; + case "message.delivery.error": + record.channel = event.channel; + record.deliveryKind = event.deliveryKind; + record.durationMs = event.durationMs; + record.outcome = "error"; + assignReasonCode(record, event.errorCategory); + break; case "session.state": record.outcome = event.state; assignReasonCode(record, event.reason); diff --git a/src/logging/diagnostic.test.ts b/src/logging/diagnostic.test.ts index 66000e04eaa..7b5ee6d4443 100644 --- a/src/logging/diagnostic.test.ts +++ b/src/logging/diagnostic.test.ts @@ -14,7 +14,12 @@ import { pruneDiagnosticSessionStates, resetDiagnosticSessionStateForTest, } from "./diagnostic-session-state.js"; -import { getDiagnosticStabilitySnapshot } from "./diagnostic-stability.js"; +import { + getDiagnosticStabilitySnapshot, + resetDiagnosticStabilityRecorderForTest, + startDiagnosticStabilityRecorder, + stopDiagnosticStabilityRecorder, +} from "./diagnostic-stability.js"; import { logSessionStateChange, resetDiagnosticStateForTest, @@ -32,6 +37,10 @@ function createEmitMemorySampleMock() { })); } +function flushDiagnosticEvents() { + return new Promise((resolve) => setImmediate(resolve)); +} + describe("diagnostic session state pruning", () => { beforeEach(() => { vi.useFakeTimers(); @@ -232,3 +241,44 @@ describe("stuck session diagnostics threshold", () => { expect(resolveStuckSessionWarnMs()).toBe(120_000); }); }); + +describe("diagnostic stability snapshots", () => { + beforeEach(() => { + resetDiagnosticEventsForTest(); + resetDiagnosticStabilityRecorderForTest(); + }); + + afterEach(() => { + stopDiagnosticStabilityRecorder(); + resetDiagnosticStabilityRecorderForTest(); + resetDiagnosticEventsForTest(); + }); + + it("records bounded outbound delivery diagnostics without session identifiers", async () => { + startDiagnosticStabilityRecorder(); + + emitDiagnosticEvent({ + type: "message.delivery.error", + channel: "matrix", + deliveryKind: "text", + durationMs: 12, + errorCategory: "TypeError", + sessionKey: "session-secret", + }); + await flushDiagnosticEvents(); + + expect(getDiagnosticStabilitySnapshot({ limit: 10 }).events).toContainEqual( + expect.objectContaining({ + type: "message.delivery.error", + channel: "matrix", + deliveryKind: "text", + durationMs: 12, + outcome: "error", + reason: "TypeError", + }), + ); + const [event] = getDiagnosticStabilitySnapshot({ limit: 10 }).events; + expect(event).not.toHaveProperty("sessionKey"); + expect(event).not.toHaveProperty("sessionId"); + }); +});