From 17c2e953340a5376ff2f9357b0dca7718aadba1c Mon Sep 17 00:00:00 2001 From: Andy Ye <35905412+TurboTheTurtle@users.noreply.github.com> Date: Sun, 31 May 2026 02:29:19 -0700 Subject: [PATCH] fix(ui): prefer Talk source-reply final text Fix Control UI Talk consults so an empty final chat event no longer forces the no-text realtime tool result when a later source-reply or delivery-mirror final contains the answer displayed in the UI. Also makes agent.wait use the chat-side terminal snapshot while a same-runId chat.send is active, so lifecycle completion cannot beat chat post-dispatch/source-reply delivery. Adds regression coverage for delayed source replies, agent.wait failure/timeout handling, the wait-before-source-reply race, gateway wait ordering, and punctuation-only skill searches. Fixes #85275. Co-authored-by: Andy Ye <35905412+TurboTheTurtle@users.noreply.github.com> --- src/agents/tools/skill-workshop-tool.test.ts | 6 + src/gateway/server-methods/agent.ts | 41 +- .../server.chat.gateway-server-chat.test.ts | 19 +- ui/src/ui/chat/realtime-talk-shared.ts | 118 +++++- ui/src/ui/realtime-talk-consult.test.ts | 401 ++++++++++++++++++ 5 files changed, 556 insertions(+), 29 deletions(-) diff --git a/src/agents/tools/skill-workshop-tool.test.ts b/src/agents/tools/skill-workshop-tool.test.ts index c86bcfb49da..64bca28abfb 100644 --- a/src/agents/tools/skill-workshop-tool.test.ts +++ b/src/agents/tools/skill-workshop-tool.test.ts @@ -170,6 +170,12 @@ describe("skill_workshop tool", () => { skillKey: "weather-planner", }), ]); + const punctuationOnly = await tool.execute("call-3b", { + action: "list", + status: "pending", + query: "!!!", + }); + expect((punctuationOnly.details as { proposals: unknown[] }).proposals).toEqual([]); const inspected = await tool.execute("call-4", { action: "inspect", diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 85d9e4c7e86..ddded911ecc 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -2416,14 +2416,6 @@ export const agentHandlers: GatewayRequestHandlers = { const lifecycleAbortController = new AbortController(); const dedupeAbortController = new AbortController(); - const lifecyclePromise = waitForAgentJob({ - runId, - timeoutMs, - signal: lifecycleAbortController.signal, - // When chat.send is active with the same runId, ignore cached lifecycle - // snapshots so stale agent results do not preempt the active chat run. - ignoreCachedSnapshot: hasActiveChatRun, - }); const dedupePromise = waitForTerminalGatewayDedupe({ dedupe: context.dedupe, runId, @@ -2432,6 +2424,39 @@ export const agentHandlers: GatewayRequestHandlers = { ignoreAgentTerminalSnapshot: hasActiveChatRun, }); + if (hasActiveChatRun) { + const snapshot = await dedupePromise; + dedupeAbortController.abort(); + if (!snapshot) { + respond(true, { + runId, + status: "timeout", + timeoutPhase: "gateway_draining", + }); + return; + } + respond(true, { + runId, + status: snapshot.status, + startedAt: snapshot.startedAt, + endedAt: snapshot.endedAt, + error: snapshot.error, + stopReason: snapshot.stopReason, + livenessState: snapshot.livenessState, + yielded: snapshot.yielded, + pendingError: snapshot.pendingError, + timeoutPhase: snapshot.timeoutPhase, + providerStarted: snapshot.providerStarted, + }); + return; + } + + const lifecyclePromise = waitForAgentJob({ + runId, + timeoutMs, + signal: lifecycleAbortController.signal, + }); + const first = await Promise.race([ lifecyclePromise.then((snapshot) => ({ source: "lifecycle" as const, snapshot })), dedupePromise.then((snapshot) => ({ source: "dedupe" as const, snapshot })), diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index 14b20987a18..8ba322d41ba 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -1524,7 +1524,7 @@ describe("gateway server chat", () => { }); }); - test("agent.wait keeps lifecycle wait active while same-runId chat.send is active", async () => { + test("agent.wait ignores lifecycle completion while same-runId chat.send is active", async () => { await withMainSessionStore(async () => { const runId = "idem-wait-chat-active-with-agent-lifecycle"; const releaseBlockedReply = mockBlockedChatReply(); @@ -1532,12 +1532,6 @@ describe("gateway server chat", () => { try { await sendChatAndExpectStarted(runId, "hold chat run open"); - const waitP = rpcReq(ws, "agent.wait", { - runId, - timeoutMs: 1_000, - }); - - await waitForLifecycleWaiter(runId); emitAgentEvent({ runId, stream: "lifecycle", @@ -1549,11 +1543,14 @@ describe("gateway server chat", () => { data: { phase: "end", startedAt: 1, endedAt: 2 }, }); - const waitRes = await waitP; - expect(waitRes.ok).toBe(true); - expect(waitRes.payload?.status).toBe("ok"); + const waitWhileChatActive = await rpcReq(ws, "agent.wait", { + runId, + timeoutMs: 40, + }); + expectAgentWaitTimeout(waitWhileChatActive); - await abortChatRun(runId); + releaseBlockedReply(); + await waitForAgentRunOk(runId); } finally { releaseBlockedReply(); } diff --git a/ui/src/ui/chat/realtime-talk-shared.ts b/ui/src/ui/chat/realtime-talk-shared.ts index 55198484527..836042b5907 100644 --- a/ui/src/ui/chat/realtime-talk-shared.ts +++ b/ui/src/ui/chat/realtime-talk-shared.ts @@ -197,6 +197,21 @@ type ChatPayload = { message?: unknown; }; +type AgentWaitResult = { + status?: string; + error?: string; + stopReason?: string; + endedAt?: number; + pendingError?: boolean; + timeoutPhase?: string; + providerStarted?: boolean; + aborted?: boolean; + livenessState?: string; + yielded?: boolean; +}; + +const EMPTY_FINAL_FALLBACK_GRACE_MS = 500; + function extractTextFromMessage(message: unknown): string { if (!message || typeof message !== "object") { return ""; @@ -218,6 +233,37 @@ function extractTextFromMessage(message: unknown): string { return parts.join("\n\n").trim(); } +function getTerminalAgentWaitError(result: AgentWaitResult | undefined): Error | undefined { + if (!result) { + return undefined; + } + const message = result.error?.trim(); + if (result.status === "error") { + return new Error(message || "OpenClaw tool call failed"); + } + if (result.status !== "timeout" || result.pendingError) { + return undefined; + } + const stopReason = result.stopReason?.trim(); + const timeoutPhase = result.timeoutPhase?.trim(); + const livenessState = result.livenessState?.trim(); + const hasTerminalTimeoutMetadata = + result.endedAt !== undefined || + message !== undefined || + result.aborted === true || + (livenessState !== undefined && livenessState.length > 0) || + result.yielded === true || + (stopReason !== undefined && stopReason.length > 0) || + timeoutPhase === "preflight" || + timeoutPhase === "provider" || + timeoutPhase === "post_turn" || + result.providerStarted === true; + if (hasTerminalTimeoutMetadata) { + return new Error(message || "OpenClaw tool call timed out"); + } + return undefined; +} + function waitForChatResult(params: { client: GatewayBrowserClient; runId: string; @@ -231,15 +277,62 @@ function waitForChatResult(params: { return; } const timer = window.setTimeout(() => { - cleanup(); - reject(new Error("OpenClaw tool call timed out")); + settleReject(new Error("OpenClaw tool call timed out")); }, params.timeoutMs); + let settled = false; + let emptyFinalWaitStarted = false; + let emptyFinalFallbackTimer: number | undefined; const onAbort = () => { - cleanup(); - reject(new DOMException("OpenClaw tool call aborted", "AbortError")); + settleReject(new DOMException("OpenClaw tool call aborted", "AbortError")); }; params.signal?.addEventListener("abort", onAbort, { once: true }); let unsubscribe: () => void = () => undefined; + const settleResolve = (value: string) => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(value); + }; + const settleReject = (error: Error | DOMException) => { + if (settled) { + return; + } + settled = true; + cleanup(); + reject(error); + }; + const waitForEmptyFinalFallback = () => { + if (emptyFinalWaitStarted) { + return; + } + emptyFinalWaitStarted = true; + void params.client + .request("agent.wait", { + runId: params.runId, + timeoutMs: params.timeoutMs, + }) + .then((result) => { + if (settled) { + return; + } + const waitError = getTerminalAgentWaitError(result); + if (waitError) { + settleReject(waitError); + return; + } + if (result?.status === "timeout") { + return; + } + emptyFinalFallbackTimer = window.setTimeout(() => { + settleResolve("OpenClaw finished with no text."); + }, EMPTY_FINAL_FALLBACK_GRACE_MS); + }) + .catch((error) => { + settleReject(error instanceof Error ? error : new Error(String(error))); + }); + }; unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => { if (evt.event !== "chat") { return; @@ -250,20 +343,25 @@ function waitForChatResult(params: { } emitRealtimeTalkAgentProgress(params.emitTalkEvent, payload); if (payload.state === "final") { - cleanup(); - resolve(extractTextFromMessage(payload.message) || "OpenClaw finished with no text."); + const finalText = extractTextFromMessage(payload.message); + if (finalText) { + settleResolve(finalText); + return; + } + waitForEmptyFinalFallback(); } else if (payload.state === "aborted") { - cleanup(); - reject( + settleReject( new DOMException(payload.errorMessage ?? "OpenClaw tool call aborted", "AbortError"), ); } else if (payload.state === "error") { - cleanup(); - reject(new Error(payload.errorMessage ?? "OpenClaw tool call failed")); + settleReject(new Error(payload.errorMessage ?? "OpenClaw tool call failed")); } }); function cleanup() { window.clearTimeout(timer); + if (emptyFinalFallbackTimer !== undefined) { + window.clearTimeout(emptyFinalFallbackTimer); + } params.signal?.removeEventListener("abort", onAbort); unsubscribe(); } diff --git a/ui/src/ui/realtime-talk-consult.test.ts b/ui/src/ui/realtime-talk-consult.test.ts index 7b081ef66e0..68afe2af205 100644 --- a/ui/src/ui/realtime-talk-consult.test.ts +++ b/ui/src/ui/realtime-talk-consult.test.ts @@ -62,6 +62,407 @@ describe("RealtimeTalkSession consult handoff", () => { expect(submit).toHaveBeenCalledWith("call-1", { result: "Basement lights are off." }); }); + it("prefers source-reply final text over an earlier empty Talk consult final", async () => { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + setImmediate(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: { + role: "assistant", + provider: "openclaw", + model: "delivery-mirror", + text: "The requested status is green.", + }, + }, + }); + }); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return { runId: "run-1", status: "ok" }; + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + await submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + + expect(submit).toHaveBeenCalledWith("call-1", { + result: "The requested status is green.", + }); + }); + + it("waits past the old empty-final grace window for delayed source-reply final text", async () => { + vi.useFakeTimers(); + try { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + window.setTimeout(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + window.setTimeout(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: { + role: "assistant", + provider: "openclaw", + model: "delivery-mirror", + text: "The slow source reply wins.", + }, + }, + }); + }, 300); + }, 0); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return new Promise(() => undefined); + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + const consult = submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + + await vi.advanceTimersByTimeAsync(251); + expect(submit).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(50); + await consult; + + expect(submit).toHaveBeenCalledWith("call-1", { + result: "The slow source reply wins.", + }); + } finally { + vi.useRealTimers(); + } + }); + + it("keeps source-reply final text when the empty-final wait completes later", async () => { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + let resolveWait: ((value: { runId: string; status: "ok" }) => void) | undefined; + const waitResult = new Promise<{ runId: string; status: "ok" }>((resolve) => { + resolveWait = resolve; + }); + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + setImmediate(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + setImmediate(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: { + role: "assistant", + provider: "openclaw", + model: "delivery-mirror", + text: "The source reply still wins.", + }, + }, + }); + }); + }); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return await waitResult; + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + await submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + resolveWait?.({ runId: "run-1", status: "ok" }); + await Promise.resolve(); + + expect(request).toHaveBeenCalledWith("agent.wait", { + runId: "run-1", + timeoutMs: 120_000, + }); + expect(submit).toHaveBeenCalledTimes(1); + expect(submit).toHaveBeenCalledWith("call-1", { + result: "The source reply still wins.", + }); + }); + + it("keeps source-reply final text when the empty-final wait completes first", async () => { + vi.useFakeTimers(); + try { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + window.setTimeout(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + window.setTimeout(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: { + role: "assistant", + provider: "openclaw", + model: "delivery-mirror", + text: "The source reply beats the fallback.", + }, + }, + }); + }, 300); + }, 0); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return { runId: "run-1", status: "ok" }; + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + const consult = submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + + await vi.advanceTimersByTimeAsync(300); + await consult; + + expect(submit).toHaveBeenCalledTimes(1); + expect(submit).toHaveBeenCalledWith("call-1", { + result: "The source reply beats the fallback.", + }); + } finally { + vi.useRealTimers(); + } + }); + + it("submits the no-text fallback after an empty final and completed Gateway run", async () => { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + setImmediate(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + }); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return { runId: "run-1", status: "ok" }; + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + await submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + + expect(request).toHaveBeenCalledWith("agent.wait", { + runId: "run-1", + timeoutMs: 120_000, + }); + expect(submit).toHaveBeenCalledWith("call-1", { + result: "OpenClaw finished with no text.", + }); + }); + + it.each([ + { + waitResult: { runId: "run-1", status: "error", error: "provider authentication failed" }, + expected: "provider authentication failed", + }, + { + waitResult: { + runId: "run-1", + status: "timeout", + stopReason: "rpc", + error: "aborted by operator", + }, + expected: "aborted by operator", + }, + { + waitResult: { + runId: "run-1", + status: "timeout", + error: "preflight setup timed out", + timeoutPhase: "preflight", + }, + expected: "preflight setup timed out", + }, + { + waitResult: { + runId: "run-1", + status: "timeout", + error: "post-turn cleanup timed out", + timeoutPhase: "post_turn", + }, + expected: "post-turn cleanup timed out", + }, + { + waitResult: { + runId: "run-1", + status: "timeout", + stopReason: "timeout", + error: "agent runtime timeout", + }, + expected: "agent runtime timeout", + }, + ])("submits $expected from terminal empty-final waits", async ({ waitResult, expected }) => { + let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; + const request = vi.fn(async (method: string) => { + if (method === "talk.client.toolCall") { + setImmediate(() => { + listener?.({ + event: "chat", + payload: { + runId: "run-1", + state: "final", + message: undefined, + }, + }); + }); + return { runId: "run-1" }; + } + if (method === "agent.wait") { + return waitResult; + } + throw new Error(`unexpected request: ${method}`); + }); + const addEventListener = vi.fn((callback: typeof listener) => { + listener = callback; + return () => { + listener = undefined; + }; + }); + const submit = vi.fn(); + + await submitRealtimeTalkConsult({ + ctx: { + client: { request, addEventListener }, + sessionKey: "agent:main:main", + callbacks: {}, + } as never, + callId: "call-1", + args: { question: "Check status" }, + submit, + }); + + expect(submit).toHaveBeenCalledWith("call-1", { error: expected }); + }); + it("emits Talk progress from chat tool events while waiting for the consult result", async () => { let listener: ((event: { event: string; payload?: unknown }) => void) | undefined; const request = vi.fn(async (method: string) => {