mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:40:44 +00:00
fix(diagnostics): harden event emission (#71164)
This commit is contained in:
@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Diagnostics: harden tool and model diagnostic events against hostile errors, blocking listeners, and unsafe stability reason fields. Thanks @vincentkoc.
|
||||
- Plugins/onboarding: record local plugin install source metadata without duplicating raw absolute local paths in persisted `plugins.installs`, while preserving linked load-path cleanup. (#70970) Thanks @vincentkoc.
|
||||
- Browser/tool: tell agents not to pass per-call `timeoutMs` on existing-session type, evaluate, and other Chrome MCP actions that reject timeout overrides.
|
||||
- Codex/GPT-5.4: harden fallback, auth-profile, tool-schema, and replay edge cases across native and embedded runtime paths. (#70743) Thanks @100yenadmin.
|
||||
|
||||
@@ -17,6 +17,7 @@ async function collectModelCallEvents(run: () => Promise<void>): Promise<Diagnos
|
||||
});
|
||||
try {
|
||||
await run();
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
return events;
|
||||
} finally {
|
||||
stop();
|
||||
@@ -66,7 +67,7 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
|
||||
{} as never,
|
||||
{} as never,
|
||||
) as unknown as typeof originalStream;
|
||||
expect(returned).toBe(originalStream);
|
||||
expect(returned).not.toBe(originalStream);
|
||||
expect(await returned.result()).toBe("kept");
|
||||
await drain(returned);
|
||||
});
|
||||
@@ -130,6 +131,42 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not mutate non-configurable provider streams", async () => {
  // A frozen stream whose async iterator cannot be redefined in place.
  const frozenStream = Object.freeze(
    Object.defineProperty({}, Symbol.asyncIterator, {
      value: async function* () {
        yield { type: "text", text: "ok" };
      },
      configurable: false,
    }),
  );
  const providerStreamFn = (() => frozenStream) as unknown as StreamFn;
  const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(providerStreamFn, {
    runId: "run-1",
    provider: "openai",
    model: "gpt-5.4",
    trace: createDiagnosticTraceContext(),
    nextCallId: () => "call-frozen",
  });

  const events = await collectModelCallEvents(async () => {
    const observed = wrapped({} as never, {} as never, {} as never) as unknown as AsyncIterable<unknown>;
    // The wrapper must hand back a different object instead of patching the frozen one.
    expect(observed).not.toBe(frozenStream);
    await drain(observed);
  });

  const eventTypes = events.map((event) => event.type);
  expect(eventTypes).toEqual(["model.call.started", "model.call.completed"]);
});
|
||||
|
||||
it("emits error events when stream consumption stops early", async () => {
|
||||
async function* stream() {
|
||||
yield { type: "text", text: "first" };
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import { diagnosticErrorCategory } from "../../../infra/diagnostic-error-metadata.js";
|
||||
import {
|
||||
emitDiagnosticEvent,
|
||||
type DiagnosticEventInput,
|
||||
@@ -26,27 +27,32 @@ type ModelCallEventBase = Omit<
|
||||
"type"
|
||||
>;
|
||||
|
||||
/**
 * Derives a short category label for a thrown value.
 *
 * Returns the error's `name` when `err` is an `Error` whose `name` is a
 * non-blank string; otherwise returns `typeof err`.
 *
 * The `name` lookup is guarded: a getter that throws, or a `name` that is not
 * a string, falls back to `typeof err` instead of throwing out of the
 * diagnostics path.
 *
 * @param err - The thrown value to classify.
 * @returns A category label; never throws.
 */
export function diagnosticErrorCategory(err: unknown): string {
  try {
    if (err instanceof Error) {
      // Read once; `name` may be an accessor or hold a non-string value.
      const name: unknown = err.name;
      if (typeof name === "string" && name.trim()) {
        return name;
      }
    }
  } catch {
    // Hostile getter or proxy trap: fall through to the typeof label.
  }
  return typeof err;
}
|
||||
const MODEL_CALL_STREAM_RETURN_TIMEOUT_MS = 1000;
|
||||
|
||||
/**
 * Reports whether `value` is a thenable: an object or function exposing a
 * callable `then`.
 *
 * The `then` lookup is wrapped in try/catch so that a throwing getter or proxy
 * trap is treated as "not a thenable" rather than propagating. The earlier
 * unguarded `return (...)` form is dropped because it made this guarded path
 * unreachable.
 *
 * @param value - Any value returned by a provider stream function.
 * @returns `true` only when reading `then` succeeds and yields a function.
 */
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  if (value === null || (typeof value !== "object" && typeof value !== "function")) {
    return false;
  }
  try {
    return typeof (value as { then?: unknown }).then === "function";
  } catch {
    // Reading `then` threw (hostile accessor/proxy): not usable as a promise.
    return false;
  }
}
|
||||
|
||||
/**
 * Type guard: `value` is an object with a readable, callable
 * `Symbol.asyncIterator` member.
 *
 * Implemented on top of {@link asyncIteratorFactory} so both helpers share the
 * same guarded lookup; a throwing accessor yields `false`.
 *
 * @param value - Any value returned by a provider stream function.
 */
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
  return asyncIteratorFactory(value) !== undefined;
}

/**
 * Captures the async-iterator method of `value` without letting a hostile
 * object throw into the caller.
 *
 * The `Symbol.asyncIterator` member is read exactly once, inside try/catch.
 * The returned factory invokes that captured method with `value` as `this`,
 * so later calls do not look the member up again.
 *
 * @param value - Candidate stream object.
 * @returns A factory producing a fresh iterator, or `undefined` when `value`
 *   is not an object, has no callable `Symbol.asyncIterator`, or reading it
 *   throws.
 */
function asyncIteratorFactory(value: unknown): (() => AsyncIterator<unknown>) | undefined {
  if (value === null || typeof value !== "object") {
    return undefined;
  }
  try {
    const asyncIterator = (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator];
    if (typeof asyncIterator !== "function") {
      return undefined;
    }
    return () => asyncIterator.call(value) as AsyncIterator<unknown>;
  } catch {
    // Accessor or proxy trap threw while reading the symbol member.
    return undefined;
  }
}
|
||||
|
||||
function baseModelCallEvent(
|
||||
@@ -67,6 +73,38 @@ function baseModelCallEvent(
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Best-effort call of `iterator.return()` that never throws synchronously
 * from the call itself and never waits longer than
 * `MODEL_CALL_STREAM_RETURN_TIMEOUT_MS` for the result to settle.
 *
 * - A synchronous throw from `return()` is swallowed.
 * - A missing `return` method or a falsy result ends the call immediately.
 * - A rejection of the returned value is swallowed.
 * - The timeout timer is unref'd when the runtime exposes `unref`, and is
 *   always cleared once the race finishes.
 */
async function safeReturnIterator(iterator: AsyncIterator<unknown>): Promise<void> {
  let pendingReturn: PromiseLike<unknown> | unknown;
  try {
    pendingReturn = iterator.return?.();
  } catch {
    return;
  }
  if (!pendingReturn) {
    return;
  }

  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    // Rejections are mapped to a resolved value so the race cannot reject.
    const settled = Promise.resolve(pendingReturn).catch(() => undefined);
    const deadline = new Promise<void>((resolve) => {
      timer = setTimeout(resolve, MODEL_CALL_STREAM_RETURN_TIMEOUT_MS);
      if (typeof timer === "object" && timer) {
        const unref = (timer as { unref?: () => void }).unref;
        if (unref) {
          unref.call(timer);
        }
      }
    });
    await Promise.race([settled, deadline]);
  } finally {
    if (timer) {
      clearTimeout(timer);
    }
  }
}
|
||||
|
||||
async function* observeModelCallIterator<T>(
|
||||
iterator: AsyncIterator<T>,
|
||||
eventBase: ModelCallEventBase,
|
||||
@@ -98,7 +136,7 @@ async function* observeModelCallIterator<T>(
|
||||
throw err;
|
||||
} finally {
|
||||
if (!terminalEmitted) {
|
||||
await iterator.return?.();
|
||||
await safeReturnIterator(iterator);
|
||||
emitDiagnosticEvent({
|
||||
type: "model.call.error",
|
||||
...eventBase,
|
||||
@@ -111,16 +149,33 @@ async function* observeModelCallIterator<T>(
|
||||
|
||||
/**
 * Returns a view of `stream` whose async iteration is routed through
 * `observeModelCallIterator`, without mutating `stream` itself.
 *
 * - When `stream` owns a non-configurable `Symbol.asyncIterator` (or its
 *   descriptor cannot be read), a Proxy could not legally report a different
 *   iterator, so a minimal object exposing only the observed iterator is
 *   returned instead.
 * - Otherwise a Proxy is returned: `Symbol.asyncIterator` yields the observed
 *   iterator and every other member is forwarded, with functions bound to the
 *   original stream.
 *
 * @param stream - Provider stream to observe.
 * @param createIterator - Factory for the stream's underlying iterator.
 * @param eventBase - Fields shared by the emitted model-call events.
 * @param startedAt - Start timestamp passed through to the observer.
 * @returns An async iterable standing in for `stream`.
 */
function observeModelCallStream<T extends AsyncIterable<unknown>>(
  stream: T,
  createIterator: () => AsyncIterator<unknown>,
  eventBase: ModelCallEventBase,
  startedAt: number,
): T {
  const observedIterator = () =>
    observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator]();

  let hasNonConfigurableIterator = false;
  try {
    hasNonConfigurableIterator =
      Object.getOwnPropertyDescriptor(stream, Symbol.asyncIterator)?.configurable === false;
  } catch {
    // Descriptor lookup threw (e.g. proxy trap): take the conservative path.
    hasNonConfigurableIterator = true;
  }
  if (hasNonConfigurableIterator) {
    return {
      [Symbol.asyncIterator]: observedIterator,
    } as T;
  }

  return new Proxy(stream, {
    get(target, property, receiver) {
      if (property === Symbol.asyncIterator) {
        return observedIterator;
      }
      const value = Reflect.get(target, property, receiver);
      if (typeof value !== "function") {
        return value;
      }
      // Proxy invariant: a non-writable, non-configurable own data property
      // must be reported with its exact value. Returning a bound copy for a
      // frozen stream's own method would make the engine throw a TypeError.
      const descriptor = Reflect.getOwnPropertyDescriptor(target, property);
      if (descriptor?.configurable === false && descriptor.writable === false) {
        return value;
      }
      return value.bind(target);
    },
  });
}
|
||||
|
||||
function observeModelCallResult(
|
||||
@@ -128,8 +183,14 @@ function observeModelCallResult(
|
||||
eventBase: ModelCallEventBase,
|
||||
startedAt: number,
|
||||
): unknown {
|
||||
if (isAsyncIterable(result)) {
|
||||
return observeModelCallStream(result, eventBase, startedAt);
|
||||
const createIterator = asyncIteratorFactory(result);
|
||||
if (createIterator) {
|
||||
return observeModelCallStream(
|
||||
result as AsyncIterable<unknown>,
|
||||
createIterator,
|
||||
eventBase,
|
||||
startedAt,
|
||||
);
|
||||
}
|
||||
emitDiagnosticEvent({
|
||||
type: "model.call.completed",
|
||||
|
||||
@@ -87,7 +87,7 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
}
|
||||
|
||||
async function withToolExecutionEvents(
|
||||
run: (emitted: DiagnosticEventPayload[]) => Promise<void>,
|
||||
run: (emitted: DiagnosticEventPayload[], flush: () => Promise<void>) => Promise<void>,
|
||||
) {
|
||||
const emitted: DiagnosticEventPayload[] = [];
|
||||
const stop = onDiagnosticEvent((evt) => {
|
||||
@@ -95,8 +95,9 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
emitted.push(evt);
|
||||
}
|
||||
});
|
||||
const flush = () => new Promise<void>((resolve) => setImmediate(resolve));
|
||||
try {
|
||||
await run(emitted);
|
||||
await run(emitted, flush);
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
@@ -367,13 +368,14 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
loopDetection: { enabled: false },
|
||||
});
|
||||
|
||||
await withToolExecutionEvents(async (emitted) => {
|
||||
await withToolExecutionEvents(async (emitted, flush) => {
|
||||
await tool.execute(
|
||||
"tool-call-1",
|
||||
{ command: "pwd", token: "sk-1234567890abcdef1234567890abcdef" },
|
||||
undefined,
|
||||
undefined,
|
||||
);
|
||||
await flush();
|
||||
|
||||
expect(emitted.map((evt) => evt.type)).toEqual([
|
||||
"tool.execution.started",
|
||||
@@ -412,10 +414,11 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
loopDetection: { enabled: false },
|
||||
});
|
||||
|
||||
await withToolExecutionEvents(async (emitted) => {
|
||||
await withToolExecutionEvents(async (emitted, flush) => {
|
||||
await expect(
|
||||
tool.execute("tool-call-error", { path: "/tmp/file" }, undefined, undefined),
|
||||
).rejects.toThrow("failed with key");
|
||||
await flush();
|
||||
|
||||
expect(emitted.map((evt) => evt.type)).toEqual([
|
||||
"tool.execution.started",
|
||||
@@ -432,6 +435,71 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("does not let hostile thrown values break diagnostic error emission", async () => {
  // Every property read or descriptor lookup on this value throws.
  const throwingTraps: ProxyHandler<object> = {
    get() {
      throw new Error("diagnostic getter should not run");
    },
    getOwnPropertyDescriptor() {
      throw new Error("diagnostic descriptor failed");
    },
  };
  const hostileError = new Proxy({}, throwingTraps);
  const execute = vi.fn().mockRejectedValue(hostileError);
  const hookContext = {
    agentId: "main",
    sessionKey: "session-key",
    loopDetection: { enabled: false },
  };
  const tool = wrapToolWithBeforeToolCallHook({ name: "read", execute } as any, hookContext);

  await withToolExecutionEvents(async (emitted, flush) => {
    const pending = tool.execute(
      "tool-call-hostile-error",
      { path: "/tmp/file" },
      undefined,
      undefined,
    );
    await expect(pending).rejects.toBe(hostileError);
    await flush();

    const emittedTypes = emitted.map((evt) => evt.type);
    expect(emittedTypes).toEqual(["tool.execution.started", "tool.execution.error"]);

    const errorEvent = emitted[1];
    expect(errorEvent).toMatchObject({
      type: "tool.execution.error",
      toolName: "read",
      toolCallId: "tool-call-hostile-error",
      errorCategory: "object",
    });
    expect(errorEvent).not.toHaveProperty("errorCode");
  });
});
|
||||
|
||||
it("emits only numeric HTTP status codes as diagnostic tool error codes", async () => {
  // `code` carries a secret-looking string; only the numeric `status` may surface.
  const rateLimited = Object.assign(new Error("rate limited"), {
    code: "SECRET_TOKEN",
    status: 429,
  });
  const execute = vi.fn().mockRejectedValue(rateLimited);
  const hookContext = {
    agentId: "main",
    sessionKey: "session-key",
    loopDetection: { enabled: false },
  };
  const tool = wrapToolWithBeforeToolCallHook({ name: "read", execute } as any, hookContext);

  await withToolExecutionEvents(async (emitted, flush) => {
    const pending = tool.execute(
      "tool-call-status-code",
      { path: "/tmp/file" },
      undefined,
      undefined,
    );
    await expect(pending).rejects.toThrow("rate limited");
    await flush();

    const errorEvent = emitted[1];
    expect(errorEvent).toMatchObject({
      type: "tool.execution.error",
      errorCode: "429",
    });
    expect(JSON.stringify(errorEvent)).not.toContain("SECRET_TOKEN");
  });
});
|
||||
|
||||
it("summarizes hostile object params without enumerating keys", async () => {
|
||||
const execute = vi.fn().mockResolvedValue({ content: [{ type: "text", text: "ok" }] });
|
||||
const tool = wrapToolWithBeforeToolCallHook({ name: "bash", execute } as any, {
|
||||
@@ -448,8 +516,9 @@ describe("before_tool_call loop detection behavior", () => {
|
||||
},
|
||||
);
|
||||
|
||||
await withToolExecutionEvents(async (emitted) => {
|
||||
await withToolExecutionEvents(async (emitted, flush) => {
|
||||
await tool.execute("tool-call-proxy", params, undefined, undefined);
|
||||
await flush();
|
||||
|
||||
expect(emitted[0]).toMatchObject({
|
||||
type: "tool.execution.started",
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
import type { ToolLoopDetectionConfig } from "../config/types.tools.js";
|
||||
import {
|
||||
diagnosticErrorCategory,
|
||||
diagnosticHttpStatusCode,
|
||||
} from "../infra/diagnostic-error-metadata.js";
|
||||
import {
|
||||
emitDiagnosticEvent,
|
||||
type DiagnosticToolParamsSummary,
|
||||
@@ -79,8 +83,16 @@ function isAbortSignalCancellation(err: unknown, signal?: AbortSignal): boolean
|
||||
}
|
||||
|
||||
/**
 * Returns the `cause` of an `Error` when one is stored as an own data
 * property with a defined value; otherwise returns `err` unchanged.
 *
 * The cause is read through its property descriptor, so an accessor-based
 * `cause` is never invoked. Any exception raised while inspecting `err`
 * (for example from a proxy trap) results in `err` being returned as-is.
 *
 * The earlier `err.cause` getter-based branch is removed: it was left
 * unterminated ahead of this body and would have run the getter.
 *
 * @param err - The value caught from tool execution.
 * @returns The unwrapped cause, or `err` itself.
 */
function unwrapErrorCause(err: unknown): unknown {
  try {
    if (!(err instanceof Error)) {
      return err;
    }
    const cause = Object.getOwnPropertyDescriptor(err, "cause");
    if (cause && "value" in cause && cause.value !== undefined) {
      return cause.value;
    }
  } catch {
    return err;
  }
  return err;
}
|
||||
@@ -110,32 +122,6 @@ function summarizeToolParams(params: unknown): DiagnosticToolParamsSummary {
|
||||
return { kind: "other" };
|
||||
}
|
||||
|
||||
/**
 * Derives a short category label for a thrown value: the error's `name` when
 * `err` is an `Error` with a non-blank string `name`, otherwise `typeof err`.
 *
 * The lookup is guarded so that a throwing `name` accessor, a non-string
 * `name`, or a hostile proxy cannot make diagnostic emission itself throw;
 * those cases fall back to `typeof err`.
 *
 * @param err - The thrown value to classify.
 * @returns A category label; never throws.
 */
function errorCategory(err: unknown): string {
  try {
    if (err instanceof Error) {
      // Read once; `name` may be an accessor or hold a non-string value.
      const name: unknown = err.name;
      if (typeof name === "string" && name.trim()) {
        return name;
      }
    }
  } catch {
    // Fall through to the typeof label.
  }
  return typeof err;
}
|
||||
|
||||
/**
 * Extracts a diagnostic error code from a thrown value.
 *
 * Only an integer HTTP status (100-599) stored as an own data property named
 * `status` or `statusCode` is reported. Compared with the previous behaviour:
 *
 * - `code` is no longer consulted and string values are never returned, so
 *   free-form text (which may carry tokens or URLs) cannot reach diagnostic
 *   events.
 * - Properties are read through their descriptors inside try/catch, so
 *   accessors are not invoked and proxy traps cannot throw into the caller.
 *
 * @param err - The thrown value (or its unwrapped cause).
 * @returns The status as a decimal string, or `undefined` when none qualifies.
 */
function diagnosticErrorCode(err: unknown): string | undefined {
  if (!err || typeof err !== "object") {
    return undefined;
  }
  for (const key of ["status", "statusCode"]) {
    let candidate: unknown;
    try {
      const descriptor = Object.getOwnPropertyDescriptor(err, key);
      candidate = descriptor && "value" in descriptor ? descriptor.value : undefined;
    } catch {
      // Hostile descriptor trap: treat the property as absent.
      continue;
    }
    if (
      typeof candidate === "number" &&
      Number.isInteger(candidate) &&
      candidate >= 100 &&
      candidate <= 599
    ) {
      return String(candidate);
    }
  }
  return undefined;
}
|
||||
|
||||
function shouldEmitLoopWarning(state: SessionState, warningKey: string, count: number): boolean {
|
||||
if (!state.toolLoopWarningBuckets) {
|
||||
state.toolLoopWarningBuckets = new Map();
|
||||
@@ -486,11 +472,7 @@ export function wrapToolWithBeforeToolCallHook(
|
||||
const startedAt = Date.now();
|
||||
try {
|
||||
const result = await execute(toolCallId, outcome.params, signal, onUpdate);
|
||||
emitDiagnosticEvent({
|
||||
type: "tool.execution.completed",
|
||||
...eventBase,
|
||||
durationMs: Date.now() - startedAt,
|
||||
});
|
||||
const durationMs = Date.now() - startedAt;
|
||||
await recordLoopOutcome({
|
||||
ctx,
|
||||
toolName: normalizedToolName,
|
||||
@@ -498,15 +480,20 @@ export function wrapToolWithBeforeToolCallHook(
|
||||
toolCallId,
|
||||
result,
|
||||
});
|
||||
emitDiagnosticEvent({
|
||||
type: "tool.execution.completed",
|
||||
...eventBase,
|
||||
durationMs,
|
||||
});
|
||||
return result;
|
||||
} catch (err) {
|
||||
const cause = unwrapErrorCause(err);
|
||||
const errorCode = diagnosticErrorCode(cause);
|
||||
const errorCode = diagnosticHttpStatusCode(cause);
|
||||
emitDiagnosticEvent({
|
||||
type: "tool.execution.error",
|
||||
...eventBase,
|
||||
durationMs: Date.now() - startedAt,
|
||||
errorCategory: errorCategory(cause),
|
||||
errorCategory: diagnosticErrorCategory(cause),
|
||||
...(errorCode ? { errorCode } : {}),
|
||||
});
|
||||
await recordLoopOutcome({
|
||||
|
||||
50
src/infra/diagnostic-error-metadata.test.ts
Normal file
50
src/infra/diagnostic-error-metadata.test.ts
Normal file
@@ -0,0 +1,50 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { diagnosticErrorCategory, diagnosticHttpStatusCode } from "./diagnostic-error-metadata.js";
|
||||
|
||||
describe("diagnostic error metadata", () => {
  it("returns stable categories without reading mutable Error.name", () => {
    // Reading `name` on this error throws, so the category must come from elsewhere.
    const namedFailure = Object.defineProperty(new Error("bad"), "name", {
      get() {
        throw new Error("should not read name");
      },
    });

    expect(diagnosticErrorCategory(new TypeError("bad"))).toBe("TypeError");
    expect(diagnosticErrorCategory(namedFailure)).toBe("Error");
    expect(diagnosticErrorCategory("bad")).toBe("string");
    expect(diagnosticErrorCategory(null)).toBe("null");
  });

  it("accepts only own HTTP status data properties as error codes", () => {
    const accepted: Array<[object, string]> = [
      [{ status: 429 }, "429"],
      [{ statusCode: 503 }, "503"],
    ];
    for (const [errorLike, expectedCode] of accepted) {
      expect(diagnosticHttpStatusCode(errorLike)).toBe(expectedCode);
    }

    const rejected: object[] = [
      { code: "SECRET_TOKEN" },
      { status: 99 },
      { status: "https://example.invalid/secret" },
    ];
    for (const errorLike of rejected) {
      expect(diagnosticHttpStatusCode(errorLike)).toBeUndefined();
    }
  });

  it("does not invoke throwing getters while extracting status codes", () => {
    const errorLike = Object.defineProperty({}, "status", {
      get() {
        throw new Error("should not read getter");
      },
    });

    expect(diagnosticHttpStatusCode(errorLike)).toBeUndefined();
  });

  it("contains proxy traps during extraction", () => {
    const throwingTraps: ProxyHandler<object> = {
      getOwnPropertyDescriptor() {
        throw new Error("hostile descriptor");
      },
    };
    const errorLike = new Proxy({}, throwingTraps);

    expect(diagnosticHttpStatusCode(errorLike)).toBeUndefined();
  });
});
|
||||
71
src/infra/diagnostic-error-metadata.ts
Normal file
71
src/infra/diagnostic-error-metadata.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
const HTTP_STATUS_MIN = 100;
|
||||
const HTTP_STATUS_MAX = 599;
|
||||
|
||||
/** Type guard: `value` is a non-null object or a function. */
function isObjectLike(value: unknown): value is object {
  if (value === null) {
    return false;
  }
  const kind = typeof value;
  return kind === "object" || kind === "function";
}
|
||||
|
||||
/**
 * Reads `key` from `value` only when it is an own *data* property.
 *
 * The property descriptor is inspected instead of performing a normal
 * property read, so accessor properties are never invoked. Non-objects,
 * missing or accessor properties, and descriptor lookups that throw all
 * produce `undefined`.
 */
function readOwnDataProperty(value: unknown, key: string): unknown {
  if (isObjectLike(value)) {
    try {
      const own = Object.getOwnPropertyDescriptor(value, key);
      if (own && "value" in own) {
        return own.value;
      }
    } catch {
      // Descriptor lookup threw (e.g. proxy trap); report nothing.
    }
  }
  return undefined;
}
|
||||
|
||||
/**
 * Type guard: `value` is an integer number within
 * `HTTP_STATUS_MIN`..`HTTP_STATUS_MAX` (inclusive).
 */
function isHttpStatusCode(value: unknown): value is number {
  if (typeof value !== "number" || !Number.isInteger(value)) {
    return false;
  }
  return HTTP_STATUS_MIN <= value && value <= HTTP_STATUS_MAX;
}
|
||||
|
||||
/**
 * Maps a thrown value to a fixed category label without reading any property
 * of the value.
 *
 * Built-in error classes are recognised via `instanceof`, most specific first,
 * with plain `Error` last. If an `instanceof` check itself throws, the result
 * is `"unknown"`. Non-errors yield `"null"` for `null` and `typeof err` for
 * everything else.
 */
export function diagnosticErrorCategory(err: unknown): string {
  try {
    type ErrorCtor = new (...args: never[]) => Error;
    const categories: Array<readonly [ErrorCtor, string]> = [
      [TypeError, "TypeError"],
      [RangeError, "RangeError"],
      [ReferenceError, "ReferenceError"],
      [SyntaxError, "SyntaxError"],
      [URIError, "URIError"],
    ];
    // AggregateError is feature-detected because older runtimes lack it.
    const aggregateError = (globalThis as typeof globalThis & { AggregateError?: ErrorCtor })
      .AggregateError;
    if (typeof aggregateError !== "undefined") {
      categories.push([aggregateError, "AggregateError"]);
    }
    categories.push([Error, "Error"]);

    for (const [ctor, label] of categories) {
      if (err instanceof ctor) {
        return label;
      }
    }
  } catch {
    return "unknown";
  }
  return err === null ? "null" : typeof err;
}
|
||||
|
||||
/**
 * Returns the HTTP status carried by `err` as a decimal string.
 *
 * `status` is checked first, then `statusCode`; each is read as an own data
 * property and must pass `isHttpStatusCode`. Returns `undefined` when neither
 * qualifies.
 */
export function diagnosticHttpStatusCode(err: unknown): string | undefined {
  for (const key of ["status", "statusCode"]) {
    const candidate = readOwnDataProperty(err, key);
    if (isHttpStatusCode(candidate)) {
      return String(candidate);
    }
  }
  return undefined;
}
|
||||
@@ -114,6 +114,29 @@ describe("diagnostic-events", () => {
|
||||
expect(events).toEqual([{ trace, type: "message.queued" }]);
|
||||
});
|
||||
|
||||
it("dispatches high-frequency tool and model lifecycle events asynchronously", async () => {
  const seenTypes: string[] = [];
  onDiagnosticEvent((event) => {
    seenTypes.push(event.type);
  });

  emitDiagnosticEvent({ type: "tool.execution.started", toolName: "read" });
  emitDiagnosticEvent({
    type: "model.call.started",
    runId: "run-1",
    callId: "call-1",
    provider: "openai",
    model: "gpt-5.4",
  });

  // Nothing is delivered within the emitting tick...
  expect(seenTypes).toEqual([]);

  // ...but both events arrive, in order, once the immediate queue has run.
  await new Promise<void>((resolve) => {
    setImmediate(resolve);
  });
  expect(seenTypes).toEqual(["tool.execution.started", "model.call.started"]);
});
|
||||
|
||||
it("skips event enrichment and subscribers when diagnostics are disabled", () => {
|
||||
const nowSpy = vi.spyOn(Date, "now");
|
||||
const seen: string[] = [];
|
||||
|
||||
@@ -306,8 +306,20 @@ type DiagnosticEventsGlobalState = {
|
||||
seq: number;
|
||||
listeners: Set<(evt: DiagnosticEventPayload) => void>;
|
||||
dispatchDepth: number;
|
||||
asyncQueue: DiagnosticEventPayload[];
|
||||
asyncDrainScheduled: boolean;
|
||||
};
|
||||
|
||||
const MAX_ASYNC_DIAGNOSTIC_EVENTS = 10_000;
|
||||
const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set<DiagnosticEventPayload["type"]>([
|
||||
"tool.execution.started",
|
||||
"tool.execution.completed",
|
||||
"tool.execution.error",
|
||||
"model.call.started",
|
||||
"model.call.completed",
|
||||
"model.call.error",
|
||||
]);
|
||||
|
||||
function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
|
||||
const globalStore = globalThis as typeof globalThis & {
|
||||
__openclawDiagnosticEventsState?: DiagnosticEventsGlobalState;
|
||||
@@ -318,6 +330,8 @@ function getDiagnosticEventsState(): DiagnosticEventsGlobalState {
|
||||
seq: 0,
|
||||
listeners: new Set<(evt: DiagnosticEventPayload) => void>(),
|
||||
dispatchDepth: 0,
|
||||
asyncQueue: [],
|
||||
asyncDrainScheduled: false,
|
||||
};
|
||||
}
|
||||
return globalStore.__openclawDiagnosticEventsState;
|
||||
@@ -335,41 +349,79 @@ export function areDiagnosticsEnabledForProcess(): boolean {
|
||||
return getDiagnosticEventsState().enabled;
|
||||
}
|
||||
|
||||
/**
 * Renders a listener failure for logging without ever throwing.
 *
 * Formatting a thrown value can itself fail (null-prototype objects, proxies,
 * throwing `stack`/`message`/`toString`), so the conversion is guarded and
 * falls back to a fixed placeholder.
 */
function describeDiagnosticListenerError(err: unknown): string {
  try {
    if (err instanceof Error) {
      return String(err.stack ?? err.message);
    }
    return typeof err === "string" ? err : String(err);
  } catch {
    return "<unprintable error>";
  }
}

/**
 * Synchronously delivers `enriched` to every registered listener.
 *
 * - When `state.dispatchDepth` exceeds 100 the event is dropped with a
 *   console error (recursion guard for listeners that emit while handling).
 * - `dispatchDepth` is incremented for the duration of the dispatch and
 *   restored in `finally`.
 * - A listener that throws is logged and skipped; remaining listeners still
 *   run. Building the log line cannot throw, so one hostile thrown value
 *   cannot abort delivery to later listeners or escape to the caller.
 *
 * @param state - Shared diagnostic event state (listeners, depth counter).
 * @param enriched - Event payload, already stamped with `seq` and `ts`.
 */
function dispatchDiagnosticEvent(
  state: DiagnosticEventsGlobalState,
  enriched: DiagnosticEventPayload,
): void {
  if (state.dispatchDepth > 100) {
    console.error(
      `[diagnostic-events] recursion guard tripped at depth=${state.dispatchDepth}, dropping type=${enriched.type}`,
    );
    return;
  }

  state.dispatchDepth += 1;
  try {
    for (const listener of state.listeners) {
      try {
        listener(enriched);
      } catch (err) {
        // Listener failures are reported and otherwise ignored.
        console.error(
          `[diagnostic-events] listener error type=${enriched.type} seq=${enriched.seq}: ${describeDiagnosticListenerError(err)}`,
        );
      }
    }
  } finally {
    state.dispatchDepth -= 1;
  }
}
|
||||
|
||||
/**
 * Ensures one `setImmediate` drain of `state.asyncQueue` is pending.
 *
 * The drain clears the scheduled flag, takes everything currently queued as a
 * batch, dispatches the batch in order, and re-arms itself if events were
 * queued while the batch was being dispatched. Calling this while a drain is
 * already scheduled is a no-op.
 */
function scheduleAsyncDiagnosticDrain(state: DiagnosticEventsGlobalState): void {
  if (!state.asyncDrainScheduled) {
    state.asyncDrainScheduled = true;
    setImmediate(() => {
      state.asyncDrainScheduled = false;
      const pending = state.asyncQueue.splice(0);
      pending.forEach((queued) => {
        dispatchDiagnosticEvent(state, queued);
      });
      // Listeners may have emitted more async events during the batch.
      if (state.asyncQueue.length > 0) {
        scheduleAsyncDiagnosticDrain(state);
      }
    });
  }
}
|
||||
|
||||
/**
 * Publishes a diagnostic event to registered listeners.
 *
 * - Does nothing when diagnostics are disabled for the process.
 * - Stamps the event with the next sequence number and the current time.
 * - Event types listed in `ASYNC_DIAGNOSTIC_EVENT_TYPES` are appended to the
 *   async queue and delivered by a later drain instead of inline. Once the
 *   queue holds `MAX_ASYNC_DIAGNOSTIC_EVENTS` entries, further async events
 *   are dropped (their sequence number has already been consumed).
 * - All other event types are dispatched synchronously.
 *
 * The former inline listener loop is removed in favour of
 * `dispatchDiagnosticEvent`, which owns the recursion guard, depth accounting
 * and listener error handling.
 *
 * @param event - Event to publish, without `seq`/`ts`.
 */
export function emitDiagnosticEvent(event: DiagnosticEventInput) {
  const state = getDiagnosticEventsState();
  if (!state.enabled) {
    return;
  }

  const enriched = {
    ...event,
    seq: (state.seq += 1),
    ts: Date.now(),
  } satisfies DiagnosticEventPayload;

  if (ASYNC_DIAGNOSTIC_EVENT_TYPES.has(enriched.type)) {
    if (state.asyncQueue.length >= MAX_ASYNC_DIAGNOSTIC_EVENTS) {
      // Queue is full: drop rather than grow without bound.
      return;
    }
    state.asyncQueue.push(enriched);
    scheduleAsyncDiagnosticDrain(state);
    return;
  }

  dispatchDiagnosticEvent(state, enriched);
}
|
||||
|
||||
export function onDiagnosticEvent(listener: (evt: DiagnosticEventPayload) => void): () => void {
|
||||
@@ -386,4 +438,6 @@ export function resetDiagnosticEventsForTest(): void {
|
||||
state.seq = 0;
|
||||
state.listeners.clear();
|
||||
state.dispatchDepth = 0;
|
||||
state.asyncQueue = [];
|
||||
state.asyncDrainScheduled = false;
|
||||
}
|
||||
|
||||
@@ -97,6 +97,41 @@ describe("diagnostic stability recorder", () => {
|
||||
expect(snapshot.events[1]).not.toHaveProperty("reason");
|
||||
});
|
||||
|
||||
it("sanitizes tool and model diagnostic error categories", async () => {
  startDiagnosticStabilityRecorder();

  // A category containing whitespace and a newline must not surface as a reason.
  emitDiagnosticEvent({
    type: "tool.execution.error",
    toolName: "read",
    durationMs: 1,
    errorCategory: "bad reason\nwith content",
  });
  emitDiagnosticEvent({
    type: "model.call.error",
    runId: "run-1",
    callId: "call-1",
    provider: "openai",
    model: "gpt-5.4",
    durationMs: 1,
    errorCategory: "TypeError",
  });
  // Let the immediate queue run so both events have been delivered.
  await new Promise<void>((resolve) => {
    setImmediate(resolve);
  });

  const { events } = getDiagnosticStabilitySnapshot({ limit: 10 });
  const toolRecord = events[0];
  const modelRecord = events[1];

  expect(toolRecord).toMatchObject({
    type: "tool.execution.error",
    toolName: "read",
  });
  expect(toolRecord).not.toHaveProperty("reason");
  expect(modelRecord).toMatchObject({
    type: "model.call.error",
    provider: "openai",
    model: "gpt-5.4",
    reason: "TypeError",
  });
});
|
||||
|
||||
it("summarizes memory and large payload events", () => {
|
||||
startDiagnosticStabilityRecorder();
|
||||
|
||||
|
||||
@@ -245,7 +245,7 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
|
||||
case "tool.execution.error":
|
||||
record.toolName = event.toolName;
|
||||
record.durationMs = event.durationMs;
|
||||
record.reason = event.errorCategory;
|
||||
assignReasonCode(record, event.errorCategory);
|
||||
break;
|
||||
case "run.started":
|
||||
record.provider = event.provider;
|
||||
@@ -273,7 +273,7 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
|
||||
record.provider = event.provider;
|
||||
record.model = event.model;
|
||||
record.durationMs = event.durationMs;
|
||||
record.reason = event.errorCategory;
|
||||
assignReasonCode(record, event.errorCategory);
|
||||
break;
|
||||
case "diagnostic.memory.sample":
|
||||
record.memory = copyMemory(event.memory);
|
||||
|
||||
Reference in New Issue
Block a user