mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
feat(diagnostics): add outbound delivery lifecycle events
Add bounded outbound message delivery lifecycle diagnostics and OTEL export without message body, recipient, room, media path, or raw channel result data.
This commit is contained in:
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Changes
|
||||
|
||||
- Diagnostics/OTEL: add bounded outbound message delivery lifecycle diagnostics and export them as low-cardinality delivery spans/metrics without message body, recipient, room, or media-path data. Thanks @vincentkoc.
|
||||
- Diagnostics/OTEL: emit bounded exec-process diagnostics and export them as `openclaw.exec` spans without exposing command text, working directories, or container identifiers. (#71451) Thanks @vincentkoc and @jlapenna.
|
||||
- Diagnostics/OTEL: support `OPENCLAW_OTEL_PRELOADED=1` so the plugin can reuse an already-registered OpenTelemetry SDK while keeping OpenClaw diagnostic listeners wired. (#71450) Thanks @vincentkoc and @jlapenna.
|
||||
- Control UI: refine the agent Tool Access panel with compact live-tool chips, collapsible tool groups, direct per-tool toggles, and clearer runtime/source provenance. (#71405) Thanks @BunsDev.
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
56ccee3ef8ff3b0ba7e2e765ae631b59254464585d5fef9db7e905f2c4c34ded plugin-sdk-api-baseline.json
|
||||
39184cf8afaec691f0352d1a113e30a7099b87c0748237a3c7307e903ba24eee plugin-sdk-api-baseline.jsonl
|
||||
1c8faa44e6ad80aeca7add9793d1dee1b7c552a0220c3dcebd8475b7ecd69342 plugin-sdk-api-baseline.json
|
||||
6ae517ad38d843fb3453cff8c9a081f1f9b7fa54ee563dcef69524ed7013b57f plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -206,6 +206,9 @@ Message flow:
|
||||
- `webhook.error`: webhook handler errors.
|
||||
- `message.queued`: message enqueued for processing.
|
||||
- `message.processed`: outcome + duration + optional error.
|
||||
- `message.delivery.started`: outbound delivery attempt started.
|
||||
- `message.delivery.completed`: outbound delivery attempt finished + duration/result count.
|
||||
- `message.delivery.error`: outbound delivery attempt failed + duration/bounded error category.
|
||||
|
||||
Queue + session:
|
||||
|
||||
@@ -345,6 +348,11 @@ Message flow:
|
||||
`openclaw.outcome`)
|
||||
- `openclaw.message.duration_ms` (histogram, attrs: `openclaw.channel`,
|
||||
`openclaw.outcome`)
|
||||
- `openclaw.message.delivery.started` (counter, attrs: `openclaw.channel`,
|
||||
`openclaw.delivery.kind`)
|
||||
- `openclaw.message.delivery.duration_ms` (histogram, attrs:
|
||||
`openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`,
|
||||
`openclaw.errorCategory`)
|
||||
|
||||
Queues + sessions:
|
||||
|
||||
@@ -390,6 +398,9 @@ Exec:
|
||||
- `openclaw.message.processed`
|
||||
- `openclaw.channel`, `openclaw.outcome`, `openclaw.chatId`,
|
||||
`openclaw.messageId`, `openclaw.reason`
|
||||
- `openclaw.message.delivery`
|
||||
- `openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`,
|
||||
`openclaw.errorCategory`, `openclaw.delivery.result_count`
|
||||
- `openclaw.session.stuck`
|
||||
- `openclaw.state`, `openclaw.ageMs`, `openclaw.queueDepth`
|
||||
|
||||
|
||||
@@ -878,6 +878,107 @@ describe("diagnostics-otel service", () => {
|
||||
await service.stop?.(ctx);
|
||||
});
|
||||
|
||||
test("exports message delivery spans and metrics with low-cardinality attributes", async () => {
|
||||
const service = createDiagnosticsOtelService();
|
||||
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true });
|
||||
await service.start(ctx);
|
||||
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.started",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
sessionKey: "session-secret",
|
||||
});
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.completed",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
durationMs: 25,
|
||||
resultCount: 1,
|
||||
sessionKey: "session-secret",
|
||||
});
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.error",
|
||||
channel: "discord",
|
||||
deliveryKind: "media",
|
||||
durationMs: 40,
|
||||
errorCategory: "TypeError",
|
||||
sessionKey: "session-secret",
|
||||
});
|
||||
await flushDiagnosticEvents();
|
||||
|
||||
expect(
|
||||
telemetryState.counters.get("openclaw.message.delivery.started")?.add,
|
||||
).toHaveBeenCalledWith(1, {
|
||||
"openclaw.channel": "matrix",
|
||||
"openclaw.delivery.kind": "text",
|
||||
});
|
||||
expect(
|
||||
telemetryState.histograms.get("openclaw.message.delivery.duration_ms")?.record,
|
||||
).toHaveBeenCalledWith(
|
||||
25,
|
||||
expect.objectContaining({
|
||||
"openclaw.channel": "matrix",
|
||||
"openclaw.delivery.kind": "text",
|
||||
"openclaw.outcome": "completed",
|
||||
}),
|
||||
);
|
||||
expect(
|
||||
telemetryState.histograms.get("openclaw.message.delivery.duration_ms")?.record,
|
||||
).toHaveBeenCalledWith(
|
||||
40,
|
||||
expect.objectContaining({
|
||||
"openclaw.channel": "discord",
|
||||
"openclaw.delivery.kind": "media",
|
||||
"openclaw.outcome": "error",
|
||||
"openclaw.errorCategory": "TypeError",
|
||||
}),
|
||||
);
|
||||
|
||||
const deliverySpanCalls = telemetryState.tracer.startSpan.mock.calls.filter(
|
||||
(call) => call[0] === "openclaw.message.delivery",
|
||||
);
|
||||
expect(deliverySpanCalls).toHaveLength(2);
|
||||
expect(deliverySpanCalls[0]?.[1]).toMatchObject({
|
||||
attributes: {
|
||||
"openclaw.channel": "matrix",
|
||||
"openclaw.delivery.kind": "text",
|
||||
"openclaw.outcome": "completed",
|
||||
"openclaw.delivery.result_count": 1,
|
||||
},
|
||||
startTime: expect.any(Number),
|
||||
});
|
||||
expect(deliverySpanCalls[1]?.[1]).toMatchObject({
|
||||
attributes: {
|
||||
"openclaw.channel": "discord",
|
||||
"openclaw.delivery.kind": "media",
|
||||
"openclaw.outcome": "error",
|
||||
"openclaw.errorCategory": "TypeError",
|
||||
},
|
||||
startTime: expect.any(Number),
|
||||
});
|
||||
for (const call of deliverySpanCalls) {
|
||||
expect(call[1]).toEqual({
|
||||
attributes: expect.not.objectContaining({
|
||||
"openclaw.sessionKey": expect.anything(),
|
||||
"openclaw.messageId": expect.anything(),
|
||||
"openclaw.conversationId": expect.anything(),
|
||||
"openclaw.content": expect.anything(),
|
||||
"openclaw.to": expect.anything(),
|
||||
}),
|
||||
startTime: expect.any(Number),
|
||||
});
|
||||
}
|
||||
const errorSpan = telemetryState.spans.find(
|
||||
(span) => span.name === "openclaw.message.delivery" && span.setStatus.mock.calls.length > 0,
|
||||
);
|
||||
expect(errorSpan?.setStatus).toHaveBeenCalledWith({
|
||||
code: 2,
|
||||
message: "TypeError",
|
||||
});
|
||||
await service.stop?.(ctx);
|
||||
});
|
||||
|
||||
test("does not export model or tool content unless capture is explicitly enabled", async () => {
|
||||
const service = createDiagnosticsOtelService();
|
||||
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true });
|
||||
|
||||
@@ -59,6 +59,13 @@ type OtelContentCapturePolicy = {
|
||||
systemPrompt: boolean;
|
||||
};
|
||||
|
||||
type MessageDeliveryDiagnosticEvent = Extract<
|
||||
DiagnosticEventPayload,
|
||||
{
|
||||
type: "message.delivery.started" | "message.delivery.completed" | "message.delivery.error";
|
||||
}
|
||||
>;
|
||||
|
||||
const NO_CONTENT_CAPTURE: OtelContentCapturePolicy = {
|
||||
inputMessages: false,
|
||||
outputMessages: false,
|
||||
@@ -514,6 +521,20 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
unit: "ms",
|
||||
description: "Message processing duration",
|
||||
});
|
||||
const messageDeliveryStartedCounter = meter.createCounter(
|
||||
"openclaw.message.delivery.started",
|
||||
{
|
||||
unit: "1",
|
||||
description: "Outbound message delivery attempts started",
|
||||
},
|
||||
);
|
||||
const messageDeliveryDurationHistogram = meter.createHistogram(
|
||||
"openclaw.message.delivery.duration_ms",
|
||||
{
|
||||
unit: "ms",
|
||||
description: "Outbound message delivery duration",
|
||||
},
|
||||
);
|
||||
const queueDepthHistogram = meter.createHistogram("openclaw.queue.depth", {
|
||||
unit: "1",
|
||||
description: "Queue depth on enqueue/dequeue",
|
||||
@@ -861,6 +882,64 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
span.end();
|
||||
};
|
||||
|
||||
const messageDeliveryAttrs = (
|
||||
evt: MessageDeliveryDiagnosticEvent,
|
||||
): Record<string, string> => ({
|
||||
"openclaw.channel": evt.channel,
|
||||
"openclaw.delivery.kind": evt.deliveryKind,
|
||||
});
|
||||
|
||||
const recordMessageDeliveryStarted = (
|
||||
evt: Extract<DiagnosticEventPayload, { type: "message.delivery.started" }>,
|
||||
) => {
|
||||
messageDeliveryStartedCounter.add(1, messageDeliveryAttrs(evt));
|
||||
};
|
||||
|
||||
const recordMessageDeliveryCompleted = (
|
||||
evt: Extract<DiagnosticEventPayload, { type: "message.delivery.completed" }>,
|
||||
) => {
|
||||
const attrs = {
|
||||
...messageDeliveryAttrs(evt),
|
||||
"openclaw.outcome": "completed",
|
||||
};
|
||||
messageDeliveryDurationHistogram.record(evt.durationMs, attrs);
|
||||
if (!tracesEnabled) {
|
||||
return;
|
||||
}
|
||||
const span = spanWithDuration(
|
||||
"openclaw.message.delivery",
|
||||
{
|
||||
...attrs,
|
||||
"openclaw.delivery.result_count": evt.resultCount,
|
||||
},
|
||||
evt.durationMs,
|
||||
{ endTimeMs: evt.ts },
|
||||
);
|
||||
span.end(evt.ts);
|
||||
};
|
||||
|
||||
const recordMessageDeliveryError = (
|
||||
evt: Extract<DiagnosticEventPayload, { type: "message.delivery.error" }>,
|
||||
) => {
|
||||
const attrs = {
|
||||
...messageDeliveryAttrs(evt),
|
||||
"openclaw.outcome": "error",
|
||||
"openclaw.errorCategory": lowCardinalityAttr(evt.errorCategory, "other"),
|
||||
};
|
||||
messageDeliveryDurationHistogram.record(evt.durationMs, attrs);
|
||||
if (!tracesEnabled) {
|
||||
return;
|
||||
}
|
||||
const span = spanWithDuration("openclaw.message.delivery", attrs, evt.durationMs, {
|
||||
endTimeMs: evt.ts,
|
||||
});
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: redactSensitiveText(evt.errorCategory),
|
||||
});
|
||||
span.end(evt.ts);
|
||||
};
|
||||
|
||||
const recordLaneEnqueue = (
|
||||
evt: Extract<DiagnosticEventPayload, { type: "queue.lane.enqueue" }>,
|
||||
) => {
|
||||
@@ -1160,6 +1239,15 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
case "message.processed":
|
||||
recordMessageProcessed(evt);
|
||||
return;
|
||||
case "message.delivery.started":
|
||||
recordMessageDeliveryStarted(evt);
|
||||
return;
|
||||
case "message.delivery.completed":
|
||||
recordMessageDeliveryCompleted(evt);
|
||||
return;
|
||||
case "message.delivery.error":
|
||||
recordMessageDeliveryError(evt);
|
||||
return;
|
||||
case "queue.lane.enqueue":
|
||||
recordLaneEnqueue(evt);
|
||||
return;
|
||||
|
||||
@@ -84,6 +84,30 @@ export type DiagnosticMessageProcessedEvent = DiagnosticBaseEvent & {
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type DiagnosticMessageDeliveryKind = "text" | "media" | "edit" | "reaction" | "other";
|
||||
|
||||
type DiagnosticMessageDeliveryBaseEvent = DiagnosticBaseEvent & {
|
||||
channel: string;
|
||||
sessionKey?: string;
|
||||
deliveryKind: DiagnosticMessageDeliveryKind;
|
||||
};
|
||||
|
||||
export type DiagnosticMessageDeliveryStartedEvent = DiagnosticMessageDeliveryBaseEvent & {
|
||||
type: "message.delivery.started";
|
||||
};
|
||||
|
||||
export type DiagnosticMessageDeliveryCompletedEvent = DiagnosticMessageDeliveryBaseEvent & {
|
||||
type: "message.delivery.completed";
|
||||
durationMs: number;
|
||||
resultCount: number;
|
||||
};
|
||||
|
||||
export type DiagnosticMessageDeliveryErrorEvent = DiagnosticMessageDeliveryBaseEvent & {
|
||||
type: "message.delivery.error";
|
||||
durationMs: number;
|
||||
errorCategory: string;
|
||||
};
|
||||
|
||||
export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & {
|
||||
type: "session.state";
|
||||
sessionKey?: string;
|
||||
@@ -310,6 +334,9 @@ export type DiagnosticEventPayload =
|
||||
| DiagnosticWebhookErrorEvent
|
||||
| DiagnosticMessageQueuedEvent
|
||||
| DiagnosticMessageProcessedEvent
|
||||
| DiagnosticMessageDeliveryStartedEvent
|
||||
| DiagnosticMessageDeliveryCompletedEvent
|
||||
| DiagnosticMessageDeliveryErrorEvent
|
||||
| DiagnosticSessionStateEvent
|
||||
| DiagnosticSessionStuckEvent
|
||||
| DiagnosticLaneEnqueueEvent
|
||||
@@ -352,6 +379,9 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
|
||||
"tool.execution.completed",
|
||||
"tool.execution.error",
|
||||
"exec.process.completed",
|
||||
"message.delivery.started",
|
||||
"message.delivery.completed",
|
||||
"message.delivery.error",
|
||||
"model.call.started",
|
||||
"model.call.completed",
|
||||
"model.call.error",
|
||||
|
||||
@@ -14,6 +14,11 @@ import {
|
||||
import type { PluginHookRegistration } from "../../plugins/types.js";
|
||||
import { createOutboundTestPlugin, createTestRegistry } from "../../test-utils/channel-plugins.js";
|
||||
import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js";
|
||||
import {
|
||||
onInternalDiagnosticEvent,
|
||||
resetDiagnosticEventsForTest,
|
||||
type DiagnosticEventPayload,
|
||||
} from "../diagnostic-events.js";
|
||||
import { resolvePreferredOpenClawTmpDir } from "../tmp-openclaw-dir.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
@@ -209,6 +214,10 @@ async function deliverSingleMatrixForHookTest(params?: { sessionKey?: string })
|
||||
});
|
||||
}
|
||||
|
||||
function flushDiagnosticEvents() {
|
||||
return new Promise<void>((resolve) => setImmediate(resolve));
|
||||
}
|
||||
|
||||
async function runBestEffortPartialFailureDelivery() {
|
||||
const sendMatrix = vi
|
||||
.fn()
|
||||
@@ -251,6 +260,7 @@ describe("deliverOutboundPayloads", () => {
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
resetDiagnosticEventsForTest();
|
||||
releasePinnedPluginChannelRegistry();
|
||||
setActivePluginRegistry(defaultRegistry);
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockClear();
|
||||
@@ -278,10 +288,89 @@ describe("deliverOutboundPayloads", () => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetDiagnosticEventsForTest();
|
||||
releasePinnedPluginChannelRegistry();
|
||||
setActivePluginRegistry(emptyRegistry);
|
||||
});
|
||||
|
||||
it("emits bounded delivery diagnostics for successful outbound sends", async () => {
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event));
|
||||
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
|
||||
|
||||
try {
|
||||
await deliverOutboundPayloads({
|
||||
cfg: matrixChunkConfig,
|
||||
channel: "matrix",
|
||||
to: "!room:example",
|
||||
payloads: [{ text: "secret delivery body" }],
|
||||
deps: { matrix: sendMatrix },
|
||||
session: { key: "session-1" },
|
||||
});
|
||||
await flushDiagnosticEvents();
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
const deliveryEvents = events.filter((event) => event.type.startsWith("message.delivery."));
|
||||
expect(deliveryEvents).toEqual([
|
||||
expect.objectContaining({
|
||||
type: "message.delivery.started",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
sessionKey: "session-1",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
type: "message.delivery.completed",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
durationMs: expect.any(Number),
|
||||
resultCount: 1,
|
||||
sessionKey: "session-1",
|
||||
}),
|
||||
]);
|
||||
expect(JSON.stringify(deliveryEvents)).not.toContain("secret delivery body");
|
||||
expect(JSON.stringify(deliveryEvents)).not.toContain("!room:example");
|
||||
});
|
||||
|
||||
it("emits bounded delivery diagnostics for outbound send failures", async () => {
|
||||
const events: DiagnosticEventPayload[] = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((event) => events.push(event));
|
||||
const sendMatrix = vi
|
||||
.fn()
|
||||
.mockRejectedValue(new TypeError("secret delivery body could not send"));
|
||||
|
||||
try {
|
||||
await deliverOutboundPayloads({
|
||||
cfg: matrixChunkConfig,
|
||||
channel: "matrix",
|
||||
to: "!room:example",
|
||||
payloads: [{ text: "secret delivery body" }],
|
||||
deps: { matrix: sendMatrix },
|
||||
bestEffort: true,
|
||||
session: { key: "session-1" },
|
||||
});
|
||||
await flushDiagnosticEvents();
|
||||
} finally {
|
||||
unsubscribe();
|
||||
}
|
||||
|
||||
const errorEvent = events.find((event) => event.type === "message.delivery.error");
|
||||
expect(errorEvent).toEqual(
|
||||
expect.objectContaining({
|
||||
type: "message.delivery.error",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
durationMs: expect.any(Number),
|
||||
errorCategory: "TypeError",
|
||||
sessionKey: "session-1",
|
||||
}),
|
||||
);
|
||||
expect(
|
||||
JSON.stringify(events.filter((event) => event.type.startsWith("message.delivery."))),
|
||||
).not.toContain("secret delivery body");
|
||||
});
|
||||
|
||||
it("keeps requester session channel authoritative for delivery media policy", async () => {
|
||||
const resolveMediaAccessSpy = vi.spyOn(
|
||||
mediaCapabilityModule,
|
||||
|
||||
@@ -28,6 +28,8 @@ import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { OutboundMediaAccess } from "../../media/load-options.js";
|
||||
import { resolveAgentScopedOutboundMediaAccess } from "../../media/read-capability.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { diagnosticErrorCategory } from "../diagnostic-error-metadata.js";
|
||||
import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diagnostic-events.js";
|
||||
import { formatErrorMessage } from "../errors.js";
|
||||
import { throwIfAborted } from "./abort.js";
|
||||
import type { OutboundDeliveryResult } from "./deliver-types.js";
|
||||
@@ -369,6 +371,73 @@ type MessageSentEvent = {
|
||||
messageId?: string;
|
||||
};
|
||||
|
||||
function sessionKeyForDeliveryDiagnostics(params: {
|
||||
mirror?: DeliveryMirror;
|
||||
session?: OutboundSessionContext;
|
||||
}): string | undefined {
|
||||
return params.mirror?.sessionKey ?? params.session?.key ?? params.session?.policyKey;
|
||||
}
|
||||
|
||||
function deliveryKindForPayload(
|
||||
payload: ReplyPayload,
|
||||
payloadSummary: NormalizedOutboundPayload,
|
||||
): DiagnosticMessageDeliveryKind {
|
||||
if (payloadSummary.mediaUrls.length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
|
||||
return "media";
|
||||
}
|
||||
if (payload.presentation || payload.interactive || payload.channelData || payload.audioAsVoice) {
|
||||
return "other";
|
||||
}
|
||||
return "text";
|
||||
}
|
||||
|
||||
function emitMessageDeliveryStarted(params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
deliveryKind: DiagnosticMessageDeliveryKind;
|
||||
sessionKey?: string;
|
||||
}): void {
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.started",
|
||||
channel: params.channel,
|
||||
deliveryKind: params.deliveryKind,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function emitMessageDeliveryCompleted(params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
deliveryKind: DiagnosticMessageDeliveryKind;
|
||||
durationMs: number;
|
||||
resultCount: number;
|
||||
sessionKey?: string;
|
||||
}): void {
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.completed",
|
||||
channel: params.channel,
|
||||
deliveryKind: params.deliveryKind,
|
||||
durationMs: params.durationMs,
|
||||
resultCount: params.resultCount,
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function emitMessageDeliveryError(params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
deliveryKind: DiagnosticMessageDeliveryKind;
|
||||
durationMs: number;
|
||||
error: unknown;
|
||||
sessionKey?: string;
|
||||
}): void {
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.error",
|
||||
channel: params.channel,
|
||||
deliveryKind: params.deliveryKind,
|
||||
durationMs: params.durationMs,
|
||||
errorCategory: diagnosticErrorCategory(params.error),
|
||||
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeEmptyPayloadForDelivery(payload: ReplyPayload): ReplyPayload | null {
|
||||
const text = typeof payload.text === "string" ? payload.text : "";
|
||||
if (!text.trim()) {
|
||||
@@ -871,6 +940,7 @@ async function deliverOutboundPayloadsCore(
|
||||
mirrorGroupId,
|
||||
});
|
||||
const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false;
|
||||
const diagnosticSessionKey = sessionKeyForDeliveryDiagnostics(params);
|
||||
if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) {
|
||||
log.warn(
|
||||
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
|
||||
@@ -883,6 +953,47 @@ async function deliverOutboundPayloadsCore(
|
||||
}
|
||||
for (const payload of normalizedPayloads) {
|
||||
let payloadSummary = buildPayloadSummary(payload);
|
||||
let deliveryKind: DiagnosticMessageDeliveryKind = "other";
|
||||
let deliveryStartedAt = 0;
|
||||
let deliveryStarted = false;
|
||||
let deliveryFinished = false;
|
||||
const startDeliveryDiagnostics = (kind: DiagnosticMessageDeliveryKind) => {
|
||||
deliveryKind = kind;
|
||||
deliveryStartedAt = Date.now();
|
||||
deliveryStarted = true;
|
||||
deliveryFinished = false;
|
||||
emitMessageDeliveryStarted({
|
||||
channel,
|
||||
deliveryKind,
|
||||
sessionKey: diagnosticSessionKey,
|
||||
});
|
||||
};
|
||||
const completeDeliveryDiagnostics = (resultCount: number) => {
|
||||
if (!deliveryStarted) {
|
||||
return;
|
||||
}
|
||||
deliveryFinished = true;
|
||||
emitMessageDeliveryCompleted({
|
||||
channel,
|
||||
deliveryKind,
|
||||
durationMs: Date.now() - deliveryStartedAt,
|
||||
resultCount,
|
||||
sessionKey: diagnosticSessionKey,
|
||||
});
|
||||
};
|
||||
const errorDeliveryDiagnostics = (err: unknown) => {
|
||||
if (!deliveryStarted || deliveryFinished) {
|
||||
return;
|
||||
}
|
||||
deliveryFinished = true;
|
||||
emitMessageDeliveryError({
|
||||
channel,
|
||||
deliveryKind,
|
||||
durationMs: Date.now() - deliveryStartedAt,
|
||||
error: err,
|
||||
sessionKey: diagnosticSessionKey,
|
||||
});
|
||||
};
|
||||
try {
|
||||
throwIfAborted(abortSignal);
|
||||
|
||||
@@ -912,6 +1023,7 @@ async function deliverOutboundPayloadsCore(
|
||||
continue;
|
||||
}
|
||||
payloadSummary = buildPayloadSummary(effectivePayload);
|
||||
startDeliveryDiagnostics(deliveryKindForPayload(effectivePayload, payloadSummary));
|
||||
|
||||
params.onPayload?.(payloadSummary);
|
||||
const replyToResolution = resolveCurrentReplyTo(effectivePayload);
|
||||
@@ -955,6 +1067,7 @@ async function deliverOutboundPayloadsCore(
|
||||
target: deliveryTarget,
|
||||
results: [delivery],
|
||||
});
|
||||
completeDeliveryDiagnostics(1);
|
||||
emitMessageSent({
|
||||
success: true,
|
||||
content: payloadSummary.hookContent ?? payloadSummary.text,
|
||||
@@ -989,6 +1102,7 @@ async function deliverOutboundPayloadsCore(
|
||||
target: deliveryTarget,
|
||||
results: deliveredResults,
|
||||
});
|
||||
completeDeliveryDiagnostics(deliveredResults.length);
|
||||
emitMessageSent({
|
||||
success: results.length > beforeCount,
|
||||
content: payloadSummary.hookContent ?? payloadSummary.text,
|
||||
@@ -1029,6 +1143,7 @@ async function deliverOutboundPayloadsCore(
|
||||
target: deliveryTarget,
|
||||
results: deliveredResults,
|
||||
});
|
||||
completeDeliveryDiagnostics(deliveredResults.length);
|
||||
emitMessageSent({
|
||||
success: results.length > beforeCount,
|
||||
content: payloadSummary.hookContent ?? payloadSummary.text,
|
||||
@@ -1070,12 +1185,14 @@ async function deliverOutboundPayloadsCore(
|
||||
target: deliveryTarget,
|
||||
results: results.slice(beforeCount),
|
||||
});
|
||||
completeDeliveryDiagnostics(results.length - beforeCount);
|
||||
emitMessageSent({
|
||||
success: true,
|
||||
content: payloadSummary.hookContent ?? payloadSummary.text,
|
||||
messageId: lastMessageId,
|
||||
});
|
||||
} catch (err) {
|
||||
errorDeliveryDiagnostics(err);
|
||||
emitMessageSent({
|
||||
success: false,
|
||||
content: payloadSummary.hookContent ?? payloadSummary.text,
|
||||
|
||||
@@ -25,11 +25,13 @@ export type DiagnosticStabilityEventRecord = {
|
||||
mode?: string;
|
||||
level?: string;
|
||||
detector?: string;
|
||||
deliveryKind?: string;
|
||||
toolName?: string;
|
||||
pairedToolName?: string;
|
||||
provider?: string;
|
||||
model?: string;
|
||||
durationMs?: number;
|
||||
resultCount?: number;
|
||||
commandLength?: number;
|
||||
exitCode?: number;
|
||||
timedOut?: boolean;
|
||||
@@ -204,6 +206,24 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
|
||||
record.outcome = event.outcome;
|
||||
assignReasonCode(record, event.reason);
|
||||
break;
|
||||
case "message.delivery.started":
|
||||
record.channel = event.channel;
|
||||
record.deliveryKind = event.deliveryKind;
|
||||
break;
|
||||
case "message.delivery.completed":
|
||||
record.channel = event.channel;
|
||||
record.deliveryKind = event.deliveryKind;
|
||||
record.durationMs = event.durationMs;
|
||||
record.resultCount = event.resultCount;
|
||||
record.outcome = "completed";
|
||||
break;
|
||||
case "message.delivery.error":
|
||||
record.channel = event.channel;
|
||||
record.deliveryKind = event.deliveryKind;
|
||||
record.durationMs = event.durationMs;
|
||||
record.outcome = "error";
|
||||
assignReasonCode(record, event.errorCategory);
|
||||
break;
|
||||
case "session.state":
|
||||
record.outcome = event.state;
|
||||
assignReasonCode(record, event.reason);
|
||||
|
||||
@@ -14,7 +14,12 @@ import {
|
||||
pruneDiagnosticSessionStates,
|
||||
resetDiagnosticSessionStateForTest,
|
||||
} from "./diagnostic-session-state.js";
|
||||
import { getDiagnosticStabilitySnapshot } from "./diagnostic-stability.js";
|
||||
import {
|
||||
getDiagnosticStabilitySnapshot,
|
||||
resetDiagnosticStabilityRecorderForTest,
|
||||
startDiagnosticStabilityRecorder,
|
||||
stopDiagnosticStabilityRecorder,
|
||||
} from "./diagnostic-stability.js";
|
||||
import {
|
||||
logSessionStateChange,
|
||||
resetDiagnosticStateForTest,
|
||||
@@ -32,6 +37,10 @@ function createEmitMemorySampleMock() {
|
||||
}));
|
||||
}
|
||||
|
||||
function flushDiagnosticEvents() {
|
||||
return new Promise<void>((resolve) => setImmediate(resolve));
|
||||
}
|
||||
|
||||
describe("diagnostic session state pruning", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
@@ -232,3 +241,44 @@ describe("stuck session diagnostics threshold", () => {
|
||||
expect(resolveStuckSessionWarnMs()).toBe(120_000);
|
||||
});
|
||||
});
|
||||
|
||||
describe("diagnostic stability snapshots", () => {
|
||||
beforeEach(() => {
|
||||
resetDiagnosticEventsForTest();
|
||||
resetDiagnosticStabilityRecorderForTest();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
stopDiagnosticStabilityRecorder();
|
||||
resetDiagnosticStabilityRecorderForTest();
|
||||
resetDiagnosticEventsForTest();
|
||||
});
|
||||
|
||||
it("records bounded outbound delivery diagnostics without session identifiers", async () => {
|
||||
startDiagnosticStabilityRecorder();
|
||||
|
||||
emitDiagnosticEvent({
|
||||
type: "message.delivery.error",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
durationMs: 12,
|
||||
errorCategory: "TypeError",
|
||||
sessionKey: "session-secret",
|
||||
});
|
||||
await flushDiagnosticEvents();
|
||||
|
||||
expect(getDiagnosticStabilitySnapshot({ limit: 10 }).events).toContainEqual(
|
||||
expect.objectContaining({
|
||||
type: "message.delivery.error",
|
||||
channel: "matrix",
|
||||
deliveryKind: "text",
|
||||
durationMs: 12,
|
||||
outcome: "error",
|
||||
reason: "TypeError",
|
||||
}),
|
||||
);
|
||||
const [event] = getDiagnosticStabilitySnapshot({ limit: 10 }).events;
|
||||
expect(event).not.toHaveProperty("sessionKey");
|
||||
expect(event).not.toHaveProperty("sessionId");
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user