mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 09:41:11 +00:00
fix: don't broadcast state:error on per-attempt lifecycle errors (#60043) (thanks @jwchmodx) (#60043)
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/containers: auto-bind to `0.0.0.0` during container startup for Docker and Podman compatibility, while keeping host-side status and doctor checks on the hardened loopback default when `gateway.bind` is unset. (#61818) Thanks @openperf.
|
||||
- TUI/status: route `/status` through the shared session-status command and move the old gateway-wide diagnostic summary to `/gateway-status` (`/gwstatus`). Thanks @vincentkoc.
|
||||
- Agents/history: use one shared assistant-visible sanitizer across embedded delivery and chat-history extraction so leaked `<tool_call>` and `<tool_result>` XML blocks stay hidden from user-facing replies. (#61729) Thanks @openperf.
|
||||
- Gateway/TUI: defer terminal chat finalization for per-attempt lifecycle errors so fallback retries keep streaming before the run is marked failed. (#60043) Thanks @jwchmodx.
|
||||
|
||||
## 2026.4.5
|
||||
|
||||
|
||||
@@ -48,18 +48,22 @@ describe("agent event handler", () => {
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
resetAgentRunContextForTest();
|
||||
});
|
||||
|
||||
function createHarness(params?: {
|
||||
now?: number;
|
||||
resolveSessionKeyForRun?: (runId: string) => string | undefined;
|
||||
lifecycleErrorRetryGraceMs?: number;
|
||||
isChatSendRunActive?: (runId: string) => boolean;
|
||||
}) {
|
||||
const nowSpy =
|
||||
params?.now === undefined ? undefined : vi.spyOn(Date, "now").mockReturnValue(params.now);
|
||||
const broadcast = vi.fn();
|
||||
const broadcastToConnIds = vi.fn();
|
||||
const nodeSendToSession = vi.fn();
|
||||
const clearAgentRunContext = vi.fn();
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const chatRunState = createChatRunState();
|
||||
const toolEventRecipients = createToolEventRecipientRegistry();
|
||||
@@ -72,9 +76,11 @@ describe("agent event handler", () => {
|
||||
agentRunSeq,
|
||||
chatRunState,
|
||||
resolveSessionKeyForRun: params?.resolveSessionKeyForRun ?? (() => undefined),
|
||||
clearAgentRunContext: vi.fn(),
|
||||
clearAgentRunContext,
|
||||
toolEventRecipients,
|
||||
sessionEventSubscribers,
|
||||
lifecycleErrorRetryGraceMs: params?.lifecycleErrorRetryGraceMs,
|
||||
isChatSendRunActive: params?.isChatSendRunActive,
|
||||
});
|
||||
|
||||
return {
|
||||
@@ -82,6 +88,7 @@ describe("agent event handler", () => {
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
nodeSendToSession,
|
||||
clearAgentRunContext,
|
||||
agentRunSeq,
|
||||
chatRunState,
|
||||
toolEventRecipients,
|
||||
@@ -1066,6 +1073,148 @@ describe("agent event handler", () => {
|
||||
expect(nodePayload.runId).toBe("run-fallback-client");
|
||||
});
|
||||
|
||||
it("keeps chat-linked run remapping alive across per-attempt lifecycle errors", () => {
|
||||
vi.useFakeTimers();
|
||||
const { broadcast, chatRunState, clearAgentRunContext, agentRunSeq, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-fallback",
|
||||
lifecycleErrorRetryGraceMs: 100,
|
||||
});
|
||||
chatRunState.registry.add("run-fallback-retry", {
|
||||
sessionKey: "session-fallback",
|
||||
clientRunId: "run-fallback-client",
|
||||
});
|
||||
|
||||
handler({
|
||||
runId: "run-fallback-retry",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "draft" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-fallback-retry",
|
||||
seq: 2,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "error", error: "provider failed" },
|
||||
});
|
||||
|
||||
expect(chatRunState.registry.peek("run-fallback-retry")).toEqual({
|
||||
sessionKey: "session-fallback",
|
||||
clientRunId: "run-fallback-client",
|
||||
});
|
||||
expect(clearAgentRunContext).not.toHaveBeenCalled();
|
||||
expect(agentRunSeq.get("run-fallback-retry")).toBe(2);
|
||||
|
||||
emitFallbackLifecycle({
|
||||
handler,
|
||||
runId: "run-fallback-retry",
|
||||
seq: 3,
|
||||
sessionKey: "session-fallback",
|
||||
});
|
||||
const agentCalls = broadcast.mock.calls.filter(([event]) => event === "agent");
|
||||
const fallbackPayload = agentCalls.at(-1)?.[1] as {
|
||||
runId?: string;
|
||||
data?: Record<string, unknown>;
|
||||
};
|
||||
expect(fallbackPayload.runId).toBe("run-fallback-client");
|
||||
expect(fallbackPayload.data?.phase).toBe("fallback");
|
||||
|
||||
emitLifecycleEnd(handler, "run-fallback-retry", 4);
|
||||
|
||||
expect(
|
||||
chatBroadcastCalls(broadcast).some(
|
||||
([, payload]) => (payload as { state?: string }).state === "error",
|
||||
),
|
||||
).toBe(false);
|
||||
const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as {
|
||||
state?: string;
|
||||
runId?: string;
|
||||
};
|
||||
expect(finalPayload.state).toBe("final");
|
||||
expect(finalPayload.runId).toBe("run-fallback-client");
|
||||
expect(clearAgentRunContext).toHaveBeenCalledWith("run-fallback-retry");
|
||||
expect(agentRunSeq.has("run-fallback-retry")).toBe(false);
|
||||
});
|
||||
|
||||
it("defers terminal lifecycle-error cleanup for non-chat-send runs until the retry grace expires", () => {
|
||||
vi.useFakeTimers();
|
||||
const { broadcast, clearAgentRunContext, agentRunSeq, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-terminal-error",
|
||||
lifecycleErrorRetryGraceMs: 100,
|
||||
});
|
||||
registerAgentRunContext("run-terminal-error", { sessionKey: "session-terminal-error" });
|
||||
|
||||
handler({
|
||||
runId: "run-terminal-error",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "partial" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-terminal-error",
|
||||
seq: 2,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "error", error: "still broken" },
|
||||
});
|
||||
|
||||
expect(clearAgentRunContext).not.toHaveBeenCalled();
|
||||
expect(agentRunSeq.get("run-terminal-error")).toBe(2);
|
||||
expect(
|
||||
chatBroadcastCalls(broadcast).some(
|
||||
([, payload]) => (payload as { state?: string }).state === "error",
|
||||
),
|
||||
).toBe(false);
|
||||
|
||||
vi.advanceTimersByTime(100);
|
||||
|
||||
const finalPayload = chatBroadcastCalls(broadcast).at(-1)?.[1] as {
|
||||
state?: string;
|
||||
runId?: string;
|
||||
};
|
||||
expect(finalPayload.state).toBe("error");
|
||||
expect(finalPayload.runId).toBe("run-terminal-error");
|
||||
expect(clearAgentRunContext).toHaveBeenCalledWith("run-terminal-error");
|
||||
expect(agentRunSeq.has("run-terminal-error")).toBe(false);
|
||||
});
|
||||
|
||||
it("suppresses delayed lifecycle chat errors for active chat.send runs while still cleaning up", () => {
|
||||
vi.useFakeTimers();
|
||||
const { broadcast, clearAgentRunContext, agentRunSeq, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-chat-send",
|
||||
lifecycleErrorRetryGraceMs: 100,
|
||||
isChatSendRunActive: (runId) => runId === "run-chat-send",
|
||||
});
|
||||
registerAgentRunContext("run-chat-send", { sessionKey: "session-chat-send" });
|
||||
|
||||
handler({
|
||||
runId: "run-chat-send",
|
||||
seq: 1,
|
||||
stream: "assistant",
|
||||
ts: Date.now(),
|
||||
data: { text: "partial" },
|
||||
});
|
||||
handler({
|
||||
runId: "run-chat-send",
|
||||
seq: 2,
|
||||
stream: "lifecycle",
|
||||
ts: Date.now(),
|
||||
data: { phase: "error", error: "chat.send failed" },
|
||||
});
|
||||
|
||||
vi.advanceTimersByTime(100);
|
||||
|
||||
expect(
|
||||
chatBroadcastCalls(broadcast).some(
|
||||
([, payload]) => (payload as { state?: string }).state === "error",
|
||||
),
|
||||
).toBe(false);
|
||||
expect(clearAgentRunContext).toHaveBeenCalledWith("run-chat-send");
|
||||
expect(agentRunSeq.has("run-chat-send")).toBe(false);
|
||||
});
|
||||
|
||||
it("suppresses chat and node session events for non-control-UI-visible runs", () => {
|
||||
const { broadcast, nodeSendToSession, handler } = createHarness({
|
||||
resolveSessionKeyForRun: () => "session-hidden",
|
||||
|
||||
@@ -263,6 +263,11 @@ type ToolRecipientEntry = {
|
||||
|
||||
const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000;
|
||||
const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000;
|
||||
/**
|
||||
* Keep this aligned with the agent.wait lifecycle-error grace so chat surfaces
|
||||
* do not finalize a run before fallback or retry reuses the same runId.
|
||||
*/
|
||||
const AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
|
||||
|
||||
export function createSessionEventSubscriberRegistry(): SessionEventSubscriberRegistry {
|
||||
const connIds = new Set<string>();
|
||||
@@ -449,6 +454,8 @@ export type AgentEventHandlerOptions = {
|
||||
clearAgentRunContext: (runId: string) => void;
|
||||
toolEventRecipients: ToolEventRecipientRegistry;
|
||||
sessionEventSubscribers: SessionEventSubscriberRegistry;
|
||||
lifecycleErrorRetryGraceMs?: number;
|
||||
isChatSendRunActive?: (runId: string) => boolean;
|
||||
};
|
||||
|
||||
export function createAgentEventHandler({
|
||||
@@ -461,7 +468,26 @@ export function createAgentEventHandler({
|
||||
clearAgentRunContext,
|
||||
toolEventRecipients,
|
||||
sessionEventSubscribers,
|
||||
lifecycleErrorRetryGraceMs = AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS,
|
||||
isChatSendRunActive = () => false,
|
||||
}: AgentEventHandlerOptions) {
|
||||
const pendingTerminalLifecycleErrors = new Map<string, NodeJS.Timeout>();
|
||||
|
||||
const clearBufferedChatState = (clientRunId: string) => {
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
|
||||
};
|
||||
|
||||
const clearPendingTerminalLifecycleError = (runId: string) => {
|
||||
const pending = pendingTerminalLifecycleErrors.get(runId);
|
||||
if (!pending) {
|
||||
return;
|
||||
}
|
||||
clearTimeout(pending);
|
||||
pendingTerminalLifecycleErrors.delete(runId);
|
||||
};
|
||||
|
||||
const buildSessionEventSnapshot = (sessionKey: string, evt?: AgentEventPayload) => {
|
||||
const row = loadGatewaySessionRow(sessionKey);
|
||||
const lifecyclePatch = evt
|
||||
@@ -531,6 +557,110 @@ export function createAgentEventHandler({
|
||||
};
|
||||
};
|
||||
|
||||
const finalizeLifecycleEvent = (
|
||||
evt: AgentEventPayload,
|
||||
opts?: { skipChatErrorFinal?: boolean },
|
||||
) => {
|
||||
const lifecyclePhase =
|
||||
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
if (lifecyclePhase !== "end" && lifecyclePhase !== "error") {
|
||||
return;
|
||||
}
|
||||
|
||||
clearPendingTerminalLifecycleError(evt.runId);
|
||||
|
||||
const chatLink = chatRunState.registry.peek(evt.runId);
|
||||
const eventSessionKey =
|
||||
typeof evt.sessionKey === "string" && evt.sessionKey.trim() ? evt.sessionKey : undefined;
|
||||
const isControlUiVisible = getAgentRunContext(evt.runId)?.isControlUiVisible ?? true;
|
||||
const sessionKey =
|
||||
chatLink?.sessionKey ?? eventSessionKey ?? resolveSessionKeyForRun(evt.runId);
|
||||
const clientRunId = chatLink?.clientRunId ?? evt.runId;
|
||||
const eventRunId = chatLink?.clientRunId ?? evt.runId;
|
||||
const isAborted =
|
||||
chatRunState.abortedRuns.has(clientRunId) || chatRunState.abortedRuns.has(evt.runId);
|
||||
|
||||
if (isControlUiVisible && sessionKey) {
|
||||
if (!isAborted) {
|
||||
const evtStopReason =
|
||||
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
|
||||
if (chatLink) {
|
||||
const finished = chatRunState.registry.shift(evt.runId);
|
||||
if (!finished) {
|
||||
clearAgentRunContext(evt.runId);
|
||||
return;
|
||||
}
|
||||
if (!(opts?.skipChatErrorFinal && lifecyclePhase === "error")) {
|
||||
emitChatFinal(
|
||||
finished.sessionKey,
|
||||
finished.clientRunId,
|
||||
evt.runId,
|
||||
evt.seq,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
evtStopReason,
|
||||
);
|
||||
}
|
||||
} else if (!(opts?.skipChatErrorFinal && lifecyclePhase === "error")) {
|
||||
emitChatFinal(
|
||||
sessionKey,
|
||||
eventRunId,
|
||||
evt.runId,
|
||||
evt.seq,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
evtStopReason,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
chatRunState.abortedRuns.delete(clientRunId);
|
||||
chatRunState.abortedRuns.delete(evt.runId);
|
||||
clearBufferedChatState(clientRunId);
|
||||
if (chatLink) {
|
||||
chatRunState.registry.remove(evt.runId, clientRunId, sessionKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
toolEventRecipients.markFinal(evt.runId);
|
||||
clearAgentRunContext(evt.runId);
|
||||
agentRunSeq.delete(evt.runId);
|
||||
agentRunSeq.delete(clientRunId);
|
||||
|
||||
if (sessionKey) {
|
||||
void persistGatewaySessionLifecycleEvent({ sessionKey, event: evt }).catch(() => undefined);
|
||||
const sessionEventConnIds = sessionEventSubscribers.getAll();
|
||||
if (sessionEventConnIds.size > 0) {
|
||||
broadcastToConnIds(
|
||||
"sessions.changed",
|
||||
{
|
||||
sessionKey,
|
||||
phase: lifecyclePhase,
|
||||
runId: evt.runId,
|
||||
ts: evt.ts,
|
||||
...buildSessionEventSnapshot(sessionKey, evt),
|
||||
},
|
||||
sessionEventConnIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const scheduleTerminalLifecycleError = (
|
||||
evt: AgentEventPayload,
|
||||
opts?: { skipChatErrorFinal?: boolean },
|
||||
) => {
|
||||
clearPendingTerminalLifecycleError(evt.runId);
|
||||
const delayMs = Math.max(1, Math.min(Math.floor(lifecycleErrorRetryGraceMs), 2_147_483_647));
|
||||
const timer = setTimeout(() => {
|
||||
pendingTerminalLifecycleErrors.delete(evt.runId);
|
||||
finalizeLifecycleEvent(evt, opts);
|
||||
}, delayMs);
|
||||
timer.unref?.();
|
||||
pendingTerminalLifecycleErrors.set(evt.runId, timer);
|
||||
};
|
||||
|
||||
const emitChatDelta = (
|
||||
sessionKey: string,
|
||||
clientRunId: string,
|
||||
@@ -714,6 +844,12 @@ export function createAgentEventHandler({
|
||||
};
|
||||
|
||||
return (evt: AgentEventPayload) => {
|
||||
const lifecyclePhase =
|
||||
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
if (evt.stream !== "lifecycle" || lifecyclePhase !== "error") {
|
||||
clearPendingTerminalLifecycleError(evt.runId);
|
||||
}
|
||||
|
||||
const chatLink = chatRunState.registry.peek(evt.runId);
|
||||
const eventSessionKey =
|
||||
typeof evt.sessionKey === "string" && evt.sessionKey.trim() ? evt.sessionKey : undefined;
|
||||
@@ -800,9 +936,6 @@ export function createAgentEventHandler({
|
||||
broadcast("agent", agentPayload);
|
||||
}
|
||||
|
||||
const lifecyclePhase =
|
||||
evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null;
|
||||
|
||||
if (isControlUiVisible && sessionKey) {
|
||||
// Send tool events to node/channel subscribers only when verbose is enabled;
|
||||
// WS clients already received the event above via broadcastToConnIds.
|
||||
@@ -815,57 +948,26 @@ export function createAgentEventHandler({
|
||||
}
|
||||
if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") {
|
||||
emitChatDelta(sessionKey, clientRunId, evt.runId, evt.seq, evt.data.text, evt.data.delta);
|
||||
} else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
|
||||
const evtStopReason =
|
||||
typeof evt.data?.stopReason === "string" ? evt.data.stopReason : undefined;
|
||||
if (chatLink) {
|
||||
const finished = chatRunState.registry.shift(evt.runId);
|
||||
if (!finished) {
|
||||
clearAgentRunContext(evt.runId);
|
||||
return;
|
||||
}
|
||||
emitChatFinal(
|
||||
finished.sessionKey,
|
||||
finished.clientRunId,
|
||||
evt.runId,
|
||||
evt.seq,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
evtStopReason,
|
||||
);
|
||||
} else {
|
||||
emitChatFinal(
|
||||
sessionKey,
|
||||
eventRunId,
|
||||
evt.runId,
|
||||
evt.seq,
|
||||
lifecyclePhase === "error" ? "error" : "done",
|
||||
evt.data?.error,
|
||||
evtStopReason,
|
||||
);
|
||||
}
|
||||
} else if (isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) {
|
||||
chatRunState.abortedRuns.delete(clientRunId);
|
||||
chatRunState.abortedRuns.delete(evt.runId);
|
||||
chatRunState.buffers.delete(clientRunId);
|
||||
chatRunState.deltaSentAt.delete(clientRunId);
|
||||
if (chatLink) {
|
||||
chatRunState.registry.remove(evt.runId, clientRunId, sessionKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
|
||||
toolEventRecipients.markFinal(evt.runId);
|
||||
clearAgentRunContext(evt.runId);
|
||||
agentRunSeq.delete(evt.runId);
|
||||
agentRunSeq.delete(clientRunId);
|
||||
if (lifecyclePhase === "error") {
|
||||
clearBufferedChatState(clientRunId);
|
||||
const skipChatErrorFinal = isChatSendRunActive(evt.runId) && !chatLink;
|
||||
if (isAborted || lifecycleErrorRetryGraceMs <= 0) {
|
||||
finalizeLifecycleEvent(evt, { skipChatErrorFinal });
|
||||
} else {
|
||||
scheduleTerminalLifecycleError(evt, { skipChatErrorFinal });
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
sessionKey &&
|
||||
(lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error")
|
||||
) {
|
||||
if (lifecyclePhase === "end") {
|
||||
finalizeLifecycleEvent(evt);
|
||||
return;
|
||||
}
|
||||
|
||||
if (sessionKey && lifecyclePhase === "start") {
|
||||
void persistGatewaySessionLifecycleEvent({ sessionKey, event: evt }).catch(() => undefined);
|
||||
const sessionEventConnIds = sessionEventSubscribers.getAll();
|
||||
if (sessionEventConnIds.size > 0) {
|
||||
|
||||
@@ -971,6 +971,7 @@ export async function startGatewayServer(
|
||||
clearAgentRunContext,
|
||||
toolEventRecipients,
|
||||
sessionEventSubscribers,
|
||||
isChatSendRunActive: (runId) => chatAbortControllers.has(runId),
|
||||
}),
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user