Files
openclaw/src/agents/pi-embedded-subscribe.handlers.lifecycle.test.ts
Gustavo Madeira Santana 32fa5c3be5 fix(agents): resolve compaction wait before channel flush (#59308)
Merged via squash.

Prepared head SHA: bf17502df8
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
2026-04-01 21:40:23 -04:00

228 lines
7.5 KiB
TypeScript

import { describe, expect, it, vi } from "vitest";
import { createInlineCodeState } from "../markdown/code-spans.js";
import { handleAgentEnd } from "./pi-embedded-subscribe.handlers.lifecycle.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
vi.mock("../infra/agent-events.js", () => ({
emitAgentEvent: vi.fn(),
}));
function createContext(
lastAssistant: unknown,
overrides?: {
onAgentEvent?: (event: unknown) => void;
onBlockReplyFlush?: () => void | Promise<void>;
},
): EmbeddedPiSubscribeContext {
const onBlockReply = vi.fn();
return {
params: {
runId: "run-1",
config: {},
sessionKey: "agent:main:main",
onAgentEvent: overrides?.onAgentEvent,
onBlockReply,
onBlockReplyFlush: overrides?.onBlockReplyFlush,
},
state: {
lastAssistant: lastAssistant as EmbeddedPiSubscribeContext["state"]["lastAssistant"],
pendingCompactionRetry: 0,
pendingToolMediaUrls: [],
pendingToolAudioAsVoice: false,
blockState: {
thinking: true,
final: true,
inlineCode: createInlineCodeState(),
},
},
log: {
debug: vi.fn(),
warn: vi.fn(),
},
flushBlockReplyBuffer: vi.fn(),
emitBlockReply: onBlockReply,
resolveCompactionRetry: vi.fn(),
maybeResolveCompactionWait: vi.fn(),
} as unknown as EmbeddedPiSubscribeContext;
}
describe("handleAgentEnd", () => {
it("logs the resolved error message when run ends with assistant error", async () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
role: "assistant",
stopReason: "error",
errorMessage: "connection refused",
content: [{ type: "text", text: "" }],
},
{ onAgentEvent },
);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "LLM request failed: connection refused by the provider endpoint.",
rawErrorPreview: "connection refused",
consoleMessage:
"embedded run agent end: runId=run-1 isError=true model=unknown provider=unknown error=LLM request failed: connection refused by the provider endpoint. rawError=connection refused",
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "error",
error: "LLM request failed: connection refused by the provider endpoint.",
},
});
});
it("attaches raw provider error metadata and includes model/provider in console output", async () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
provider: "anthropic",
model: "claude-test",
errorMessage: '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
content: [{ type: "text", text: "" }],
});
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toBe("embedded run agent end");
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
runId: "run-1",
error: "The AI service is temporarily overloaded. Please try again in a moment.",
failoverReason: "overloaded",
providerErrorType: "overloaded_error",
consoleMessage:
'embedded run agent end: runId=run-1 isError=true model=claude-test provider=anthropic error=The AI service is temporarily overloaded. Please try again in a moment. rawError={"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
});
});
it("sanitizes model and provider before writing consoleMessage", async () => {
const ctx = createContext({
role: "assistant",
stopReason: "error",
provider: "anthropic\u001b]8;;https://evil.test\u0007",
model: "claude\tsonnet\n4",
errorMessage: "connection refused",
content: [{ type: "text", text: "" }],
});
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
const meta = warn.mock.calls[0]?.[1];
expect(meta).toMatchObject({
consoleMessage:
"embedded run agent end: runId=run-1 isError=true model=claude sonnet 4 provider=anthropic]8;;https://evil.test error=LLM request failed: connection refused by the provider endpoint. rawError=connection refused",
});
expect(meta?.consoleMessage).not.toContain("\n");
expect(meta?.consoleMessage).not.toContain("\r");
expect(meta?.consoleMessage).not.toContain("\t");
expect(meta?.consoleMessage).not.toContain("\u001b");
});
it("redacts logged error text before emitting lifecycle events", async () => {
const onAgentEvent = vi.fn();
const ctx = createContext(
{
role: "assistant",
stopReason: "error",
errorMessage: "x-api-key: sk-abcdefghijklmnopqrstuvwxyz123456",
content: [{ type: "text", text: "" }],
},
{ onAgentEvent },
);
await handleAgentEnd(ctx);
const warn = vi.mocked(ctx.log.warn);
expect(warn.mock.calls[0]?.[1]).toMatchObject({
event: "embedded_run_agent_end",
error: "x-api-key: ***",
rawErrorPreview: "x-api-key: ***",
});
expect(onAgentEvent).toHaveBeenCalledWith({
stream: "lifecycle",
data: {
phase: "error",
error: "x-api-key: ***",
},
});
});
it("keeps non-error run-end logging on debug only", async () => {
const ctx = createContext(undefined);
await handleAgentEnd(ctx);
expect(ctx.log.warn).not.toHaveBeenCalled();
expect(ctx.log.debug).toHaveBeenCalledWith("embedded run agent end: runId=run-1 isError=false");
});
it("flushes orphaned tool media as a media-only block reply", async () => {
const ctx = createContext(undefined);
ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"];
ctx.state.pendingToolAudioAsVoice = true;
await handleAgentEnd(ctx);
expect(ctx.emitBlockReply).toHaveBeenCalledWith({
mediaUrls: ["/tmp/reply.opus"],
audioAsVoice: true,
});
expect(ctx.state.pendingToolMediaUrls).toEqual([]);
expect(ctx.state.pendingToolAudioAsVoice).toBe(false);
});
it("resolves compaction wait before awaiting an async block reply flush", async () => {
let resolveFlush: (() => void) | undefined;
const ctx = createContext(undefined);
ctx.flushBlockReplyBuffer = vi
.fn()
.mockImplementationOnce(
() =>
new Promise<void>((resolve) => {
resolveFlush = resolve;
}),
)
.mockImplementation(() => {});
const endPromise = handleAgentEnd(ctx);
expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1);
expect(ctx.resolveCompactionRetry).not.toHaveBeenCalled();
resolveFlush?.();
await endPromise;
});
it("resolves compaction wait before awaiting an async channel flush", async () => {
let resolveChannelFlush: (() => void) | undefined;
const onBlockReplyFlush = vi.fn(
() =>
new Promise<void>((resolve) => {
resolveChannelFlush = resolve;
}),
);
const ctx = createContext(undefined, { onBlockReplyFlush });
const endPromise = handleAgentEnd(ctx);
expect(ctx.maybeResolveCompactionWait).toHaveBeenCalledTimes(1);
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
resolveChannelFlush?.();
await endPromise;
});
});