feat(logging): propagate request trace scopes

This commit is contained in:
Vincent Koc
2026-04-26 13:55:36 -07:00
parent e3cbad4fb6
commit 3ae6f01d61
10 changed files with 319 additions and 10 deletions

View File

@@ -0,0 +1,101 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import {
emitDiagnosticEvent,
onDiagnosticEvent,
resetDiagnosticEventsForTest,
} from "../infra/diagnostic-events.js";
import {
getActiveDiagnosticTraceContext,
resetDiagnosticTraceContextForTest,
type DiagnosticTraceContext,
} from "../infra/diagnostic-trace-context.js";
import { getLogger, resetLogger, setLoggerOverride } from "../logging.js";
import type { ResolvedGatewayAuth } from "./auth.js";
import { createGatewayHttpServer } from "./server-http.js";
import { withTempConfig } from "./test-temp-config.js";
const resolvedAuth: ResolvedGatewayAuth = { mode: "none", allowTailscale: false };
async function listen(server: ReturnType<typeof createGatewayHttpServer>): Promise<number> {
return await new Promise<number>((resolve) => {
server.listen(0, "127.0.0.1", () => {
const address = server.address();
resolve(typeof address === "object" && address ? address.port : 0);
});
});
}
async function closeServer(server: ReturnType<typeof createGatewayHttpServer>): Promise<void> {
await new Promise<void>((resolve, reject) =>
server.close((err) => (err ? reject(err) : resolve())),
);
}
afterEach(() => {
resetDiagnosticEventsForTest();
resetDiagnosticTraceContextForTest();
setLoggerOverride(null);
resetLogger();
});
describe("gateway HTTP request trace scope", () => {
it("threads active request trace through logs and diagnostics", async () => {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-gateway-request-trace-"));
const logPath = path.join(dir, "gateway.log");
const events: Array<{ trace?: DiagnosticTraceContext; type: string }> = [];
const stop = onDiagnosticEvent((event) => {
events.push({ trace: event.trace, type: event.type });
});
let activeTraceInHandler: DiagnosticTraceContext | undefined;
await withTempConfig({
cfg: { gateway: { auth: { mode: "none" } } },
run: async () => {
setLoggerOverride({ level: "info", file: logPath });
const httpServer = createGatewayHttpServer({
canvasHost: null,
clients: new Set(),
controlUiEnabled: false,
controlUiBasePath: "/__control__",
openAiChatCompletionsEnabled: false,
openResponsesEnabled: false,
handleHooksRequest: async (_req, res) => {
activeTraceInHandler = getActiveDiagnosticTraceContext();
getLogger().info({ route: "/hook" }, "handled request trace");
emitDiagnosticEvent({ type: "message.queued", source: "gateway-test" });
res.statusCode = 204;
res.end();
return true;
},
resolvedAuth,
});
const port = await listen(httpServer);
try {
const response = await fetch(`http://127.0.0.1:${port}/hook`);
expect(response.status).toBe(204);
} finally {
await closeServer(httpServer);
}
},
});
stop();
try {
expect(activeTraceInHandler?.traceId).toMatch(/^[0-9a-f]{32}$/);
expect(activeTraceInHandler?.spanId).toMatch(/^[0-9a-f]{16}$/);
expect(events).toEqual([{ trace: activeTraceInHandler, type: "message.queued" }]);
const [line] = fs.readFileSync(logPath, "utf8").trim().split("\n");
const record = JSON.parse(line ?? "{}") as Record<string, unknown>;
expect(record).toMatchObject({
traceId: activeTraceInHandler?.traceId,
spanId: activeTraceInHandler?.spanId,
});
} finally {
fs.rmSync(dir, { recursive: true, force: true });
}
});
});

View File

@@ -13,6 +13,10 @@ import type { CanvasHostHandler } from "../canvas-host/server.js";
import { resolveBundledChannelGatewayAuthBypassPaths } from "../channels/plugins/gateway-auth-bypass.js";
import { loadConfig } from "../config/config.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
createDiagnosticTraceContext,
runWithDiagnosticTraceContext,
} from "../infra/diagnostic-trace-context.js";
import type { createSubsystemLogger } from "../logging/subsystem.js";
import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../security/external-content.js";
import { safeEqualSecret } from "../security/secret-equal.js";
@@ -911,12 +915,18 @@ export function createGatewayHttpServer(opts: {
const openAiCompatEnabled = openAiChatCompletionsEnabled || openResponsesEnabled;
const httpServer: HttpServer = opts.tlsOptions
? createHttpsServer(opts.tlsOptions, (req, res) => {
void handleRequest(req, res);
void handleRequestWithTrace(req, res);
})
: createHttpServer((req, res) => {
void handleRequest(req, res);
void handleRequestWithTrace(req, res);
});
function handleRequestWithTrace(req: IncomingMessage, res: ServerResponse) {
return runWithDiagnosticTraceContext(createDiagnosticTraceContext(), () =>
handleRequest(req, res),
);
}
async function handleRequest(req: IncomingMessage, res: ServerResponse) {
setDefaultSecurityHeaders(res, {
strictTransportSecurity: strictTransportSecurityHeader,
@@ -1206,7 +1216,7 @@ export function attachGatewayUpgradeHandler(opts: {
} = opts;
const getResolvedAuth = opts.getResolvedAuth ?? (() => resolvedAuth);
httpServer.on("upgrade", (req, socket, head) => {
void (async () => {
void runWithDiagnosticTraceContext(createDiagnosticTraceContext(), async () => {
const configSnapshot = loadConfig();
const trustedProxies = configSnapshot.gateway?.trustedProxies ?? [];
const allowRealIpFallback = configSnapshot.gateway?.allowRealIpFallback === true;
@@ -1325,7 +1335,7 @@ export function attachGatewayUpgradeHandler(opts: {
releaseUpgradeBudget();
throw new Error("gateway websocket upgrade failed");
}
})().catch((err) => {
}).catch((err) => {
const remoteAddress = (socket as { remoteAddress?: string }).remoteAddress ?? "unknown";
const errorMessage = err instanceof Error ? err.message : String(err);
log?.warn(`ws upgrade error from ${remoteAddress}: ${errorMessage}`);

View File

@@ -1,6 +1,6 @@
import type { IncomingMessage } from "node:http";
import os from "node:os";
import type { WebSocket } from "ws";
import type { RawData, WebSocket } from "ws";
import { loadConfig } from "../../../config/config.js";
import {
getBoundDeviceBootstrapProfile,
@@ -26,6 +26,10 @@ import {
updatePairedDeviceMetadata,
verifyDeviceToken,
} from "../../../infra/device-pairing.js";
import {
createDiagnosticTraceContext,
runWithDiagnosticTraceContext,
} from "../../../infra/diagnostic-trace-context.js";
import {
getPairedNode,
requestNodePairing,
@@ -321,7 +325,7 @@ export function attachGatewayWsMessageHandler(params: {
authRateLimiter,
} = browserSecurity;
socket.on("message", async (data) => {
const handleMessage = async (data: RawData) => {
if (isClosed()) {
return;
}
@@ -1576,6 +1580,10 @@ export function attachGatewayWsMessageHandler(params: {
close();
}
}
};
socket.on("message", (data) => {
void runWithDiagnosticTraceContext(createDiagnosticTraceContext(), () => handleMessage(data));
});
}

View File

@@ -9,7 +9,11 @@ import {
resetDiagnosticEventsForTest,
setDiagnosticsEnabledForProcess,
} from "./diagnostic-events.js";
import { createDiagnosticTraceContext } from "./diagnostic-trace-context.js";
import {
createDiagnosticTraceContext,
resetDiagnosticTraceContextForTest,
runWithDiagnosticTraceContext,
} from "./diagnostic-trace-context.js";
describe("diagnostic-events", () => {
beforeEach(() => {
@@ -18,6 +22,7 @@ describe("diagnostic-events", () => {
afterEach(() => {
resetDiagnosticEventsForTest();
resetDiagnosticTraceContextForTest();
vi.restoreAllMocks();
});
@@ -117,6 +122,39 @@ describe("diagnostic-events", () => {
expect(events).toEqual([{ trace, type: "message.queued" }]);
});
it("uses active request trace context when events omit explicit trace", () => {
const trace = createDiagnosticTraceContext({
traceId: "4bf92f3577b34da6a3ce929d0e0e4736",
spanId: "00f067aa0ba902b7",
});
const explicitTrace = createDiagnosticTraceContext({
traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
spanId: "bbbbbbbbbbbbbbbb",
});
const events: Array<{ trace: typeof trace | undefined; type: string }> = [];
const stop = onDiagnosticEvent((event) => {
events.push({ trace: event.trace, type: event.type });
});
runWithDiagnosticTraceContext(trace, () => {
emitDiagnosticEvent({
type: "message.queued",
source: "telegram",
});
emitDiagnosticEvent({
type: "message.queued",
source: "telegram",
trace: explicitTrace,
});
});
stop();
expect(events).toEqual([
{ trace, type: "message.queued" },
{ trace: explicitTrace, type: "message.queued" },
]);
});
it("marks only internal trusted diagnostic emissions as trusted", async () => {
const events: Array<{
metadataTrusted: boolean;

View File

@@ -1,6 +1,7 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
formatDiagnosticTraceparent,
getActiveDiagnosticTraceContext,
type DiagnosticTraceContext,
} from "./diagnostic-trace-context.js";
import { isBlockedObjectKey } from "./prototype-keys.js";
@@ -659,6 +660,7 @@ function enrichDiagnosticEvent(
}
enriched[key] = value;
}
enriched.trace ??= getActiveDiagnosticTraceContext();
state.seq += 1;
enriched.seq = state.seq;
enriched.ts = Date.now();

View File

@@ -1,13 +1,16 @@
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it } from "vitest";
import {
createChildDiagnosticTraceContext,
createDiagnosticTraceContext,
freezeDiagnosticTraceContext,
formatDiagnosticTraceparent,
getActiveDiagnosticTraceContext,
isValidDiagnosticSpanId,
isValidDiagnosticTraceFlags,
isValidDiagnosticTraceId,
parseDiagnosticTraceparent,
resetDiagnosticTraceContextForTest,
runWithDiagnosticTraceContext,
} from "./diagnostic-trace-context.js";
const TRACE_ID = "4bf92f3577b34da6a3ce929d0e0e4736";
@@ -15,6 +18,10 @@ const SPAN_ID = "00f067aa0ba902b7";
const CHILD_SPAN_ID = "7ad6b9a982deb2c9";
describe("diagnostic-trace-context", () => {
afterEach(() => {
resetDiagnosticTraceContextForTest();
});
it("validates W3C trace ids, span ids, and trace flags", () => {
expect(isValidDiagnosticTraceId(TRACE_ID)).toBe(true);
expect(isValidDiagnosticSpanId(SPAN_ID)).toBe(true);
@@ -127,4 +134,28 @@ describe("diagnostic-trace-context", () => {
expect(frozen).not.toBe(context);
expect(Object.isFrozen(frozen)).toBe(true);
});
it("carries active trace context across async work and restores outer scopes", async () => {
const outer = createDiagnosticTraceContext({
traceId: TRACE_ID,
spanId: SPAN_ID,
});
const inner = createChildDiagnosticTraceContext(outer, {
spanId: CHILD_SPAN_ID,
});
await runWithDiagnosticTraceContext(outer, async () => {
expect(getActiveDiagnosticTraceContext()).toEqual(outer);
await new Promise<void>((resolve) => setTimeout(resolve, 0));
expect(getActiveDiagnosticTraceContext()).toEqual(outer);
runWithDiagnosticTraceContext(inner, () => {
expect(getActiveDiagnosticTraceContext()).toEqual(inner);
});
expect(getActiveDiagnosticTraceContext()).toEqual(outer);
});
expect(getActiveDiagnosticTraceContext()).toBeUndefined();
});
});

View File

@@ -1,3 +1,4 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { randomBytes } from "node:crypto";
const TRACEPARENT_VERSION = "00";
@@ -7,6 +8,7 @@ const TRACE_ID_RE = /^[0-9a-f]{32}$/;
const SPAN_ID_RE = /^[0-9a-f]{16}$/;
const TRACE_FLAGS_RE = /^[0-9a-f]{2}$/;
const TRACEPARENT_VERSION_RE = /^[0-9a-f]{2}$/;
const DIAGNOSTIC_TRACE_SCOPE_STATE_KEY = Symbol.for("openclaw.diagnosticTraceScope.state.v1");
export type DiagnosticTraceContext = {
/** W3C trace id, 32 lowercase hex chars. */
@@ -23,6 +25,11 @@ export type DiagnosticTraceContextInput = Partial<DiagnosticTraceContext> & {
traceparent?: string;
};
type DiagnosticTraceScopeState = {
marker: symbol;
storage: AsyncLocalStorage<DiagnosticTraceContext>;
};
function randomHex(bytes: number): string {
return randomBytes(bytes).toString("hex");
}
@@ -47,6 +54,40 @@ function randomSpanId(): string {
return spanId;
}
function createDiagnosticTraceScopeState(): DiagnosticTraceScopeState {
return {
marker: DIAGNOSTIC_TRACE_SCOPE_STATE_KEY,
storage: new AsyncLocalStorage<DiagnosticTraceContext>(),
};
}
function isDiagnosticTraceScopeState(value: unknown): value is DiagnosticTraceScopeState {
if (!value || typeof value !== "object") {
return false;
}
const candidate = value as Partial<DiagnosticTraceScopeState>;
return (
candidate.marker === DIAGNOSTIC_TRACE_SCOPE_STATE_KEY &&
candidate.storage instanceof AsyncLocalStorage
);
}
function getDiagnosticTraceScopeState(): DiagnosticTraceScopeState {
const globalRecord = globalThis as Record<PropertyKey, unknown>;
const existing = globalRecord[DIAGNOSTIC_TRACE_SCOPE_STATE_KEY];
if (isDiagnosticTraceScopeState(existing)) {
return existing;
}
const state = createDiagnosticTraceScopeState();
Object.defineProperty(globalThis, DIAGNOSTIC_TRACE_SCOPE_STATE_KEY, {
configurable: true,
enumerable: false,
value: state,
writable: false,
});
return state;
}
export function isValidDiagnosticTraceId(value: unknown): value is string {
return typeof value === "string" && TRACE_ID_RE.test(value) && isNonZeroHex(value);
}
@@ -167,3 +208,25 @@ export function freezeDiagnosticTraceContext(
...(context.traceFlags ? { traceFlags: context.traceFlags } : {}),
});
}
export function getActiveDiagnosticTraceContext(): DiagnosticTraceContext | undefined {
return getDiagnosticTraceScopeState().storage.getStore();
}
export function runWithDiagnosticTraceContext<T>(
trace: DiagnosticTraceContext,
callback: () => T,
): T {
return getDiagnosticTraceScopeState().storage.run(freezeDiagnosticTraceContext(trace), callback);
}
export function runWithNewDiagnosticTraceContext<T>(
input: DiagnosticTraceContextInput,
callback: () => T,
): T {
return runWithDiagnosticTraceContext(createDiagnosticTraceContext(input), callback);
}
export function resetDiagnosticTraceContextForTest(): void {
getDiagnosticTraceScopeState().storage.disable();
}

View File

@@ -4,6 +4,11 @@ import {
resetDiagnosticEventsForTest,
type DiagnosticEventPayload,
} from "../infra/diagnostic-events.js";
import {
createDiagnosticTraceContext,
resetDiagnosticTraceContextForTest,
runWithDiagnosticTraceContext,
} from "../infra/diagnostic-trace-context.js";
import { getChildLogger, resetLogger, setLoggerOverride } from "./logger.js";
const TRACE_ID = "4bf92f3577b34da6a3ce929d0e0e4736";
@@ -22,6 +27,7 @@ beforeEach(() => {
afterEach(() => {
resetDiagnosticEventsForTest();
resetDiagnosticTraceContextForTest();
setLoggerOverride(null);
resetLogger();
});
@@ -59,6 +65,29 @@ describe("diagnostic log events", () => {
});
});
it("uses active request trace context for unbound log records", async () => {
const trace = createDiagnosticTraceContext({
traceId: TRACE_ID,
spanId: SPAN_ID,
});
const received: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
const unsubscribe = onInternalDiagnosticEvent((evt) => {
if (evt.type === "log.record") {
received.push(evt);
}
});
runWithDiagnosticTraceContext(trace, () => {
const logger = getChildLogger({ subsystem: "diagnostic" });
logger.info({ runId: "run-1" }, "request-scoped diagnostic log");
});
await flushDiagnosticEvents();
unsubscribe();
expect(received).toHaveLength(1);
expect(received[0]?.trace).toEqual(trace);
});
it("redacts and bounds internal log records before diagnostic emission", async () => {
const received: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
const unsubscribe = onInternalDiagnosticEvent((evt) => {

View File

@@ -1,5 +1,10 @@
import fs from "node:fs";
import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest";
import {
createDiagnosticTraceContext,
resetDiagnosticTraceContextForTest,
runWithDiagnosticTraceContext,
} from "../infra/diagnostic-trace-context.js";
import { getChildLogger, getLogger, resetLogger, setLoggerOverride } from "../logging.js";
import { createSuiteLogPathTracker } from "./log-test-helpers.js";
@@ -25,6 +30,7 @@ afterEach(() => {
} else {
process.env.OPENCLAW_TEST_FILE_LOG = originalTestFileLog;
}
resetDiagnosticTraceContextForTest();
resetLogger();
setLoggerOverride(null);
});
@@ -97,4 +103,24 @@ describe("file log redaction", () => {
spanId: SPAN_ID,
});
});
it("writes active request trace context as top-level JSONL fields", () => {
const logPath = logPathTracker.nextPath();
setLoggerOverride({ level: "info", file: logPath });
const trace = createDiagnosticTraceContext({
traceId: TRACE_ID,
spanId: SPAN_ID,
});
runWithDiagnosticTraceContext(trace, () => {
getLogger().info({ route: "/api/health" }, "request completed");
});
const [line] = fs.readFileSync(logPath, "utf8").trim().split("\n");
const record = JSON.parse(line ?? "{}") as Record<string, unknown>;
expect(record).toMatchObject({
traceId: TRACE_ID,
spanId: SPAN_ID,
});
});
});

View File

@@ -4,6 +4,7 @@ import { Logger as TsLogger } from "tslog";
import type { OpenClawConfig } from "../config/types.js";
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import {
getActiveDiagnosticTraceContext,
isValidDiagnosticSpanId,
isValidDiagnosticTraceFlags,
isValidDiagnosticTraceId,
@@ -252,7 +253,7 @@ function findLogTraceContext(
function buildTraceFileLogFields(logObj: TsLogRecord): Record<string, string> | undefined {
const { bindings, args } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj));
const trace = findLogTraceContext(bindings, args);
const trace = findLogTraceContext(bindings, args) ?? getActiveDiagnosticTraceContext();
if (!trace) {
return undefined;
}
@@ -282,7 +283,7 @@ function buildDiagnosticLogRecord(logObj: TsLogRecord) {
| undefined;
const { bindings, args: numericArgs } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj));
const trace = findLogTraceContext(bindings, numericArgs);
const trace = findLogTraceContext(bindings, numericArgs) ?? getActiveDiagnosticTraceContext();
const structuredArg = numericArgs[0];
const structuredBindings = isPlainLogRecordObject(structuredArg) ? structuredArg : undefined;
if (structuredBindings) {