fix(ui): prefer Talk source-reply final text

Fix Control UI Talk consults so an empty final chat event no longer forces the no-text realtime tool result when a later source-reply or delivery-mirror final contains the answer displayed in the UI.

Also makes agent.wait use the chat-side terminal snapshot while a same-runId chat.send is active, so lifecycle completion cannot beat chat post-dispatch/source-reply delivery.

Adds regression coverage for delayed source replies, agent.wait failure/timeout handling, the wait-before-source-reply race, gateway wait ordering, and punctuation-only skill searches.

Fixes #85275.

Co-authored-by: Andy Ye <35905412+TurboTheTurtle@users.noreply.github.com>
This commit is contained in:
Andy Ye
2026-05-31 02:29:19 -07:00
committed by GitHub
parent a0d2febe6b
commit 17c2e95334
5 changed files with 556 additions and 29 deletions

View File

@@ -170,6 +170,12 @@ describe("skill_workshop tool", () => {
skillKey: "weather-planner",
}),
]);
const punctuationOnly = await tool.execute("call-3b", {
action: "list",
status: "pending",
query: "!!!",
});
expect((punctuationOnly.details as { proposals: unknown[] }).proposals).toEqual([]);
const inspected = await tool.execute("call-4", {
action: "inspect",

View File

@@ -2416,14 +2416,6 @@ export const agentHandlers: GatewayRequestHandlers = {
const lifecycleAbortController = new AbortController();
const dedupeAbortController = new AbortController();
const lifecyclePromise = waitForAgentJob({
runId,
timeoutMs,
signal: lifecycleAbortController.signal,
// When chat.send is active with the same runId, ignore cached lifecycle
// snapshots so stale agent results do not preempt the active chat run.
ignoreCachedSnapshot: hasActiveChatRun,
});
const dedupePromise = waitForTerminalGatewayDedupe({
dedupe: context.dedupe,
runId,
@@ -2432,6 +2424,39 @@ export const agentHandlers: GatewayRequestHandlers = {
ignoreAgentTerminalSnapshot: hasActiveChatRun,
});
if (hasActiveChatRun) {
const snapshot = await dedupePromise;
dedupeAbortController.abort();
if (!snapshot) {
respond(true, {
runId,
status: "timeout",
timeoutPhase: "gateway_draining",
});
return;
}
respond(true, {
runId,
status: snapshot.status,
startedAt: snapshot.startedAt,
endedAt: snapshot.endedAt,
error: snapshot.error,
stopReason: snapshot.stopReason,
livenessState: snapshot.livenessState,
yielded: snapshot.yielded,
pendingError: snapshot.pendingError,
timeoutPhase: snapshot.timeoutPhase,
providerStarted: snapshot.providerStarted,
});
return;
}
const lifecyclePromise = waitForAgentJob({
runId,
timeoutMs,
signal: lifecycleAbortController.signal,
});
const first = await Promise.race([
lifecyclePromise.then((snapshot) => ({ source: "lifecycle" as const, snapshot })),
dedupePromise.then((snapshot) => ({ source: "dedupe" as const, snapshot })),

View File

@@ -1524,7 +1524,7 @@ describe("gateway server chat", () => {
});
});
test("agent.wait keeps lifecycle wait active while same-runId chat.send is active", async () => {
test("agent.wait ignores lifecycle completion while same-runId chat.send is active", async () => {
await withMainSessionStore(async () => {
const runId = "idem-wait-chat-active-with-agent-lifecycle";
const releaseBlockedReply = mockBlockedChatReply();
@@ -1532,12 +1532,6 @@ describe("gateway server chat", () => {
try {
await sendChatAndExpectStarted(runId, "hold chat run open");
const waitP = rpcReq(ws, "agent.wait", {
runId,
timeoutMs: 1_000,
});
await waitForLifecycleWaiter(runId);
emitAgentEvent({
runId,
stream: "lifecycle",
@@ -1549,11 +1543,14 @@ describe("gateway server chat", () => {
data: { phase: "end", startedAt: 1, endedAt: 2 },
});
const waitRes = await waitP;
expect(waitRes.ok).toBe(true);
expect(waitRes.payload?.status).toBe("ok");
const waitWhileChatActive = await rpcReq(ws, "agent.wait", {
runId,
timeoutMs: 40,
});
expectAgentWaitTimeout(waitWhileChatActive);
await abortChatRun(runId);
releaseBlockedReply();
await waitForAgentRunOk(runId);
} finally {
releaseBlockedReply();
}

View File

@@ -197,6 +197,21 @@ type ChatPayload = {
message?: unknown;
};
type AgentWaitResult = {
status?: string;
error?: string;
stopReason?: string;
endedAt?: number;
pendingError?: boolean;
timeoutPhase?: string;
providerStarted?: boolean;
aborted?: boolean;
livenessState?: string;
yielded?: boolean;
};
const EMPTY_FINAL_FALLBACK_GRACE_MS = 500;
function extractTextFromMessage(message: unknown): string {
if (!message || typeof message !== "object") {
return "";
@@ -218,6 +233,37 @@ function extractTextFromMessage(message: unknown): string {
return parts.join("\n\n").trim();
}
function getTerminalAgentWaitError(result: AgentWaitResult | undefined): Error | undefined {
if (!result) {
return undefined;
}
const message = result.error?.trim();
if (result.status === "error") {
return new Error(message || "OpenClaw tool call failed");
}
if (result.status !== "timeout" || result.pendingError) {
return undefined;
}
const stopReason = result.stopReason?.trim();
const timeoutPhase = result.timeoutPhase?.trim();
const livenessState = result.livenessState?.trim();
const hasTerminalTimeoutMetadata =
result.endedAt !== undefined ||
message !== undefined ||
result.aborted === true ||
(livenessState !== undefined && livenessState.length > 0) ||
result.yielded === true ||
(stopReason !== undefined && stopReason.length > 0) ||
timeoutPhase === "preflight" ||
timeoutPhase === "provider" ||
timeoutPhase === "post_turn" ||
result.providerStarted === true;
if (hasTerminalTimeoutMetadata) {
return new Error(message || "OpenClaw tool call timed out");
}
return undefined;
}
function waitForChatResult(params: {
client: GatewayBrowserClient;
runId: string;
@@ -231,15 +277,62 @@ function waitForChatResult(params: {
return;
}
const timer = window.setTimeout(() => {
cleanup();
reject(new Error("OpenClaw tool call timed out"));
settleReject(new Error("OpenClaw tool call timed out"));
}, params.timeoutMs);
let settled = false;
let emptyFinalWaitStarted = false;
let emptyFinalFallbackTimer: number | undefined;
const onAbort = () => {
cleanup();
reject(new DOMException("OpenClaw tool call aborted", "AbortError"));
settleReject(new DOMException("OpenClaw tool call aborted", "AbortError"));
};
params.signal?.addEventListener("abort", onAbort, { once: true });
let unsubscribe: () => void = () => undefined;
const settleResolve = (value: string) => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve(value);
};
const settleReject = (error: Error | DOMException) => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(error);
};
const waitForEmptyFinalFallback = () => {
if (emptyFinalWaitStarted) {
return;
}
emptyFinalWaitStarted = true;
void params.client
.request<AgentWaitResult>("agent.wait", {
runId: params.runId,
timeoutMs: params.timeoutMs,
})
.then((result) => {
if (settled) {
return;
}
const waitError = getTerminalAgentWaitError(result);
if (waitError) {
settleReject(waitError);
return;
}
if (result?.status === "timeout") {
return;
}
emptyFinalFallbackTimer = window.setTimeout(() => {
settleResolve("OpenClaw finished with no text.");
}, EMPTY_FINAL_FALLBACK_GRACE_MS);
})
.catch((error) => {
settleReject(error instanceof Error ? error : new Error(String(error)));
});
};
unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => {
if (evt.event !== "chat") {
return;
@@ -250,20 +343,25 @@ function waitForChatResult(params: {
}
emitRealtimeTalkAgentProgress(params.emitTalkEvent, payload);
if (payload.state === "final") {
cleanup();
resolve(extractTextFromMessage(payload.message) || "OpenClaw finished with no text.");
const finalText = extractTextFromMessage(payload.message);
if (finalText) {
settleResolve(finalText);
return;
}
waitForEmptyFinalFallback();
} else if (payload.state === "aborted") {
cleanup();
reject(
settleReject(
new DOMException(payload.errorMessage ?? "OpenClaw tool call aborted", "AbortError"),
);
} else if (payload.state === "error") {
cleanup();
reject(new Error(payload.errorMessage ?? "OpenClaw tool call failed"));
settleReject(new Error(payload.errorMessage ?? "OpenClaw tool call failed"));
}
});
function cleanup() {
window.clearTimeout(timer);
if (emptyFinalFallbackTimer !== undefined) {
window.clearTimeout(emptyFinalFallbackTimer);
}
params.signal?.removeEventListener("abort", onAbort);
unsubscribe();
}

View File

@@ -62,6 +62,407 @@ describe("RealtimeTalkSession consult handoff", () => {
expect(submit).toHaveBeenCalledWith("call-1", { result: "Basement lights are off." });
});
it("prefers source-reply final text over an earlier empty Talk consult final", async () => {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
setImmediate(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: {
role: "assistant",
provider: "openclaw",
model: "delivery-mirror",
text: "The requested status is green.",
},
},
});
});
return { runId: "run-1" };
}
if (method === "agent.wait") {
return { runId: "run-1", status: "ok" };
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
await submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
expect(submit).toHaveBeenCalledWith("call-1", {
result: "The requested status is green.",
});
});
it("waits past the old empty-final grace window for delayed source-reply final text", async () => {
vi.useFakeTimers();
try {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
window.setTimeout(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
window.setTimeout(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: {
role: "assistant",
provider: "openclaw",
model: "delivery-mirror",
text: "The slow source reply wins.",
},
},
});
}, 300);
}, 0);
return { runId: "run-1" };
}
if (method === "agent.wait") {
return new Promise(() => undefined);
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
const consult = submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
await vi.advanceTimersByTimeAsync(251);
expect(submit).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(50);
await consult;
expect(submit).toHaveBeenCalledWith("call-1", {
result: "The slow source reply wins.",
});
} finally {
vi.useRealTimers();
}
});
it("keeps source-reply final text when the empty-final wait completes later", async () => {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
let resolveWait: ((value: { runId: string; status: "ok" }) => void) | undefined;
const waitResult = new Promise<{ runId: string; status: "ok" }>((resolve) => {
resolveWait = resolve;
});
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
setImmediate(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
setImmediate(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: {
role: "assistant",
provider: "openclaw",
model: "delivery-mirror",
text: "The source reply still wins.",
},
},
});
});
});
return { runId: "run-1" };
}
if (method === "agent.wait") {
return await waitResult;
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
await submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
resolveWait?.({ runId: "run-1", status: "ok" });
await Promise.resolve();
expect(request).toHaveBeenCalledWith("agent.wait", {
runId: "run-1",
timeoutMs: 120_000,
});
expect(submit).toHaveBeenCalledTimes(1);
expect(submit).toHaveBeenCalledWith("call-1", {
result: "The source reply still wins.",
});
});
it("keeps source-reply final text when the empty-final wait completes first", async () => {
vi.useFakeTimers();
try {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
window.setTimeout(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
window.setTimeout(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: {
role: "assistant",
provider: "openclaw",
model: "delivery-mirror",
text: "The source reply beats the fallback.",
},
},
});
}, 300);
}, 0);
return { runId: "run-1" };
}
if (method === "agent.wait") {
return { runId: "run-1", status: "ok" };
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
const consult = submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
await vi.advanceTimersByTimeAsync(300);
await consult;
expect(submit).toHaveBeenCalledTimes(1);
expect(submit).toHaveBeenCalledWith("call-1", {
result: "The source reply beats the fallback.",
});
} finally {
vi.useRealTimers();
}
});
it("submits the no-text fallback after an empty final and completed Gateway run", async () => {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
setImmediate(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
});
return { runId: "run-1" };
}
if (method === "agent.wait") {
return { runId: "run-1", status: "ok" };
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
await submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
expect(request).toHaveBeenCalledWith("agent.wait", {
runId: "run-1",
timeoutMs: 120_000,
});
expect(submit).toHaveBeenCalledWith("call-1", {
result: "OpenClaw finished with no text.",
});
});
it.each([
{
waitResult: { runId: "run-1", status: "error", error: "provider authentication failed" },
expected: "provider authentication failed",
},
{
waitResult: {
runId: "run-1",
status: "timeout",
stopReason: "rpc",
error: "aborted by operator",
},
expected: "aborted by operator",
},
{
waitResult: {
runId: "run-1",
status: "timeout",
error: "preflight setup timed out",
timeoutPhase: "preflight",
},
expected: "preflight setup timed out",
},
{
waitResult: {
runId: "run-1",
status: "timeout",
error: "post-turn cleanup timed out",
timeoutPhase: "post_turn",
},
expected: "post-turn cleanup timed out",
},
{
waitResult: {
runId: "run-1",
status: "timeout",
stopReason: "timeout",
error: "agent runtime timeout",
},
expected: "agent runtime timeout",
},
])("submits $expected from terminal empty-final waits", async ({ waitResult, expected }) => {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {
if (method === "talk.client.toolCall") {
setImmediate(() => {
listener?.({
event: "chat",
payload: {
runId: "run-1",
state: "final",
message: undefined,
},
});
});
return { runId: "run-1" };
}
if (method === "agent.wait") {
return waitResult;
}
throw new Error(`unexpected request: ${method}`);
});
const addEventListener = vi.fn((callback: typeof listener) => {
listener = callback;
return () => {
listener = undefined;
};
});
const submit = vi.fn();
await submitRealtimeTalkConsult({
ctx: {
client: { request, addEventListener },
sessionKey: "agent:main:main",
callbacks: {},
} as never,
callId: "call-1",
args: { question: "Check status" },
submit,
});
expect(submit).toHaveBeenCalledWith("call-1", { error: expected });
});
it("emits Talk progress from chat tool events while waiting for the consult result", async () => {
let listener: ((event: { event: string; payload?: unknown }) => void) | undefined;
const request = vi.fn(async (method: string) => {