fix(diagnostics): trust internal trace parents (#71574)

* fix(diagnostics): trust internal trace parents

* fix(diagnostics): harden trusted trace metadata

* fix(tooling): honor explicit oxlint threads

* fix(agents): use stable nonmutating sort helpers

* chore(plugin-sdk): refresh api baseline

* fix(diagnostics): gate internal event subscriptions

* fix(diagnostics): isolate listener event copies

* chore(plugin-sdk): refresh internal diagnostics baseline

* chore(plugin-sdk): refresh diagnostics event baseline

* fix(diagnostics): keep event state module local

* fix(diagnostics): harden internal subscription capability

* fix(diagnostics): freeze listener metadata
This commit is contained in:
Vincent Koc
2026-04-25 10:18:52 -07:00
committed by GitHub
parent 8e7d382c37
commit dcdf97685b
21 changed files with 555 additions and 94 deletions

View File

@@ -1,2 +1,2 @@
d5bad55d588ecafab1298a2a79578ce13becced8bc33d2b8543161ab528feca4 plugin-sdk-api-baseline.json
373ded33d5ecc61229de5179827182f0c6f805a804e1f0666cf2da68301153be plugin-sdk-api-baseline.jsonl
f6d9588737310773031e744b6726ba80a9ca742205db335aae95fbd1e2925dc8 plugin-sdk-api-baseline.json
a4c86fe92b7bea538f33139e9b57cfada766b7d504323c2e20a7ca205994be44 plugin-sdk-api-baseline.jsonl

View File

@@ -111,6 +111,10 @@ vi.mock("@opentelemetry/semantic-conventions", () => ({
ATTR_SERVICE_NAME: "service.name",
}));
import {
emitTrustedDiagnosticEvent,
onInternalDiagnosticEvent,
} from "../../../src/infra/diagnostic-events.js";
import type { OpenClawPluginServiceContext } from "../api.js";
import { emitDiagnosticEvent } from "../api.js";
import { createDiagnosticsOtelService } from "./service.js";
@@ -122,6 +126,7 @@ const TRACE_ID = "4bf92f3577b34da6a3ce929d0e0e4736";
const SPAN_ID = "00f067aa0ba902b7";
const CHILD_SPAN_ID = "1111111111111111";
const GRANDCHILD_SPAN_ID = "2222222222222222";
const TOOL_SPAN_ID = "3333333333333333";
const PROTO_KEY = "__proto__";
const MAX_TEST_OTEL_CONTENT_ATTRIBUTE_CHARS = 4096;
const OTEL_TRUNCATED_SUFFIX_MAX_CHARS = 20;
@@ -165,6 +170,7 @@ function createOtelContext(
},
logger: createLogger(),
stateDir: OTEL_TEST_STATE_DIR,
internalDiagnostics: { onEvent: onInternalDiagnosticEvent },
};
}
@@ -174,11 +180,13 @@ function createTraceOnlyContext(endpoint: string): OpenClawPluginServiceContext
async function emitAndCaptureLog(
event: Omit<Extract<Parameters<typeof emitDiagnosticEvent>[0], { type: "log.record" }>, "type">,
options: { trusted?: boolean } = {},
) {
const service = createDiagnosticsOtelService();
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { logs: true });
await service.start(ctx);
emitDiagnosticEvent({
const emit = options.trusted ? emitTrustedDiagnosticEvent : emitDiagnosticEvent;
emit({
type: "log.record",
...event,
});
@@ -499,7 +507,7 @@ describe("diagnostics-otel service", () => {
}
});
test("attaches diagnostic trace context to exported logs", async () => {
test("does not attach untrusted diagnostic trace context to exported logs", async () => {
const emitCall = await emitAndCaptureLog({
level: "INFO",
message: "traceable log",
@@ -513,15 +521,31 @@ describe("diagnostics-otel service", () => {
},
});
expect(emitCall?.attributes).toMatchObject({
"openclaw.traceFlags": "01",
});
expect(emitCall?.attributes).toEqual(
expect.not.objectContaining({
"openclaw.traceId": expect.anything(),
"openclaw.spanId": expect.anything(),
"openclaw.traceFlags": expect.anything(),
}),
);
expect(telemetryState.tracer.setSpanContext).not.toHaveBeenCalled();
expect(emitCall?.context).toBeUndefined();
});
test("attaches trusted diagnostic trace context to exported logs", async () => {
const emitCall = await emitAndCaptureLog(
{
level: "INFO",
message: "traceable log",
trace: {
traceId: TRACE_ID,
spanId: SPAN_ID,
traceFlags: "01",
},
},
{ trusted: true },
);
expect(telemetryState.tracer.setSpanContext).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({
@@ -817,6 +841,75 @@ describe("diagnostics-otel service", () => {
await service.stop?.(ctx);
});
test("parents trusted diagnostic lifecycle spans from explicit parent ids", async () => {
const service = createDiagnosticsOtelService();
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true });
await service.start(ctx);
emitTrustedDiagnosticEvent({
type: "run.completed",
runId: "run-1",
provider: "openai",
model: "gpt-5.4",
outcome: "completed",
durationMs: 100,
trace: {
traceId: TRACE_ID,
spanId: CHILD_SPAN_ID,
parentSpanId: SPAN_ID,
traceFlags: "01",
},
});
emitTrustedDiagnosticEvent({
type: "model.call.completed",
runId: "run-1",
callId: "call-1",
provider: "openai",
model: "gpt-5.4",
durationMs: 80,
trace: {
traceId: TRACE_ID,
spanId: GRANDCHILD_SPAN_ID,
parentSpanId: CHILD_SPAN_ID,
traceFlags: "01",
},
});
emitTrustedDiagnosticEvent({
type: "tool.execution.error",
runId: "run-1",
toolName: "read",
durationMs: 20,
errorCategory: "TypeError",
trace: {
traceId: TRACE_ID,
spanId: TOOL_SPAN_ID,
parentSpanId: GRANDCHILD_SPAN_ID,
traceFlags: "01",
},
});
await flushDiagnosticEvents();
expect(telemetryState.tracer.setSpanContext).toHaveBeenCalledTimes(3);
expect(telemetryState.tracer.setSpanContext.mock.calls.map((call) => call[1])).toEqual([
expect.objectContaining({ traceId: TRACE_ID, spanId: SPAN_ID }),
expect.objectContaining({ traceId: TRACE_ID, spanId: CHILD_SPAN_ID }),
expect.objectContaining({ traceId: TRACE_ID, spanId: GRANDCHILD_SPAN_ID }),
]);
const parentBySpanName = Object.fromEntries(
telemetryState.tracer.startSpan.mock.calls.map((call) => [
call[0],
(call[2] as { spanContext?: { spanId?: string } } | undefined)?.spanContext?.spanId,
]),
);
expect(parentBySpanName).toMatchObject({
"openclaw.run": SPAN_ID,
"openclaw.model.call": CHILD_SPAN_ID,
"openclaw.tool.execution": GRANDCHILD_SPAN_ID,
});
await service.stop?.(ctx);
});
test("exports exec process spans without command text", async () => {
const service = createDiagnosticsOtelService();
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true });

View File

@@ -16,6 +16,7 @@ import { NodeSDK } from "@opentelemetry/sdk-node";
import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
import type {
DiagnosticEventMetadata,
DiagnosticEventPayload,
DiagnosticTraceContext,
OpenClawPluginService,
@@ -24,7 +25,6 @@ import {
isValidDiagnosticSpanId,
isValidDiagnosticTraceFlags,
isValidDiagnosticTraceId,
onInternalDiagnosticEvent,
redactSensitiveText,
} from "../api.js";
@@ -339,6 +339,33 @@ function contextForTraceContext(traceContext: DiagnosticTraceContext | undefined
});
}
function contextForDiagnosticSpanParent(traceContext: DiagnosticTraceContext | undefined) {
const normalized = normalizeTraceContext(traceContext);
if (!normalized?.parentSpanId) {
return undefined;
}
return trace.setSpanContext(otelContextApi.active(), {
traceId: normalized.traceId,
spanId: normalized.parentSpanId,
traceFlags: traceFlagsToOtel(normalized.traceFlags),
isRemote: true,
});
}
function contextForTrustedTraceContext(
evt: DiagnosticEventPayload,
metadata: DiagnosticEventMetadata,
) {
return metadata.trusted ? contextForTraceContext(evt.trace) : undefined;
}
function contextForTrustedDiagnosticSpanParent(
evt: DiagnosticEventPayload,
metadata: DiagnosticEventMetadata,
) {
return metadata.trusted ? contextForDiagnosticSpanParent(evt.trace) : undefined;
}
function addTraceAttributes(
attributes: Record<string, string | number | boolean>,
traceContext: DiagnosticTraceContext | undefined,
@@ -584,7 +611,10 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
});
let recordLogRecord:
| ((evt: Extract<DiagnosticEventPayload, { type: "log.record" }>) => void)
| ((
evt: Extract<DiagnosticEventPayload, { type: "log.record" }>,
metadata: DiagnosticEventMetadata,
) => void)
| undefined;
if (logsEnabled) {
let logRecordExportFailureLastReportedAt = Number.NEGATIVE_INFINITY;
@@ -603,7 +633,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
processors: [logProcessor],
});
const otelLogger = logProvider.getLogger("openclaw");
recordLogRecord = (evt) => {
recordLogRecord = (evt, metadata) => {
try {
const logLevelName = evt.level || "INFO";
const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber);
@@ -626,7 +656,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
if (evt.code?.functionName) {
assignOtelLogAttribute(attributes, "code.function", evt.code.functionName);
}
addTraceAttributes(attributes, evt.trace);
if (metadata.trusted) {
addTraceAttributes(attributes, evt.trace);
}
const logRecord: LogRecord = {
body: normalizeOtelLogString(evt.message || "log", MAX_OTEL_LOG_BODY_CHARS),
@@ -635,7 +667,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
attributes: redactOtelAttributes(attributes),
timestamp: evt.ts,
};
const logContext = contextForTraceContext(evt.trace);
const logContext = contextForTrustedTraceContext(evt, metadata);
if (logContext) {
logRecord.context = logContext;
}
@@ -719,7 +751,10 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
};
};
const recordModelUsage = (evt: Extract<DiagnosticEventPayload, { type: "model.usage" }>) => {
const recordModelUsage = (
evt: Extract<DiagnosticEventPayload, { type: "model.usage" }>,
metadata: DiagnosticEventMetadata,
) => {
const attrs = {
"openclaw.channel": evt.channel ?? "unknown",
"openclaw.provider": evt.provider ?? "unknown",
@@ -777,8 +812,11 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
"openclaw.tokens.total": usage.total ?? 0,
};
const span = spanWithDuration("openclaw.model.usage", spanAttrs, evt.durationMs);
span.end();
const span = spanWithDuration("openclaw.model.usage", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
span.end(evt.ts);
};
const recordWebhookReceived = (
@@ -994,6 +1032,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
const recordRunCompleted = (
evt: Extract<DiagnosticEventPayload, { type: "run.completed" }>,
metadata: DiagnosticEventMetadata,
) => {
const attrs: Record<string, string | number> = {
"openclaw.outcome": evt.outcome,
@@ -1015,6 +1054,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
spanAttrs["openclaw.errorCategory"] = lowCardinalityAttr(evt.errorCategory, "other");
}
const span = spanWithDuration("openclaw.run", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
if (evt.outcome === "error") {
@@ -1037,6 +1077,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
const recordModelCallCompleted = (
evt: Extract<DiagnosticEventPayload, { type: "model.call.completed" }>,
metadata: DiagnosticEventMetadata,
) => {
modelCallDurationHistogram.record(evt.durationMs, modelCallMetricAttrs(evt));
if (!tracesEnabled) {
@@ -1061,6 +1102,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
contentCapturePolicy,
);
const span = spanWithDuration("openclaw.model.call", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
span.end(evt.ts);
@@ -1068,6 +1110,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
const recordModelCallError = (
evt: Extract<DiagnosticEventPayload, { type: "model.call.error" }>,
metadata: DiagnosticEventMetadata,
) => {
modelCallDurationHistogram.record(evt.durationMs, {
...modelCallMetricAttrs(evt),
@@ -1096,6 +1139,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
contentCapturePolicy,
);
const span = spanWithDuration("openclaw.model.call", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
span.setStatus({
@@ -1107,6 +1151,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
const recordToolExecutionCompleted = (
evt: Extract<DiagnosticEventPayload, { type: "tool.execution.completed" }>,
metadata: DiagnosticEventMetadata,
) => {
const attrs = {
"openclaw.toolName": evt.toolName,
@@ -1128,6 +1173,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
contentCapturePolicy,
);
const span = spanWithDuration("openclaw.tool.execution", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
span.end(evt.ts);
@@ -1135,6 +1181,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
const recordToolExecutionError = (
evt: Extract<DiagnosticEventPayload, { type: "tool.execution.error" }>,
metadata: DiagnosticEventMetadata,
) => {
const attrs = {
"openclaw.toolName": evt.toolName,
@@ -1161,6 +1208,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
contentCapturePolicy,
);
const span = spanWithDuration("openclaw.tool.execution", spanAttrs, evt.durationMs, {
parentContext: contextForTrustedDiagnosticSpanParent(evt, metadata),
endTimeMs: evt.ts,
});
span.setStatus({
@@ -1218,11 +1266,17 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
queueDepthHistogram.record(evt.queued, { "openclaw.channel": "heartbeat" });
};
unsubscribe = onInternalDiagnosticEvent((evt: DiagnosticEventPayload) => {
const subscribe = ctx.internalDiagnostics?.onEvent;
if (!subscribe) {
ctx.logger.error("diagnostics-otel: internal diagnostics capability unavailable");
return;
}
unsubscribe = subscribe((evt: DiagnosticEventPayload, metadata: DiagnosticEventMetadata) => {
try {
switch (evt.type) {
case "model.usage":
recordModelUsage(evt);
recordModelUsage(evt, metadata);
return;
case "webhook.received":
recordWebhookReceived(evt);
@@ -1267,25 +1321,25 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
recordHeartbeat(evt);
return;
case "run.completed":
recordRunCompleted(evt);
recordRunCompleted(evt, metadata);
return;
case "model.call.completed":
recordModelCallCompleted(evt);
recordModelCallCompleted(evt, metadata);
return;
case "model.call.error":
recordModelCallError(evt);
recordModelCallError(evt, metadata);
return;
case "tool.execution.completed":
recordToolExecutionCompleted(evt);
recordToolExecutionCompleted(evt, metadata);
return;
case "tool.execution.error":
recordToolExecutionError(evt);
recordToolExecutionError(evt, metadata);
return;
case "exec.process.completed":
recordExecProcessCompleted(evt);
return;
case "log.record":
recordLogRecord?.(evt);
recordLogRecord?.(evt, metadata);
return;
case "tool.loop":
case "tool.execution.started":

View File

@@ -92,7 +92,7 @@ export function applyLocalOxlintPolicy(args, env, hostResources) {
insertBeforeSeparator(nextArgs, "--report-unused-disable-directives-severity", "error");
}
if (shouldThrottleLocalHeavyChecks(nextEnv, hostResources)) {
if (shouldThrottleLocalHeavyChecks(nextEnv, hostResources) && !hasFlag(nextArgs, "--threads")) {
insertBeforeSeparator(nextArgs, "--threads=1");
}

View File

@@ -1,7 +1,7 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { diagnosticErrorCategory } from "../../../infra/diagnostic-error-metadata.js";
import {
emitDiagnosticEvent,
emitTrustedDiagnosticEvent,
type DiagnosticEventInput,
} from "../../../infra/diagnostic-events.js";
import {
@@ -122,14 +122,14 @@ async function* observeModelCallIterator<T>(
yield next.value;
}
terminalEmitted = true;
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.completed",
...eventBase,
durationMs: Date.now() - startedAt,
});
} catch (err) {
terminalEmitted = true;
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
@@ -139,7 +139,7 @@ async function* observeModelCallIterator<T>(
} finally {
if (!terminalEmitted) {
await safeReturnIterator(iterator);
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
@@ -194,7 +194,7 @@ function observeModelCallResult(
startedAt,
);
}
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.completed",
...eventBase,
durationMs: Date.now() - startedAt,
@@ -210,7 +210,7 @@ export function wrapStreamFnWithDiagnosticModelCallEvents(
const callId = ctx.nextCallId();
const trace = freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace));
const eventBase = baseModelCallEvent(ctx, callId, trace);
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.started",
...eventBase,
});
@@ -222,7 +222,7 @@ export function wrapStreamFnWithDiagnosticModelCallEvents(
return result.then(
(resolved) => observeModelCallResult(resolved, eventBase, startedAt),
(err) => {
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
@@ -234,7 +234,7 @@ export function wrapStreamFnWithDiagnosticModelCallEvents(
}
return observeModelCallResult(result, eventBase, startedAt);
} catch (err) {
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,

View File

@@ -9,7 +9,7 @@ import {
} from "@mariozechner/pi-coding-agent";
import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { emitDiagnosticEvent } from "../../../infra/diagnostic-events.js";
import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js";
import {
createDiagnosticTraceContext,
createChildDiagnosticTraceContext,
@@ -649,7 +649,7 @@ export async function runEmbeddedAttempt(
: {}),
trace: runTrace,
};
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "run.started",
...diagnosticRunBase,
});
@@ -660,7 +660,7 @@ export async function runEmbeddedAttempt(
return;
}
diagnosticRunCompleted = true;
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "run.completed",
...diagnosticRunBase,
durationMs: Date.now() - diagnosticRunStartedAt,
@@ -673,7 +673,7 @@ export async function runEmbeddedAttempt(
: (() => {
const allTools = createOpenClawCodingTools({
agentId: sessionAgentId,
...buildEmbeddedAttemptToolRunContext({ ...params, trace: diagnosticTrace }),
...buildEmbeddedAttemptToolRunContext({ ...params, trace: runTrace }),
exec: {
...params.execOverrides,
elevated: params.bashElevated,

View File

@@ -391,7 +391,12 @@ describe("before_tool_call loop detection behavior", () => {
paramsSummary: {
kind: "object",
},
trace,
trace: {
traceId: trace.traceId,
parentSpanId: trace.spanId,
spanId: expect.any(String),
traceFlags: trace.traceFlags,
},
});
expect(emitted[0]?.trace).not.toBe(trace);
expect(Object.isFrozen(emitted[0]?.trace)).toBe(true);

View File

@@ -4,10 +4,11 @@ import {
diagnosticHttpStatusCode,
} from "../infra/diagnostic-error-metadata.js";
import {
emitDiagnosticEvent,
emitTrustedDiagnosticEvent,
type DiagnosticToolParamsSummary,
} from "../infra/diagnostic-events.js";
import {
createChildDiagnosticTraceContext,
freezeDiagnosticTraceContext,
type DiagnosticTraceContext,
} from "../infra/diagnostic-trace-context.js";
@@ -456,16 +457,19 @@ export function wrapToolWithBeforeToolCallHook(
}
}
const normalizedToolName = normalizeToolName(toolName || "tool");
const trace = ctx?.trace
? freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace))
: undefined;
const eventBase = {
...(ctx?.runId && { runId: ctx.runId }),
...(ctx?.sessionKey && { sessionKey: ctx.sessionKey }),
...(ctx?.sessionId && { sessionId: ctx.sessionId }),
...(ctx?.trace && { trace: freezeDiagnosticTraceContext(ctx.trace) }),
...(trace && { trace }),
toolName: normalizedToolName,
...(toolCallId && { toolCallId }),
paramsSummary: summarizeToolParams(outcome.params),
};
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "tool.execution.started",
...eventBase,
});
@@ -480,7 +484,7 @@ export function wrapToolWithBeforeToolCallHook(
toolCallId,
result,
});
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "tool.execution.completed",
...eventBase,
durationMs,
@@ -489,7 +493,7 @@ export function wrapToolWithBeforeToolCallHook(
} catch (err) {
const cause = unwrapErrorCause(err);
const errorCode = diagnosticHttpStatusCode(cause);
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "tool.execution.error",
...eventBase,
durationMs: Date.now() - startedAt,

View File

@@ -440,7 +440,7 @@ function loadSkillEntries(
const suspicious = childDirs.length > limits.maxCandidatesPerRoot;
const maxCandidates = Math.max(0, limits.maxSkillsLoadedPerSource);
const limitedChildren = childDirs.slice().sort().slice(0, maxCandidates);
const limitedChildren = childDirs.toSorted().slice(0, maxCandidates);
if (suspicious) {
skillsLogger.warn("Skills root looks suspiciously large, truncating discovery.", {

View File

@@ -126,8 +126,8 @@ export function buildLatestSubagentRunIndex(
}
childSessionsByController.set(controllerSessionKey, [childSessionKey]);
}
for (const childSessions of childSessionsByController.values()) {
childSessions.sort();
for (const [controllerSessionKey, childSessions] of childSessionsByController) {
childSessionsByController.set(controllerSessionKey, childSessions.toSorted());
}
return {

View File

@@ -16,8 +16,11 @@ import {
import type { TypingMode } from "../../config/types.js";
import { resolveSessionTranscriptCandidates } from "../../gateway/session-utils.fs.js";
import { emitAgentEvent } from "../../infra/agent-events.js";
import { emitDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import { freezeDiagnosticTraceContext } from "../../infra/diagnostic-trace-context.js";
import { emitTrustedDiagnosticEvent, isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import {
createChildDiagnosticTraceContext,
freezeDiagnosticTraceContext,
} from "../../infra/diagnostic-trace-context.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
@@ -1433,10 +1436,14 @@ export async function runReplyAgent(params: {
config: cfg,
});
const costUsd = estimateUsageCost({ usage, cost: costConfig });
emitDiagnosticEvent({
emitTrustedDiagnosticEvent({
type: "model.usage",
...(runResult.diagnosticTrace
? { trace: freezeDiagnosticTraceContext(runResult.diagnosticTrace) }
? {
trace: freezeDiagnosticTraceContext(
createChildDiagnosticTraceContext(runResult.diagnosticTrace),
),
}
: {}),
sessionKey,
sessionId: followupRun.run.sessionId,

View File

@@ -1,6 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
emitDiagnosticEvent,
emitTrustedDiagnosticEvent,
isDiagnosticsEnabled,
onInternalDiagnosticEvent,
onDiagnosticEvent,
@@ -115,6 +116,184 @@ describe("diagnostic-events", () => {
expect(events).toEqual([{ trace, type: "message.queued" }]);
});
it("marks only internal trusted diagnostic emissions as trusted", async () => {
const events: Array<{
metadataTrusted: boolean;
type: string;
}> = [];
onInternalDiagnosticEvent((event, metadata) => {
events.push({
metadataTrusted: metadata.trusted,
type: event.type,
});
});
emitDiagnosticEvent({
type: "message.queued",
source: "plugin",
});
emitTrustedDiagnosticEvent({
type: "model.call.started",
runId: "run-1",
callId: "call-1",
provider: "openai",
model: "gpt-5.4",
});
await new Promise<void>((resolve) => setImmediate(resolve));
expect(events).toEqual([
{ metadataTrusted: false, type: "message.queued" },
{ metadataTrusted: true, type: "model.call.started" },
]);
});
it("does not expose mutable diagnostic state on a global symbol", async () => {
const globalStore = globalThis as Record<PropertyKey, unknown>;
const events: boolean[] = [];
globalStore[Symbol.for("openclaw.diagnosticEventsState")] = {
listeners: new Set([() => events.push(true)]),
};
onInternalDiagnosticEvent((_event, metadata) => {
events.push(metadata.trusted);
});
emitDiagnosticEvent({
type: "model.call.started",
runId: "run-1",
callId: "call-1",
provider: "openai",
model: "gpt-5.4",
});
await new Promise<void>((resolve) => setImmediate(resolve));
expect(events).toEqual([false]);
delete globalStore[Symbol.for("openclaw.diagnosticEventsState")];
});
it("keeps trusted internal events off the public diagnostic stream", async () => {
const publicEvents: string[] = [];
const internalEvents: Array<{ trusted: boolean; type: string }> = [];
onDiagnosticEvent((event) => {
publicEvents.push(event.type);
});
onInternalDiagnosticEvent((event, metadata) => {
internalEvents.push({ trusted: metadata.trusted, type: event.type });
});
emitTrustedDiagnosticEvent({
type: "model.call.started",
runId: "run-1",
callId: "call-1",
provider: "openai",
model: "gpt-5.4",
});
await new Promise<void>((resolve) => setImmediate(resolve));
expect(publicEvents).toEqual([]);
expect(internalEvents).toEqual([{ trusted: true, type: "model.call.started" }]);
});
it("isolates diagnostic metadata from listener mutation", () => {
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
const seen: boolean[] = [];
onInternalDiagnosticEvent((_event, metadata) => {
(metadata as { trusted: boolean }).trusted = true;
});
onInternalDiagnosticEvent((_event, metadata) => {
seen.push(metadata.trusted);
});
emitDiagnosticEvent({
type: "message.queued",
source: "plugin",
});
expect(seen).toEqual([false]);
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining("listener error type=message.queued seq=1: TypeError"),
);
});
it("isolates trusted event trace context from listener mutation", async () => {
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
const trace = createDiagnosticTraceContext({
traceId: "4bf92f3577b34da6a3ce929d0e0e4736",
spanId: "00f067aa0ba902b7",
});
const seen: Array<{ traceId: string | undefined; trusted: boolean }> = [];
onInternalDiagnosticEvent((event) => {
(event.trace as { traceId: string }).traceId = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
});
onInternalDiagnosticEvent((event, metadata) => {
seen.push({ traceId: event.trace?.traceId, trusted: metadata.trusted });
});
emitTrustedDiagnosticEvent({
type: "model.call.started",
runId: "run-1",
callId: "call-1",
provider: "openai",
model: "gpt-5.4",
trace,
});
await new Promise<void>((resolve) => setImmediate(resolve));
expect(seen).toEqual([{ traceId: trace.traceId, trusted: true }]);
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining("listener error type=model.call.started seq=1: TypeError"),
);
});
it("isolates nested diagnostic payloads from listener mutation", () => {
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
const seen: Array<{ total: number | undefined; trusted: boolean }> = [];
onInternalDiagnosticEvent((event) => {
if (event.type === "model.usage") {
event.usage.total = 0;
}
});
onInternalDiagnosticEvent((event, metadata) => {
if (event.type === "model.usage") {
seen.push({ total: event.usage.total, trusted: metadata.trusted });
}
});
emitTrustedDiagnosticEvent({
type: "model.usage",
usage: { total: 42 },
});
expect(seen).toEqual([{ total: 42, trusted: true }]);
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining("listener error type=model.usage seq=1: TypeError"),
);
});
it("drops prototype-pollution keys during event enrichment", () => {
const eventInput = Object.assign(Object.create(null), {
type: "message.queued",
source: "plugin",
constructor: "blocked",
prototype: "blocked",
}) as Parameters<typeof emitDiagnosticEvent>[0] & Record<string, unknown>;
Object.defineProperty(eventInput, "__proto__", {
enumerable: true,
value: { polluted: true },
});
const events: Array<Parameters<Parameters<typeof onInternalDiagnosticEvent>[0]>[0]> = [];
onInternalDiagnosticEvent((event) => {
events.push(event);
});
emitDiagnosticEvent(eventInput);
expect(events).toHaveLength(1);
expect(Object.hasOwn(events[0] ?? {}, "__proto__")).toBe(false);
expect(Object.hasOwn(events[0] ?? {}, "constructor")).toBe(false);
expect(Object.hasOwn(events[0] ?? {}, "prototype")).toBe(false);
expect((Object.prototype as Record<string, unknown>).polluted).toBeUndefined();
});
it("dispatches high-frequency tool and model lifecycle events asynchronously", async () => {
const events: string[] = [];
onDiagnosticEvent((event) => {

View File

@@ -1,5 +1,6 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { DiagnosticTraceContext } from "./diagnostic-trace-context.js";
import { isBlockedObjectKey } from "./prototype-keys.js";
export type DiagnosticSessionState = "idle" | "processing" | "waiting";
@@ -364,12 +365,26 @@ export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event
: never
: never;
export type DiagnosticEventMetadata = Readonly<{
trusted: boolean;
}>;
type DiagnosticEventListener = (
evt: DiagnosticEventPayload,
metadata: DiagnosticEventMetadata,
) => void;
type QueuedDiagnosticEvent = {
event: DiagnosticEventPayload;
metadata: DiagnosticEventMetadata;
};
type DiagnosticEventsGlobalState = {
enabled: boolean;
seq: number;
listeners: Set<(evt: DiagnosticEventPayload) => void>;
listeners: Set<DiagnosticEventListener>;
dispatchDepth: number;
asyncQueue: DiagnosticEventPayload[];
asyncQueue: QueuedDiagnosticEvent[];
asyncDrainScheduled: boolean;
};
@@ -388,21 +403,17 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
"log.record",
]);
const diagnosticEventsState: DiagnosticEventsGlobalState = {
enabled: true,
seq: 0,
listeners: new Set<DiagnosticEventListener>(),
dispatchDepth: 0,
asyncQueue: [],
asyncDrainScheduled: false,
};
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
const globalStore = globalThis as typeof globalThis & {
__openclawDiagnosticEventsState?: DiagnosticEventsGlobalState;
};
if (!globalStore.__openclawDiagnosticEventsState) {
globalStore.__openclawDiagnosticEventsState = {
enabled: true,
seq: 0,
listeners: new Set<(evt: DiagnosticEventPayload) => void>(),
dispatchDepth: 0,
asyncQueue: [],
asyncDrainScheduled: false,
};
}
return globalStore.__openclawDiagnosticEventsState;
return diagnosticEventsState;
}
export function isDiagnosticsEnabled(config?: OpenClawConfig): boolean {
@@ -420,6 +431,7 @@ export function areDiagnosticsEnabledForProcess(): boolean {
function dispatchDiagnosticEvent(
state: DiagnosticEventsGlobalState,
enriched: DiagnosticEventPayload,
metadata: DiagnosticEventMetadata,
): void {
if (state.dispatchDepth > 100) {
console.error(
@@ -432,7 +444,7 @@ function dispatchDiagnosticEvent(
try {
for (const listener of state.listeners) {
try {
listener(enriched);
listener(cloneDiagnosticEventForListener(enriched), Object.freeze({ ...metadata }));
} catch (err) {
const errorMessage =
err instanceof Error
@@ -451,6 +463,30 @@ function dispatchDiagnosticEvent(
}
}
function cloneDiagnosticEventForListener(event: DiagnosticEventPayload): DiagnosticEventPayload {
return deepFreezeDiagnosticValue(structuredClone(event)) as DiagnosticEventPayload;
}
function deepFreezeDiagnosticValue(value: unknown, seen = new WeakSet<object>()): unknown {
if (!value || typeof value !== "object") {
return value;
}
if (seen.has(value)) {
return value;
}
seen.add(value);
if (Array.isArray(value)) {
for (const item of value) {
deepFreezeDiagnosticValue(item, seen);
}
return Object.freeze(value);
}
for (const nested of Object.values(value as Record<string, unknown>)) {
deepFreezeDiagnosticValue(nested, seen);
}
return Object.freeze(value);
}
function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void {
if (state.asyncDrainScheduled) {
return;
@@ -459,8 +495,8 @@ function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void
setImmediate(() => {
state.asyncDrainScheduled = false;
const batch = state.asyncQueue.splice(0);
for (const event of batch) {
dispatchDiagnosticEvent(state, event);
for (const entry of batch) {
dispatchDiagnosticEvent(state, entry.event, entry.metadata);
}
if (state.asyncQueue.length > 0) {
scheduleAsyncDiagnosticDrain(state);
@@ -468,33 +504,53 @@ function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void
});
}
export function emitDiagnosticEvent(event: DiagnosticEventInput) {
function enrichDiagnosticEvent(
state: DiagnosticEventsGlobalState,
event: DiagnosticEventInput,
): DiagnosticEventPayload {
const enriched = {} as DiagnosticEventPayload & Record<string, unknown>;
for (const [key, value] of Object.entries(event as Record<string, unknown>)) {
if (isBlockedObjectKey(key)) {
continue;
}
enriched[key] = value;
}
state.seq += 1;
enriched.seq = state.seq;
enriched.ts = Date.now();
return enriched;
}
function emitDiagnosticEventWithTrust(event: DiagnosticEventInput, trusted: boolean) {
const state = getDiagnosticEventsState();
if (!state.enabled) {
return;
}
const enriched = {
...event,
seq: (state.seq += 1),
ts: Date.now(),
} satisfies DiagnosticEventPayload;
const enriched = enrichDiagnosticEvent(state, event);
const metadata: DiagnosticEventMetadata = { trusted };
if (ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) {
if (state.asyncQueue.length >= MAX_ASYNC_DIAGNOSTIC_EVENTS) {
return;
}
state.asyncQueue.push(enriched);
state.asyncQueue.push({ event: enriched, metadata });
scheduleAsyncDiagnosticDrain(state);
return;
}
dispatchDiagnosticEvent(state, enriched);
dispatchDiagnosticEvent(state, enriched, metadata);
}
export function onInternalDiagnosticEvent(
listener: (evt: DiagnosticEventPayload) => void,
): () => void {
export function emitDiagnosticEvent(event: DiagnosticEventInput) {
emitDiagnosticEventWithTrust(event, false);
}
export function emitTrustedDiagnosticEvent(event: DiagnosticEventInput) {
emitDiagnosticEventWithTrust(event, true);
}
export function onInternalDiagnosticEvent(listener: DiagnosticEventListener): () => void {
const state = getDiagnosticEventsState();
state.listeners.add(listener);
return () => {
@@ -503,8 +559,8 @@ export function onInternalDiagnosticEvent(
}
export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void {
return onInternalDiagnosticEvent((event) => {
if (event.type === "log.record") {
return onInternalDiagnosticEvent((event, metadata) => {
if (metadata.trusted || event.type === "log.record") {
return;
}
listener(event);

View File

@@ -2,12 +2,9 @@
// Keep this list additive and scoped to the bundled diagnostics-otel surface.
export type { DiagnosticEventPayload } from "../infra/diagnostic-events.js";
export type { DiagnosticEventMetadata } from "../infra/diagnostic-events.js";
export type { DiagnosticTraceContext } from "../infra/diagnostic-trace-context.js";
export {
emitDiagnosticEvent,
onDiagnosticEvent,
onInternalDiagnosticEvent,
} from "../infra/diagnostic-events.js";
export { emitDiagnosticEvent, onDiagnosticEvent } from "../infra/diagnostic-events.js";
export {
createChildDiagnosticTraceContext,
createDiagnosticTraceContext,

View File

@@ -31,7 +31,13 @@ export async function drainPendingDeliveries(opts: DrainPendingDeliveriesOptions
export * from "../infra/backoff.js";
export * from "../infra/channel-activity.js";
export * from "../infra/dedupe.js";
export * from "../infra/diagnostic-events.js";
export type * from "../infra/diagnostic-events.js";
export {
areDiagnosticsEnabledForProcess,
emitDiagnosticEvent,
isDiagnosticsEnabled,
onDiagnosticEvent,
} from "../infra/diagnostic-events.js";
export * from "../infra/diagnostic-flags.js";
export * from "../infra/env.js";
export * from "../infra/errors.js";

View File

@@ -189,6 +189,7 @@ export type PluginServiceRegistration = {
pluginName?: string;
service: OpenClawPluginService;
source: string;
origin: PluginOrigin;
rootDir?: string;
};

View File

@@ -1198,6 +1198,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
pluginName: record.name,
service,
source: record.source,
origin: record.origin,
rootDir: record.rootDir,
});
};

View File

@@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { PluginOrigin } from "./plugin-origin.types.js";
import { createEmptyPluginRegistry } from "./registry.js";
import type { OpenClawPluginService, OpenClawPluginServiceContext } from "./types.js";
@@ -17,12 +18,17 @@ vi.mock("../logging/subsystem.js", () => ({
import { STATE_DIR } from "../config/paths.js";
import { startPluginServices } from "./services.js";
function createRegistry(services: OpenClawPluginService[]) {
function createRegistry(
services: OpenClawPluginService[],
pluginId = "plugin:test",
origin: PluginOrigin = "workspace",
) {
const registry = createEmptyPluginRegistry();
registry.services = services.map((service) => ({
pluginId: "plugin:test",
pluginId,
service,
source: "test",
origin,
rootDir: "/plugins/test-plugin",
})) as typeof registry.services;
return registry;
@@ -173,4 +179,26 @@ describe("startPluginServices", () => {
expect(stopOk).toHaveBeenCalledOnce();
expect(stopThrows).toHaveBeenCalledOnce();
});
it("grants internal diagnostics only to the bundled diagnostics OTEL service", async () => {
const contexts: OpenClawPluginServiceContext[] = [];
const diagnosticsService = createTrackingService("diagnostics-otel", { contexts });
await startPluginServices({
registry: createRegistry([diagnosticsService], "diagnostics-otel", "bundled"),
config: createServiceConfig(),
});
expect(contexts[0]?.internalDiagnostics?.onEvent).toBeTypeOf("function");
const untrustedContexts: OpenClawPluginServiceContext[] = [];
const untrustedService = createTrackingService("diagnostics-otel", {
contexts: untrustedContexts,
});
await startPluginServices({
registry: createRegistry([untrustedService], "diagnostics-otel", "workspace"),
config: createServiceConfig(),
});
expect(untrustedContexts[0]?.internalDiagnostics).toBeUndefined();
});
});

View File

@@ -1,6 +1,8 @@
import { STATE_DIR } from "../config/paths.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { onInternalDiagnosticEvent } from "../infra/diagnostic-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { PluginServiceRegistration } from "./registry-types.js";
import type { PluginRegistry } from "./registry.js";
import type { OpenClawPluginServiceContext, PluginLogger } from "./types.js";
@@ -17,12 +19,18 @@ function createPluginLogger(): PluginLogger {
function createServiceContext(params: {
config: OpenClawConfig;
workspaceDir?: string;
service?: PluginServiceRegistration;
}): OpenClawPluginServiceContext {
return {
config: params.config,
workspaceDir: params.workspaceDir,
stateDir: STATE_DIR,
logger: createPluginLogger(),
...(params.service?.origin === "bundled" &&
params.service.pluginId === "diagnostics-otel" &&
params.service.service.id === "diagnostics-otel"
? { internalDiagnostics: { onEvent: onInternalDiagnosticEvent } }
: {}),
};
}
@@ -39,13 +47,13 @@ export async function startPluginServices(params: {
id: string;
stop?: () => void | Promise<void>;
}> = [];
const serviceContext = createServiceContext({
config: params.config,
workspaceDir: params.workspaceDir,
});
for (const entry of params.registry.services) {
const service = entry.service;
const serviceContext = createServiceContext({
config: params.config,
workspaceDir: params.workspaceDir,
service: entry,
});
try {
await service.start(serviceContext);
running.push({

View File

@@ -27,6 +27,10 @@ import type { OperatorScope } from "../gateway/operator-scopes.js";
import type { GatewayRequestHandler } from "../gateway/server-methods/types.js";
import type { InternalHookHandler } from "../hooks/internal-hook-types.js";
import type { ImageGenerationProvider } from "../image-generation/types.js";
import type {
DiagnosticEventMetadata,
DiagnosticEventPayload,
} from "../infra/diagnostic-events.js";
import type { ProviderUsageSnapshot } from "../infra/provider-usage.types.js";
import type { MediaUnderstandingProvider } from "../media-understanding/types.js";
import type { MusicGenerationProvider } from "../music-generation/types.js";
@@ -1971,6 +1975,11 @@ export type OpenClawPluginServiceContext = {
workspaceDir?: string;
stateDir: string;
logger: PluginLogger;
internalDiagnostics?: {
onEvent: (
listener: (event: DiagnosticEventPayload, metadata: DiagnosticEventMetadata) => void,
) => () => void;
};
};
/** Background service registered by a plugin during `register(api)`. */

View File

@@ -251,6 +251,19 @@ describe("local-heavy-check-runtime", () => {
]);
});
it("honors an explicit oxlint thread count", () => {
const { args } = applyLocalOxlintPolicy(["--threads=8"], makeEnv(), ROOMY_HOST);
expect(args).toEqual([
"--threads=8",
"--type-aware",
"--tsconfig",
"tsconfig.oxlint.json",
"--report-unused-disable-directives-severity",
"error",
]);
});
it("allows forcing full-speed oxlint runs on roomy hosts", () => {
const { args } = applyLocalOxlintPolicy(
[],