fix(gateway): flush throttled delta before emitChatFinal (#24856)

* fix(gateway): flush throttled delta before emitChatFinal

The 150ms throttle in emitChatDelta can suppress the last text chunk
before emitChatFinal fires, causing streaming clients (e.g. ACP) to
receive truncated responses. The final event carries the complete text,
but clients that build responses incrementally from deltas miss the
tail end.

Flush one last unthrottled delta with the complete buffered text
immediately before sending the final event. This ensures all streaming
consumers have the full response without needing to reconcile deltas
against the final payload.

* fix(gateway): avoid duplicate delta flush when buffer unchanged

Track the text length at the time of the last broadcast. The flush in
emitChatFinal now only sends a delta if the buffer has grown since the
last broadcast, preventing duplicate sends when the final delta passed
the 150ms throttle and was already broadcast.

* fix(gateway): honor heartbeat suppression in final delta flush

* test(gateway): add final delta flush and dedupe coverage

* fix(gateway): skip final flush for silent lead fragments

* docs(changelog): note gateway final-delta flush fix credits

---------

Co-authored-by: Jonathan Taylor <visionik@pobox.com>
Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
This commit is contained in:
Viz
2026-03-02 23:45:46 -05:00
committed by GitHub
parent 0566845b71
commit a9ec75fe81
5 changed files with 133 additions and 1 deletions

View File

@@ -77,6 +77,7 @@ Docs: https://docs.openclaw.ai
- Gateway/Security canonicalization hardening: decode plugin route path variants to canonical fixpoint (with bounded depth), fail closed on canonicalization anomalies, and enforce gateway auth for deeply encoded `/api/channels/*` variants to prevent alternate-path auth bypass through plugin handlers. Thanks @tdjackey for reporting.
- Browser/Gateway hardening: preserve env credentials for `OPENCLAW_GATEWAY_URL` / `CLAWDBOT_GATEWAY_URL` while treating explicit `--url` as override-only auth, and make container browser hardening flags optional with safer defaults for Docker/LXC stability. (#31504) Thanks @vincentkoc.
- Gateway/Control UI basePath webhook passthrough: let non-read methods under configured `controlUiBasePath` fall through to plugin routes (instead of returning Control UI 405), restoring webhook handlers behind basePath mounts. (#32311) Thanks @ademczuk.
- Gateway/Webchat streaming finalization: flush throttled trailing assistant text before `final` chat events so streaming consumers do not miss tail content, while preserving duplicate suppression and heartbeat/silent lead-fragment guards. (#24856) Thanks @visionik and @vincentkoc.
- Control UI/Legacy browser compatibility: replace `toSorted`-dependent cron suggestion sorting in `app-render` with a compatibility helper so older browsers without `Array.prototype.toSorted` no longer white-screen. (#31775) Thanks @liuxiaopai-ai.
- macOS/PeekabooBridge: add compatibility socket symlinks for legacy `clawdbot`, `clawdis`, and `moltbot` Application Support socket paths so pre-rename clients can still connect. (#6033) Thanks @lumpinif and @vincentkoc.
- Gateway/message tool reliability: avoid false `Unknown channel` failures when `message.*` actions receive platform-specific channel ids by falling back to `toolContext.currentChannelProvider`, and prevent health-monitor restart thrash for channels that just (re)started by adding a per-channel startup-connect grace window. (from #32367) Thanks @MunemHashmi.

View File

@@ -1295,7 +1295,11 @@ export async function runEmbeddedPiAgent(
aborted,
systemPromptReport: attempt.systemPromptReport,
// Handle client tool calls (OpenResponses hosted tools)
stopReason: attempt.clientToolCall ? "tool_calls" : undefined,
// Propagate the LLM stop reason so callers (lifecycle events,
// ACP bridge) can distinguish end_turn from max_tokens.
stopReason: attempt.clientToolCall
? "tool_calls"
: (lastAssistant?.stopReason as string | undefined),
pendingToolCalls: attempt.clientToolCall
? [
{

View File

@@ -873,6 +873,10 @@ async function agentCommandInternal(
fallbackProvider = fallbackResult.provider;
fallbackModel = fallbackResult.model;
if (!lifecycleEnded) {
const stopReason = result.meta.stopReason;
if (stopReason && stopReason !== "end_turn") {
console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`);
}
emitAgentEvent({
runId,
stream: "lifecycle",
@@ -881,6 +885,7 @@ async function agentCommandInternal(
startedAt,
endedAt: Date.now(),
aborted: result.meta.aborted ?? false,
stopReason,
},
});
}

View File

@@ -266,6 +266,89 @@ describe("agent event handler", () => {
nowSpy?.mockRestore();
});
it("flushes buffered text as delta before final when throttle suppresses the latest chunk", () => {
let now = 10_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
chatRunState.registry.add("run-flush", {
sessionKey: "session-flush",
clientRunId: "client-flush",
});
handler({
runId: "run-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Hello" },
});
now = 10_100;
handler({
runId: "run-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Hello world" },
});
emitLifecycleEnd(handler, "run-flush");
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls).toHaveLength(3);
const firstPayload = chatCalls[0]?.[1] as { state?: string };
const secondPayload = chatCalls[1]?.[1] as {
state?: string;
message?: { content?: Array<{ text?: string }> };
};
const thirdPayload = chatCalls[2]?.[1] as { state?: string };
expect(firstPayload.state).toBe("delta");
expect(secondPayload.state).toBe("delta");
expect(secondPayload.message?.content?.[0]?.text).toBe("Hello world");
expect(thirdPayload.state).toBe("final");
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
nowSpy.mockRestore();
});
it("does not flush an extra delta when the latest text already broadcast", () => {
let now = 11_000;
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now);
const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness();
chatRunState.registry.add("run-no-dup-flush", {
sessionKey: "session-no-dup-flush",
clientRunId: "client-no-dup-flush",
});
handler({
runId: "run-no-dup-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Hello" },
});
now = 11_200;
handler({
runId: "run-no-dup-flush",
seq: 1,
stream: "assistant",
ts: Date.now(),
data: { text: "Hello world" },
});
emitLifecycleEnd(handler, "run-no-dup-flush");
const chatCalls = chatBroadcastCalls(broadcast);
expect(chatCalls).toHaveLength(3);
expect(chatCalls.map(([, payload]) => (payload as { state?: string }).state)).toEqual([
"delta",
"delta",
"final",
]);
expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3);
nowSpy.mockRestore();
});
it("cleans up agent run sequence tracking when lifecycle completes", () => {
const { agentRunSeq, chatRunState, handler, nowSpy } = createHarness({ now: 2_500 });
chatRunState.registry.add("run-cleanup", {

View File

@@ -158,6 +158,8 @@ export type ChatRunState = {
registry: ChatRunRegistry;
buffers: Map<string, string>;
deltaSentAt: Map<string, number>;
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
deltaLastBroadcastLen: Map<string, number>;
abortedRuns: Map<string, number>;
clear: () => void;
};
@@ -166,12 +168,14 @@ export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const deltaLastBroadcastLen = new Map<string, number>();
const abortedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
buffers.clear();
deltaSentAt.clear();
deltaLastBroadcastLen.clear();
abortedRuns.clear();
};
@@ -179,6 +183,7 @@ export function createChatRunState(): ChatRunState {
registry,
buffers,
deltaSentAt,
deltaLastBroadcastLen,
abortedRuns,
clear,
};
@@ -318,6 +323,7 @@ export function createAgentEventHandler({
return;
}
chatRunState.deltaSentAt.set(clientRunId, now);
chatRunState.deltaLastBroadcastLen.set(clientRunId, cleaned.length);
const payload = {
runId: clientRunId,
sessionKey,
@@ -352,6 +358,39 @@ export function createAgentEventHandler({
const text = normalizedHeartbeatText.text.trim();
const shouldSuppressSilent =
normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN);
const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text);
const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput(
clientRunId,
sourceRunId,
);
// Flush any throttled delta so streaming clients receive the complete text
// before the final event. The 150 ms throttle in emitChatDelta may have
// suppressed the most recent chunk, leaving the client with stale text.
// Only flush if the buffer has grown since the last broadcast to avoid duplicates.
if (
text &&
!shouldSuppressSilent &&
!shouldSuppressSilentLeadFragment &&
!shouldSuppressHeartbeatStreaming
) {
const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0;
if (text.length > lastBroadcastLen) {
const flushPayload = {
runId: clientRunId,
sessionKey,
seq,
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
},
};
broadcast("chat", flushPayload, { dropIfSlow: true });
nodeSendToSession(sessionKey, "chat", flushPayload);
}
}
chatRunState.deltaLastBroadcastLen.delete(clientRunId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
if (jobState === "done") {