diff --git a/src/gateway/server-http.request-trace.test.ts b/src/gateway/server-http.request-trace.test.ts new file mode 100644 index 00000000000..e45e6898bde --- /dev/null +++ b/src/gateway/server-http.request-trace.test.ts @@ -0,0 +1,101 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { + emitDiagnosticEvent, + onDiagnosticEvent, + resetDiagnosticEventsForTest, +} from "../infra/diagnostic-events.js"; +import { + getActiveDiagnosticTraceContext, + resetDiagnosticTraceContextForTest, + type DiagnosticTraceContext, +} from "../infra/diagnostic-trace-context.js"; +import { getLogger, resetLogger, setLoggerOverride } from "../logging.js"; +import type { ResolvedGatewayAuth } from "./auth.js"; +import { createGatewayHttpServer } from "./server-http.js"; +import { withTempConfig } from "./test-temp-config.js"; + +const resolvedAuth: ResolvedGatewayAuth = { mode: "none", allowTailscale: false }; + +async function listen(server: ReturnType): Promise { + return await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + resolve(typeof address === "object" && address ? address.port : 0); + }); + }); +} + +async function closeServer(server: ReturnType): Promise { + await new Promise((resolve, reject) => + server.close((err) => (err ? reject(err) : resolve())), + ); +} + +afterEach(() => { + resetDiagnosticEventsForTest(); + resetDiagnosticTraceContextForTest(); + setLoggerOverride(null); + resetLogger(); +}); + +describe("gateway HTTP request trace scope", () => { + it("threads active request trace through logs and diagnostics", async () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-gateway-request-trace-")); + const logPath = path.join(dir, "gateway.log"); + const events: Array<{ trace?: DiagnosticTraceContext; type: string }> = []; + const stop = onDiagnosticEvent((event) => { + events.push({ trace: event.trace, type: event.type }); + }); + let activeTraceInHandler: DiagnosticTraceContext | undefined; + + await withTempConfig({ + cfg: { gateway: { auth: { mode: "none" } } }, + run: async () => { + setLoggerOverride({ level: "info", file: logPath }); + const httpServer = createGatewayHttpServer({ + canvasHost: null, + clients: new Set(), + controlUiEnabled: false, + controlUiBasePath: "/__control__", + openAiChatCompletionsEnabled: false, + openResponsesEnabled: false, + handleHooksRequest: async (_req, res) => { + activeTraceInHandler = getActiveDiagnosticTraceContext(); + getLogger().info({ route: "/hook" }, "handled request trace"); + emitDiagnosticEvent({ type: "message.queued", source: "gateway-test" }); + res.statusCode = 204; + res.end(); + return true; + }, + resolvedAuth, + }); + const port = await listen(httpServer); + try { + const response = await fetch(`http://127.0.0.1:${port}/hook`); + expect(response.status).toBe(204); + } finally { + await closeServer(httpServer); + } + }, + }); + + stop(); + try { + expect(activeTraceInHandler?.traceId).toMatch(/^[0-9a-f]{32}$/); + expect(activeTraceInHandler?.spanId).toMatch(/^[0-9a-f]{16}$/); + expect(events).toEqual([{ trace: activeTraceInHandler, type: "message.queued" }]); + + const [line] = fs.readFileSync(logPath, "utf8").trim().split("\n"); + const record = JSON.parse(line ?? "{}") as Record; + expect(record).toMatchObject({ + traceId: activeTraceInHandler?.traceId, + spanId: activeTraceInHandler?.spanId, + }); + } finally { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 26f3c6e8edd..8d3f0982e74 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -13,6 +13,10 @@ import type { CanvasHostHandler } from "../canvas-host/server.js"; import { resolveBundledChannelGatewayAuthBypassPaths } from "../channels/plugins/gateway-auth-bypass.js"; import { loadConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { + createDiagnosticTraceContext, + runWithDiagnosticTraceContext, +} from "../infra/diagnostic-trace-context.js"; import type { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveHookExternalContentSource as resolveHookExternalContentSourceFromSession } from "../security/external-content.js"; import { safeEqualSecret } from "../security/secret-equal.js"; @@ -911,12 +915,18 @@ export function createGatewayHttpServer(opts: { const openAiCompatEnabled = openAiChatCompletionsEnabled || openResponsesEnabled; const httpServer: HttpServer = opts.tlsOptions ? createHttpsServer(opts.tlsOptions, (req, res) => { - void handleRequest(req, res); + void handleRequestWithTrace(req, res); }) : createHttpServer((req, res) => { - void handleRequest(req, res); + void handleRequestWithTrace(req, res); }); + function handleRequestWithTrace(req: IncomingMessage, res: ServerResponse) { + return runWithDiagnosticTraceContext(createDiagnosticTraceContext(), () => + handleRequest(req, res), + ); + } + async function handleRequest(req: IncomingMessage, res: ServerResponse) { setDefaultSecurityHeaders(res, { strictTransportSecurity: strictTransportSecurityHeader, @@ -1206,7 +1216,7 @@ export function attachGatewayUpgradeHandler(opts: { } = opts; const getResolvedAuth = opts.getResolvedAuth ?? (() => resolvedAuth); httpServer.on("upgrade", (req, socket, head) => { - void (async () => { + void runWithDiagnosticTraceContext(createDiagnosticTraceContext(), async () => { const configSnapshot = loadConfig(); const trustedProxies = configSnapshot.gateway?.trustedProxies ?? []; const allowRealIpFallback = configSnapshot.gateway?.allowRealIpFallback === true; @@ -1325,7 +1335,7 @@ export function attachGatewayUpgradeHandler(opts: { releaseUpgradeBudget(); throw new Error("gateway websocket upgrade failed"); } - })().catch((err) => { + }).catch((err) => { const remoteAddress = (socket as { remoteAddress?: string }).remoteAddress ?? "unknown"; const errorMessage = err instanceof Error ? err.message : String(err); log?.warn(`ws upgrade error from ${remoteAddress}: ${errorMessage}`); diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index f3bf99bc8ed..e9406df7e01 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -1,6 +1,6 @@ import type { IncomingMessage } from "node:http"; import os from "node:os"; -import type { WebSocket } from "ws"; +import type { RawData, WebSocket } from "ws"; import { loadConfig } from "../../../config/config.js"; import { getBoundDeviceBootstrapProfile, @@ -26,6 +26,10 @@ import { updatePairedDeviceMetadata, verifyDeviceToken, } from "../../../infra/device-pairing.js"; +import { + createDiagnosticTraceContext, + runWithDiagnosticTraceContext, +} from "../../../infra/diagnostic-trace-context.js"; import { getPairedNode, requestNodePairing, @@ -321,7 +325,7 @@ export function attachGatewayWsMessageHandler(params: { authRateLimiter, } = browserSecurity; - socket.on("message", async (data) => { + const handleMessage = async (data: RawData) => { if (isClosed()) { return; } @@ -1576,6 +1580,10 @@ export function attachGatewayWsMessageHandler(params: { close(); } } + }; + + socket.on("message", (data) => { + void runWithDiagnosticTraceContext(createDiagnosticTraceContext(), () => handleMessage(data)); }); } diff --git a/src/infra/diagnostic-events.test.ts b/src/infra/diagnostic-events.test.ts index 7153fe63bcc..954c0fa06dc 100644 --- a/src/infra/diagnostic-events.test.ts +++ b/src/infra/diagnostic-events.test.ts @@ -9,7 +9,11 @@ import { resetDiagnosticEventsForTest, setDiagnosticsEnabledForProcess, } from "./diagnostic-events.js"; -import { createDiagnosticTraceContext } from "./diagnostic-trace-context.js"; +import { + createDiagnosticTraceContext, + resetDiagnosticTraceContextForTest, + runWithDiagnosticTraceContext, +} from "./diagnostic-trace-context.js"; describe("diagnostic-events", () => { beforeEach(() => { @@ -18,6 +22,7 @@ describe("diagnostic-events", () => { afterEach(() => { resetDiagnosticEventsForTest(); + resetDiagnosticTraceContextForTest(); vi.restoreAllMocks(); }); @@ -117,6 +122,39 @@ describe("diagnostic-events", () => { expect(events).toEqual([{ trace, type: "message.queued" }]); }); + it("uses active request trace context when events omit explicit trace", () => { + const trace = createDiagnosticTraceContext({ + traceId: "4bf92f3577b34da6a3ce929d0e0e4736", + spanId: "00f067aa0ba902b7", + }); + const explicitTrace = createDiagnosticTraceContext({ + traceId: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + spanId: "bbbbbbbbbbbbbbbb", + }); + const events: Array<{ trace: typeof trace | undefined; type: string }> = []; + const stop = onDiagnosticEvent((event) => { + events.push({ trace: event.trace, type: event.type }); + }); + + runWithDiagnosticTraceContext(trace, () => { + emitDiagnosticEvent({ + type: "message.queued", + source: "telegram", + }); + emitDiagnosticEvent({ + type: "message.queued", + source: "telegram", + trace: explicitTrace, + }); + }); + stop(); + + expect(events).toEqual([ + { trace, type: "message.queued" }, + { trace: explicitTrace, type: "message.queued" }, + ]); + }); + it("marks only internal trusted diagnostic emissions as trusted", async () => { const events: Array<{ metadataTrusted: boolean; diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 6e29e2c8990..3892f96a665 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -1,6 +1,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { formatDiagnosticTraceparent, + getActiveDiagnosticTraceContext, type DiagnosticTraceContext, } from "./diagnostic-trace-context.js"; import { isBlockedObjectKey } from "./prototype-keys.js"; @@ -659,6 +660,7 @@ function enrichDiagnosticEvent( } enriched[key] = value; } + enriched.trace ??= getActiveDiagnosticTraceContext(); state.seq += 1; enriched.seq = state.seq; enriched.ts = Date.now(); diff --git a/src/infra/diagnostic-trace-context.test.ts b/src/infra/diagnostic-trace-context.test.ts index c1660c439f9..91c24dd9649 100644 --- a/src/infra/diagnostic-trace-context.test.ts +++ b/src/infra/diagnostic-trace-context.test.ts @@ -1,13 +1,16 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it } from "vitest"; import { createChildDiagnosticTraceContext, createDiagnosticTraceContext, freezeDiagnosticTraceContext, formatDiagnosticTraceparent, + getActiveDiagnosticTraceContext, isValidDiagnosticSpanId, isValidDiagnosticTraceFlags, isValidDiagnosticTraceId, parseDiagnosticTraceparent, + resetDiagnosticTraceContextForTest, + runWithDiagnosticTraceContext, } from "./diagnostic-trace-context.js"; const TRACE_ID = "4bf92f3577b34da6a3ce929d0e0e4736"; @@ -15,6 +18,10 @@ const SPAN_ID = "00f067aa0ba902b7"; const CHILD_SPAN_ID = "7ad6b9a982deb2c9"; describe("diagnostic-trace-context", () => { + afterEach(() => { + resetDiagnosticTraceContextForTest(); + }); + it("validates W3C trace ids, span ids, and trace flags", () => { expect(isValidDiagnosticTraceId(TRACE_ID)).toBe(true); expect(isValidDiagnosticSpanId(SPAN_ID)).toBe(true); @@ -127,4 +134,28 @@ describe("diagnostic-trace-context", () => { expect(frozen).not.toBe(context); expect(Object.isFrozen(frozen)).toBe(true); }); + + it("carries active trace context across async work and restores outer scopes", async () => { + const outer = createDiagnosticTraceContext({ + traceId: TRACE_ID, + spanId: SPAN_ID, + }); + const inner = createChildDiagnosticTraceContext(outer, { + spanId: CHILD_SPAN_ID, + }); + + await runWithDiagnosticTraceContext(outer, async () => { + expect(getActiveDiagnosticTraceContext()).toEqual(outer); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(getActiveDiagnosticTraceContext()).toEqual(outer); + + runWithDiagnosticTraceContext(inner, () => { + expect(getActiveDiagnosticTraceContext()).toEqual(inner); + }); + + expect(getActiveDiagnosticTraceContext()).toEqual(outer); + }); + + expect(getActiveDiagnosticTraceContext()).toBeUndefined(); + }); }); diff --git a/src/infra/diagnostic-trace-context.ts b/src/infra/diagnostic-trace-context.ts index 9f4f7f0bc5f..87c97d0d567 100644 --- a/src/infra/diagnostic-trace-context.ts +++ b/src/infra/diagnostic-trace-context.ts @@ -1,3 +1,4 @@ +import { AsyncLocalStorage } from "node:async_hooks"; import { randomBytes } from "node:crypto"; const TRACEPARENT_VERSION = "00"; @@ -7,6 +8,7 @@ const TRACE_ID_RE = /^[0-9a-f]{32}$/; const SPAN_ID_RE = /^[0-9a-f]{16}$/; const TRACE_FLAGS_RE = /^[0-9a-f]{2}$/; const TRACEPARENT_VERSION_RE = /^[0-9a-f]{2}$/; +const DIAGNOSTIC_TRACE_SCOPE_STATE_KEY = Symbol.for("openclaw.diagnosticTraceScope.state.v1"); export type DiagnosticTraceContext = { /** W3C trace id, 32 lowercase hex chars. */ @@ -23,6 +25,11 @@ export type DiagnosticTraceContextInput = Partial & { traceparent?: string; }; +type DiagnosticTraceScopeState = { + marker: symbol; + storage: AsyncLocalStorage; +}; + function randomHex(bytes: number): string { return randomBytes(bytes).toString("hex"); } @@ -47,6 +54,40 @@ function randomSpanId(): string { return spanId; } +function createDiagnosticTraceScopeState(): DiagnosticTraceScopeState { + return { + marker: DIAGNOSTIC_TRACE_SCOPE_STATE_KEY, + storage: new AsyncLocalStorage(), + }; +} + +function isDiagnosticTraceScopeState(value: unknown): value is DiagnosticTraceScopeState { + if (!value || typeof value !== "object") { + return false; + } + const candidate = value as Partial; + return ( + candidate.marker === DIAGNOSTIC_TRACE_SCOPE_STATE_KEY && + candidate.storage instanceof AsyncLocalStorage + ); +} + +function getDiagnosticTraceScopeState(): DiagnosticTraceScopeState { + const globalRecord = globalThis as Record; + const existing = globalRecord[DIAGNOSTIC_TRACE_SCOPE_STATE_KEY]; + if (isDiagnosticTraceScopeState(existing)) { + return existing; + } + const state = createDiagnosticTraceScopeState(); + Object.defineProperty(globalThis, DIAGNOSTIC_TRACE_SCOPE_STATE_KEY, { + configurable: true, + enumerable: false, + value: state, + writable: false, + }); + return state; +} + export function isValidDiagnosticTraceId(value: unknown): value is string { return typeof value === "string" && TRACE_ID_RE.test(value) && isNonZeroHex(value); } @@ -167,3 +208,25 @@ export function freezeDiagnosticTraceContext( ...(context.traceFlags ? { traceFlags: context.traceFlags } : {}), }); } + +export function getActiveDiagnosticTraceContext(): DiagnosticTraceContext | undefined { + return getDiagnosticTraceScopeState().storage.getStore(); +} + +export function runWithDiagnosticTraceContext( + trace: DiagnosticTraceContext, + callback: () => T, +): T { + return getDiagnosticTraceScopeState().storage.run(freezeDiagnosticTraceContext(trace), callback); +} + +export function runWithNewDiagnosticTraceContext( + input: DiagnosticTraceContextInput, + callback: () => T, +): T { + return runWithDiagnosticTraceContext(createDiagnosticTraceContext(input), callback); +} + +export function resetDiagnosticTraceContextForTest(): void { + getDiagnosticTraceScopeState().storage.disable(); +} diff --git a/src/logging/diagnostic-log-events.test.ts b/src/logging/diagnostic-log-events.test.ts index 080038cfee5..d0e618747c7 100644 --- a/src/logging/diagnostic-log-events.test.ts +++ b/src/logging/diagnostic-log-events.test.ts @@ -4,6 +4,11 @@ import { resetDiagnosticEventsForTest, type DiagnosticEventPayload, } from "../infra/diagnostic-events.js"; +import { + createDiagnosticTraceContext, + resetDiagnosticTraceContextForTest, + runWithDiagnosticTraceContext, +} from "../infra/diagnostic-trace-context.js"; import { getChildLogger, resetLogger, setLoggerOverride } from "./logger.js"; const TRACE_ID = "4bf92f3577b34da6a3ce929d0e0e4736"; @@ -22,6 +27,7 @@ beforeEach(() => { afterEach(() => { resetDiagnosticEventsForTest(); + resetDiagnosticTraceContextForTest(); setLoggerOverride(null); resetLogger(); }); @@ -59,6 +65,29 @@ describe("diagnostic log events", () => { }); }); + it("uses active request trace context for unbound log records", async () => { + const trace = createDiagnosticTraceContext({ + traceId: TRACE_ID, + spanId: SPAN_ID, + }); + const received: Array> = []; + const unsubscribe = onInternalDiagnosticEvent((evt) => { + if (evt.type === "log.record") { + received.push(evt); + } + }); + + runWithDiagnosticTraceContext(trace, () => { + const logger = getChildLogger({ subsystem: "diagnostic" }); + logger.info({ runId: "run-1" }, "request-scoped diagnostic log"); + }); + await flushDiagnosticEvents(); + unsubscribe(); + + expect(received).toHaveLength(1); + expect(received[0]?.trace).toEqual(trace); + }); + it("redacts and bounds internal log records before diagnostic emission", async () => { const received: Array> = []; const unsubscribe = onInternalDiagnosticEvent((evt) => { diff --git a/src/logging/logger-redaction-behavior.test.ts b/src/logging/logger-redaction-behavior.test.ts index 0c1f6ebc2ff..47ba0acbfd2 100644 --- a/src/logging/logger-redaction-behavior.test.ts +++ b/src/logging/logger-redaction-behavior.test.ts @@ -1,5 +1,10 @@ import fs from "node:fs"; import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; +import { + createDiagnosticTraceContext, + resetDiagnosticTraceContextForTest, + runWithDiagnosticTraceContext, +} from "../infra/diagnostic-trace-context.js"; import { getChildLogger, getLogger, resetLogger, setLoggerOverride } from "../logging.js"; import { createSuiteLogPathTracker } from "./log-test-helpers.js"; @@ -25,6 +30,7 @@ afterEach(() => { } else { process.env.OPENCLAW_TEST_FILE_LOG = originalTestFileLog; } + resetDiagnosticTraceContextForTest(); resetLogger(); setLoggerOverride(null); }); @@ -97,4 +103,24 @@ describe("file log redaction", () => { spanId: SPAN_ID, }); }); + + it("writes active request trace context as top-level JSONL fields", () => { + const logPath = logPathTracker.nextPath(); + setLoggerOverride({ level: "info", file: logPath }); + const trace = createDiagnosticTraceContext({ + traceId: TRACE_ID, + spanId: SPAN_ID, + }); + + runWithDiagnosticTraceContext(trace, () => { + getLogger().info({ route: "/api/health" }, "request completed"); + }); + + const [line] = fs.readFileSync(logPath, "utf8").trim().split("\n"); + const record = JSON.parse(line ?? "{}") as Record; + expect(record).toMatchObject({ + traceId: TRACE_ID, + spanId: SPAN_ID, + }); + }); }); diff --git a/src/logging/logger.ts b/src/logging/logger.ts index 21dae48761d..481ac95512f 100644 --- a/src/logging/logger.ts +++ b/src/logging/logger.ts @@ -4,6 +4,7 @@ import { Logger as TsLogger } from "tslog"; import type { OpenClawConfig } from "../config/types.js"; import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; import { + getActiveDiagnosticTraceContext, isValidDiagnosticSpanId, isValidDiagnosticTraceFlags, isValidDiagnosticTraceId, @@ -252,7 +253,7 @@ function findLogTraceContext( function buildTraceFileLogFields(logObj: TsLogRecord): Record | undefined { const { bindings, args } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj)); - const trace = findLogTraceContext(bindings, args); + const trace = findLogTraceContext(bindings, args) ?? getActiveDiagnosticTraceContext(); if (!trace) { return undefined; } @@ -282,7 +283,7 @@ function buildDiagnosticLogRecord(logObj: TsLogRecord) { | undefined; const { bindings, args: numericArgs } = extractLogBindingPrefix(getSortedNumericLogArgs(logObj)); - const trace = findLogTraceContext(bindings, numericArgs); + const trace = findLogTraceContext(bindings, numericArgs) ?? getActiveDiagnosticTraceContext(); const structuredArg = numericArgs[0]; const structuredBindings = isPlainLogRecordObject(structuredArg) ? structuredArg : undefined; if (structuredBindings) {