mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-22 13:38:07 +00:00
Fix OTLP log trace correlation (#92276)
* fix diagnostics otel log trace correlation * test diagnostics trace provenance contract
This commit is contained in:
@@ -171,6 +171,7 @@ import {
|
||||
type DiagnosticEventPrivateData,
|
||||
} from "openclaw/plugin-sdk/diagnostic-runtime";
|
||||
import {
|
||||
emitDiagnosticEventWithTrustedTraceContext,
|
||||
emitInternalDiagnosticEventForTest,
|
||||
logMessageDispatchStarted,
|
||||
logMessageProcessed,
|
||||
@@ -362,7 +363,11 @@ function histogramCreateOptions(name: string) {
|
||||
|
||||
async function emitAndCaptureLog(
|
||||
event: Omit<Extract<Parameters<typeof emitDiagnosticEvent>[0], { type: "log.record" }>, "type">,
|
||||
options: { captureContent?: OtelContextFlags["captureContent"]; trusted?: boolean } = {},
|
||||
options: {
|
||||
captureContent?: OtelContextFlags["captureContent"];
|
||||
trusted?: boolean;
|
||||
trustedTraceContext?: boolean;
|
||||
} = {},
|
||||
) {
|
||||
const service = createDiagnosticsOtelService();
|
||||
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, {
|
||||
@@ -370,7 +375,11 @@ async function emitAndCaptureLog(
|
||||
...(options.captureContent !== undefined ? { captureContent: options.captureContent } : {}),
|
||||
});
|
||||
await service.start(ctx);
|
||||
const emit = options.trusted ? emitTrustedDiagnosticEvent : emitDiagnosticEvent;
|
||||
const emit = options.trusted
|
||||
? emitTrustedDiagnosticEvent
|
||||
: options.trustedTraceContext
|
||||
? emitDiagnosticEventWithTrustedTraceContext
|
||||
: emitDiagnosticEvent;
|
||||
emit({
|
||||
type: "log.record",
|
||||
...event,
|
||||
@@ -1391,6 +1400,28 @@ describe("diagnostics-otel service", () => {
|
||||
expect(emitCall?.context).toBeUndefined();
|
||||
});
|
||||
|
||||
test("attaches trace-only trusted context to exported logs", async () => {
|
||||
const emitCall = await emitAndCaptureLog(
|
||||
{
|
||||
level: "INFO",
|
||||
message: "traceable log",
|
||||
trace: {
|
||||
traceId: TRACE_ID,
|
||||
spanId: SPAN_ID,
|
||||
traceFlags: "01",
|
||||
},
|
||||
},
|
||||
{ trustedTraceContext: true },
|
||||
);
|
||||
|
||||
expect(emitCall?.body).toBe("log");
|
||||
expect(telemetryState.tracer.setSpanContext).toHaveBeenCalledTimes(1);
|
||||
const emitContext = emitCall?.context as { spanContext?: Record<string, unknown> } | undefined;
|
||||
const emitSpanContext = emitContext?.spanContext;
|
||||
expect(emitSpanContext?.traceId).toBe(TRACE_ID);
|
||||
expect(emitSpanContext?.spanId).toBe(SPAN_ID);
|
||||
});
|
||||
|
||||
test("attaches trusted diagnostic trace context to exported logs", async () => {
|
||||
const emitCall = await emitAndCaptureLog(
|
||||
{
|
||||
|
||||
@@ -1031,7 +1031,9 @@ function contextForTrustedTraceContext(
|
||||
evt: DiagnosticEventPayload,
|
||||
metadata: DiagnosticEventMetadata,
|
||||
) {
|
||||
return metadata.trusted ? contextForTraceContext(evt.trace) : undefined;
|
||||
return metadata.trusted || metadata.trustedTraceContext === true
|
||||
? contextForTraceContext(evt.trace)
|
||||
: undefined;
|
||||
}
|
||||
|
||||
function addTraceAttributes(
|
||||
@@ -1626,7 +1628,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
|
||||
if (evt.code?.functionName) {
|
||||
assignOtelLogAttribute(attributes, "code.function", evt.code.functionName);
|
||||
}
|
||||
if (metadata.trusted) {
|
||||
if (metadata.trusted || metadata.trustedTraceContext === true) {
|
||||
addTraceAttributes(attributes, evt.trace);
|
||||
}
|
||||
|
||||
|
||||
@@ -79,6 +79,8 @@ type CapturedMetric = {
|
||||
|
||||
type CapturedLogRecord = {
|
||||
body: string | number | boolean | string[];
|
||||
spanId: string;
|
||||
traceId: string;
|
||||
};
|
||||
|
||||
const DEFAULT_SCENARIO_ID = "otel-trace-smoke";
|
||||
@@ -646,15 +648,21 @@ function decodeMetricRequest(body: Buffer): CapturedMetric[] {
|
||||
function decodeLogRecord(message: Uint8Array): CapturedLogRecord {
|
||||
const reader = new ProtoReader(message);
|
||||
let body: string | number | boolean | string[] = "";
|
||||
let traceId = "";
|
||||
let spanId = "";
|
||||
while (!reader.done()) {
|
||||
const { field, wire } = reader.tag();
|
||||
if (field === 5 && wire === 2) {
|
||||
body = normalizeOtlpValue(decodeAnyValue(reader.bytes()));
|
||||
} else if (field === 9 && wire === 2) {
|
||||
traceId = Buffer.from(reader.bytes()).toString("hex");
|
||||
} else if (field === 10 && wire === 2) {
|
||||
spanId = Buffer.from(reader.bytes()).toString("hex");
|
||||
} else {
|
||||
reader.skip(wire);
|
||||
}
|
||||
}
|
||||
return { body };
|
||||
return { body, spanId, traceId };
|
||||
}
|
||||
|
||||
function decodeScopeLogs(message: Uint8Array): CapturedLogRecord[] {
|
||||
@@ -1439,6 +1447,12 @@ function assertSmoke(params: {
|
||||
if (rawLogBodies.length > 0) {
|
||||
failures.push(`OTLP log records exported ${rawLogBodies.length} non-placeholder bodies`);
|
||||
}
|
||||
const correlatedLogRecords = params.logRecords.filter(
|
||||
(record) => record.traceId && record.spanId,
|
||||
);
|
||||
if (correlatedLogRecords.length === 0) {
|
||||
failures.push("no OTLP log records included trace/span correlation ids");
|
||||
}
|
||||
|
||||
const attributeKeys = collectAttributeKeys(params.spans);
|
||||
const disallowed = [...DISALLOWED_ATTRIBUTE_KEYS].filter((key) => attributeKeys.has(key));
|
||||
@@ -1568,6 +1582,9 @@ async function main() {
|
||||
spanCount: receiver.capturedSpans.length,
|
||||
metricCount: receiver.capturedMetrics.length,
|
||||
logRecordCount: receiver.capturedLogRecords.length,
|
||||
logRecordsWithTraceContext: receiver.capturedLogRecords.filter(
|
||||
(record) => record.traceId && record.spanId,
|
||||
).length,
|
||||
spanNames: assertion.spanNames,
|
||||
metricNames: assertion.metricNames,
|
||||
signalRequestCounts: assertion.signalRequestCounts,
|
||||
|
||||
@@ -240,6 +240,12 @@ describe("diagnostic-events", () => {
|
||||
|
||||
expect(traceparents).toEqual([undefined, `00-${trace.traceId}-${trace.spanId}-01`]);
|
||||
expect(formatDiagnosticTraceparentForPropagation({ trace }, { trusted: true })).toBeUndefined();
|
||||
expect(
|
||||
formatDiagnosticTraceparentForPropagation(
|
||||
{ trace },
|
||||
{ trusted: false, trustedTraceContext: true },
|
||||
),
|
||||
).toBeUndefined();
|
||||
});
|
||||
|
||||
it("shares diagnostic state across duplicate module instances", async () => {
|
||||
|
||||
@@ -704,6 +704,7 @@ export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event
|
||||
|
||||
export type DiagnosticEventMetadata = Readonly<{
|
||||
internal?: boolean;
|
||||
trustedTraceContext?: boolean;
|
||||
trusted: boolean;
|
||||
}>;
|
||||
|
||||
@@ -1068,6 +1069,7 @@ function createInternalDiagnosticMetadata(trusted: boolean): DiagnosticEventMeta
|
||||
type EmitDiagnosticEventOptions = {
|
||||
internal?: boolean;
|
||||
privateData?: DiagnosticEventPrivateData;
|
||||
trustedTraceContext?: boolean;
|
||||
};
|
||||
|
||||
function emitDiagnosticEventWithTrust(
|
||||
@@ -1082,7 +1084,11 @@ function emitDiagnosticEventWithTrust(
|
||||
|
||||
const enriched = enrichDiagnosticEvent(state, event);
|
||||
const { internal = false, privateData } = options;
|
||||
const metadata = internal ? createInternalDiagnosticMetadata(trusted) : { trusted };
|
||||
const trustedTraceContext = options.trustedTraceContext === true;
|
||||
const metadata = {
|
||||
...(internal ? createInternalDiagnosticMetadata(trusted) : { trusted }),
|
||||
...(trustedTraceContext ? { trustedTraceContext } : {}),
|
||||
};
|
||||
|
||||
if (ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) {
|
||||
if (state.asyncQueue.length >= MAX_ASYNC_DIAGNOSTIC_EVENTS) {
|
||||
@@ -1108,6 +1114,11 @@ export function emitDiagnosticEvent(event: DiagnosticEventInput) {
|
||||
emitDiagnosticEventWithTrust(event, false);
|
||||
}
|
||||
|
||||
/** Emits an untrusted event whose trace context came from OpenClaw-owned scope. */
|
||||
export function emitDiagnosticEventWithTrustedTraceContext(event: DiagnosticEventInput) {
|
||||
emitDiagnosticEventWithTrust(event, false, { trustedTraceContext: true });
|
||||
}
|
||||
|
||||
/** Emits an untrusted diagnostic event tagged as internal dispatcher provenance. */
|
||||
export function emitInternalDiagnosticEvent(event: DiagnosticEventInput) {
|
||||
emitDiagnosticEventWithTrust(event, false, { internal: true });
|
||||
|
||||
@@ -3,6 +3,7 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
onInternalDiagnosticEvent,
|
||||
resetDiagnosticEventsForTest,
|
||||
type DiagnosticEventMetadata,
|
||||
type DiagnosticEventPayload,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
import {
|
||||
@@ -37,10 +38,13 @@ afterEach(() => {
|
||||
|
||||
describe("diagnostic log events", () => {
|
||||
it("emits structured log records through diagnostics", async () => {
|
||||
const received: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((evt) => {
|
||||
const received: Array<{
|
||||
event: Extract<DiagnosticEventPayload, { type: "log.record" }>;
|
||||
metadata: DiagnosticEventMetadata;
|
||||
}> = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((evt, metadata) => {
|
||||
if (evt.type === "log.record") {
|
||||
received.push(evt);
|
||||
received.push({ event: evt, metadata });
|
||||
}
|
||||
});
|
||||
|
||||
@@ -53,10 +57,11 @@ describe("diagnostic log events", () => {
|
||||
unsubscribe();
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
const [event] = received;
|
||||
if (!event) {
|
||||
const [record] = received;
|
||||
if (!record) {
|
||||
throw new Error("missing diagnostic log event");
|
||||
}
|
||||
const { event, metadata } = record;
|
||||
expect(event.type).toBe("log.record");
|
||||
expect(event.level).toBe("INFO");
|
||||
expect(event.message).toBe("hello diagnostic logs");
|
||||
@@ -68,6 +73,8 @@ describe("diagnostic log events", () => {
|
||||
traceId: TRACE_ID,
|
||||
spanId: SPAN_ID,
|
||||
});
|
||||
expect(metadata.trusted).toBe(false);
|
||||
expect(metadata.trustedTraceContext).toBeUndefined();
|
||||
});
|
||||
|
||||
it("uses active request trace context for unbound log records", async () => {
|
||||
@@ -75,10 +82,13 @@ describe("diagnostic log events", () => {
|
||||
traceId: TRACE_ID,
|
||||
spanId: SPAN_ID,
|
||||
});
|
||||
const received: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((evt) => {
|
||||
const received: Array<{
|
||||
event: Extract<DiagnosticEventPayload, { type: "log.record" }>;
|
||||
metadata: DiagnosticEventMetadata;
|
||||
}> = [];
|
||||
const unsubscribe = onInternalDiagnosticEvent((evt, metadata) => {
|
||||
if (evt.type === "log.record") {
|
||||
received.push(evt);
|
||||
received.push({ event: evt, metadata });
|
||||
}
|
||||
});
|
||||
|
||||
@@ -90,7 +100,9 @@ describe("diagnostic log events", () => {
|
||||
unsubscribe();
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]?.trace).toEqual(trace);
|
||||
expect(received[0]?.event.trace).toEqual(trace);
|
||||
expect(received[0]?.metadata.trusted).toBe(false);
|
||||
expect(received[0]?.metadata.trustedTraceContext).toBe(true);
|
||||
});
|
||||
|
||||
it("redacts and bounds internal log records before diagnostic emission", async () => {
|
||||
|
||||
@@ -4,7 +4,10 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { Logger as TsLogger } from "tslog";
|
||||
import type { OpenClawConfig } from "../config/types.js";
|
||||
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
|
||||
import {
|
||||
emitDiagnosticEvent,
|
||||
emitDiagnosticEventWithTrustedTraceContext,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
import {
|
||||
getActiveDiagnosticTraceContext,
|
||||
isValidDiagnosticSpanId,
|
||||
@@ -359,9 +362,23 @@ function findLogTraceContext(
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function resolveLogTraceContext(
|
||||
bindings: Record<string, unknown> | undefined,
|
||||
numericArgs: readonly unknown[],
|
||||
): { trace?: DiagnosticTraceContext; trustedTraceContext: boolean } {
|
||||
const explicitTrace = findLogTraceContext(bindings, numericArgs);
|
||||
if (explicitTrace) {
|
||||
return { trace: explicitTrace, trustedTraceContext: false };
|
||||
}
|
||||
const activeTrace = getActiveDiagnosticTraceContext();
|
||||
return activeTrace
|
||||
? { trace: activeTrace, trustedTraceContext: true }
|
||||
: { trustedTraceContext: false };
|
||||
}
|
||||
|
||||
function buildTraceFileLogFields(logObj: TsLogRecord): Record<string, string> | undefined {
|
||||
const { bindings, args } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj));
|
||||
const trace = findLogTraceContext(bindings, args) ?? getActiveDiagnosticTraceContext();
|
||||
const { trace } = resolveLogTraceContext(bindings, args);
|
||||
if (!trace) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -410,7 +427,7 @@ function buildDiagnosticLogRecord(logObj: TsLogRecord) {
|
||||
| undefined;
|
||||
const { bindings, args: numericArgs } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj));
|
||||
|
||||
const trace = findLogTraceContext(bindings, numericArgs) ?? getActiveDiagnosticTraceContext();
|
||||
const { trace, trustedTraceContext } = resolveLogTraceContext(bindings, numericArgs);
|
||||
const structuredArg = numericArgs[0];
|
||||
const structuredBindings = isPlainLogRecordObject(structuredArg) ? structuredArg : undefined;
|
||||
if (structuredBindings) {
|
||||
@@ -456,14 +473,17 @@ function buildDiagnosticLogRecord(logObj: TsLogRecord) {
|
||||
.filter((name): name is string => Boolean(name));
|
||||
|
||||
return {
|
||||
type: "log.record" as const,
|
||||
level: meta?.logLevelName ?? "INFO",
|
||||
message,
|
||||
...(loggerName ? { loggerName } : {}),
|
||||
...(loggerParents?.length ? { loggerParents } : {}),
|
||||
...(Object.keys(attributes).length > 0 ? { attributes } : {}),
|
||||
...(Object.keys(code).length > 0 ? { code } : {}),
|
||||
...(trace ? { trace } : {}),
|
||||
event: {
|
||||
type: "log.record" as const,
|
||||
level: meta?.logLevelName ?? "INFO",
|
||||
message,
|
||||
...(loggerName ? { loggerName } : {}),
|
||||
...(loggerParents?.length ? { loggerParents } : {}),
|
||||
...(Object.keys(attributes).length > 0 ? { attributes } : {}),
|
||||
...(Object.keys(code).length > 0 ? { code } : {}),
|
||||
...(trace ? { trace } : {}),
|
||||
},
|
||||
trustedTraceContext,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -478,9 +498,11 @@ function redactLogRecordForTransport<T extends LogObj>(record: T): T {
|
||||
function attachDiagnosticEventTransport(logger: TsLogger<LogObj>): void {
|
||||
logger.attachTransport((logObj: LogObj) => {
|
||||
try {
|
||||
emitDiagnosticEvent(
|
||||
buildDiagnosticLogRecord(redactLogRecordForTransport(logObj) as TsLogRecord),
|
||||
);
|
||||
const record = buildDiagnosticLogRecord(redactLogRecordForTransport(logObj) as TsLogRecord);
|
||||
const emit = record.trustedTraceContext
|
||||
? emitDiagnosticEventWithTrustedTraceContext
|
||||
: emitDiagnosticEvent;
|
||||
emit(record.event);
|
||||
} catch {
|
||||
// never block on logging failures
|
||||
}
|
||||
|
||||
@@ -14,7 +14,10 @@ export {
|
||||
resolveWebSearchProviderContractEntriesForPluginId,
|
||||
} from "../plugins/contracts/registry.js";
|
||||
export { loadPluginManifestRegistry } from "../plugins/manifest-registry.js";
|
||||
export { emitInternalDiagnosticEvent as emitInternalDiagnosticEventForTest } from "../infra/diagnostic-events.js";
|
||||
export {
|
||||
emitDiagnosticEventWithTrustedTraceContext,
|
||||
emitInternalDiagnosticEvent as emitInternalDiagnosticEventForTest,
|
||||
} from "../infra/diagnostic-events.js";
|
||||
export { runWithDiagnosticTraceContext } from "../infra/diagnostic-trace-context.js";
|
||||
export { logMessageDispatchStarted, logMessageProcessed } from "../logging/diagnostic.js";
|
||||
export { resolveBundledExplicitProviderContractsFromPublicArtifacts } from "../plugins/provider-contract-public-artifacts.js";
|
||||
|
||||
Reference in New Issue
Block a user