diff --git a/CHANGELOG.md b/CHANGELOG.md index 541d5e4cf5b..4599dbc5c00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,7 @@ Docs: https://docs.openclaw.ai - Gateway/Security canonicalization hardening: decode plugin route path variants to canonical fixpoint (with bounded depth), fail closed on canonicalization anomalies, and enforce gateway auth for deeply encoded `/api/channels/*` variants to prevent alternate-path auth bypass through plugin handlers. Thanks @tdjackey for reporting. - Browser/Gateway hardening: preserve env credentials for `OPENCLAW_GATEWAY_URL` / `CLAWDBOT_GATEWAY_URL` while treating explicit `--url` as override-only auth, and make container browser hardening flags optional with safer defaults for Docker/LXC stability. (#31504) Thanks @vincentkoc. - Gateway/Control UI basePath webhook passthrough: let non-read methods under configured `controlUiBasePath` fall through to plugin routes (instead of returning Control UI 405), restoring webhook handlers behind basePath mounts. (#32311) Thanks @ademczuk. +- Gateway/Webchat streaming finalization: flush throttled trailing assistant text before `final` chat events so streaming consumers do not miss tail content, while preserving duplicate suppression and heartbeat/silent lead-fragment guards. (#24856) Thanks @visionik and @vincentkoc. - Control UI/Legacy browser compatibility: replace `toSorted`-dependent cron suggestion sorting in `app-render` with a compatibility helper so older browsers without `Array.prototype.toSorted` no longer white-screen. (#31775) Thanks @liuxiaopai-ai. - macOS/PeekabooBridge: add compatibility socket symlinks for legacy `clawdbot`, `clawdis`, and `moltbot` Application Support socket paths so pre-rename clients can still connect. (#6033) Thanks @lumpinif and @vincentkoc. - Gateway/message tool reliability: avoid false `Unknown channel` failures when `message.*` actions receive platform-specific channel ids by falling back to `toolContext.currentChannelProvider`, and prevent health-monitor restart thrash for channels that just (re)started by adding a per-channel startup-connect grace window. (from #32367) Thanks @MunemHashmi. diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 3c5d5a67f6f..bfda498f5e3 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1295,7 +1295,11 @@ export async function runEmbeddedPiAgent( aborted, systemPromptReport: attempt.systemPromptReport, // Handle client tool calls (OpenResponses hosted tools) - stopReason: attempt.clientToolCall ? "tool_calls" : undefined, + // Propagate the LLM stop reason so callers (lifecycle events, + // ACP bridge) can distinguish end_turn from max_tokens. + stopReason: attempt.clientToolCall + ? "tool_calls" + : (lastAssistant?.stopReason as string | undefined), pendingToolCalls: attempt.clientToolCall ? [ { diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 1f58c5e39f4..32fbd3b2adc 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -873,6 +873,10 @@ async function agentCommandInternal( fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; if (!lifecycleEnded) { + const stopReason = result.meta.stopReason; + if (stopReason && stopReason !== "end_turn") { + console.error(`[agent] run ${runId} ended with stopReason=${stopReason}`); + } emitAgentEvent({ runId, stream: "lifecycle", @@ -881,6 +885,7 @@ async function agentCommandInternal( startedAt, endedAt: Date.now(), aborted: result.meta.aborted ?? false, + stopReason, }, }); } diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index e02ed25eb42..726e061be42 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -266,6 +266,89 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("flushes buffered text as delta before final when throttle suppresses the latest chunk", () => { + let now = 10_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-flush", { + sessionKey: "session-flush", + clientRunId: "client-flush", + }); + + handler({ + runId: "run-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello" }, + }); + + now = 10_100; + handler({ + runId: "run-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello world" }, + }); + + emitLifecycleEnd(handler, "run-flush"); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + const firstPayload = chatCalls[0]?.[1] as { state?: string }; + const secondPayload = chatCalls[1]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + const thirdPayload = chatCalls[2]?.[1] as { state?: string }; + expect(firstPayload.state).toBe("delta"); + expect(secondPayload.state).toBe("delta"); + expect(secondPayload.message?.content?.[0]?.text).toBe("Hello world"); + expect(thirdPayload.state).toBe("final"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + + it("does not flush an extra delta when the latest text already broadcast", () => { + let now = 11_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { broadcast, nodeSendToSession, chatRunState, handler } = createHarness(); + chatRunState.registry.add("run-no-dup-flush", { + sessionKey: "session-no-dup-flush", + clientRunId: "client-no-dup-flush", + }); + + handler({ + runId: "run-no-dup-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello" }, + }); + + now = 11_200; + handler({ + runId: "run-no-dup-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Hello world" }, + }); + + emitLifecycleEnd(handler, "run-no-dup-flush"); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(3); + expect(chatCalls.map(([, payload]) => (payload as { state?: string }).state)).toEqual([ + "delta", + "delta", + "final", + ]); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(3); + nowSpy.mockRestore(); + }); + it("cleans up agent run sequence tracking when lifecycle completes", () => { const { agentRunSeq, chatRunState, handler, nowSpy } = createHarness({ now: 2_500 }); chatRunState.registry.add("run-cleanup", { diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index d54d0a99eeb..00dc2a9d359 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -158,6 +158,8 @@ export type ChatRunState = { registry: ChatRunRegistry; buffers: Map; deltaSentAt: Map; + /** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */ + deltaLastBroadcastLen: Map; abortedRuns: Map; clear: () => void; }; @@ -166,12 +168,14 @@ export function createChatRunState(): ChatRunState { const registry = createChatRunRegistry(); const buffers = new Map(); const deltaSentAt = new Map(); + const deltaLastBroadcastLen = new Map(); const abortedRuns = new Map(); const clear = () => { registry.clear(); buffers.clear(); deltaSentAt.clear(); + deltaLastBroadcastLen.clear(); abortedRuns.clear(); }; @@ -179,6 +183,7 @@ export function createChatRunState(): ChatRunState { registry, buffers, deltaSentAt, + deltaLastBroadcastLen, abortedRuns, clear, }; @@ -318,6 +323,7 @@ export function createAgentEventHandler({ return; } chatRunState.deltaSentAt.set(clientRunId, now); + chatRunState.deltaLastBroadcastLen.set(clientRunId, cleaned.length); const payload = { runId: clientRunId, sessionKey, @@ -352,6 +358,39 @@ export function createAgentEventHandler({ const text = normalizedHeartbeatText.text.trim(); const shouldSuppressSilent = normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN); + const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text); + const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput( + clientRunId, + sourceRunId, + ); + // Flush any throttled delta so streaming clients receive the complete text + // before the final event. The 150 ms throttle in emitChatDelta may have + // suppressed the most recent chunk, leaving the client with stale text. + // Only flush if the buffer has grown since the last broadcast to avoid duplicates. + if ( + text && + !shouldSuppressSilent && + !shouldSuppressSilentLeadFragment && + !shouldSuppressHeartbeatStreaming + ) { + const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0; + if (text.length > lastBroadcastLen) { + const flushPayload = { + runId: clientRunId, + sessionKey, + seq, + state: "delta" as const, + message: { + role: "assistant", + content: [{ type: "text", text }], + timestamp: Date.now(), + }, + }; + broadcast("chat", flushPayload, { dropIfSlow: true }); + nodeSendToSession(sessionKey, "chat", flushPayload); + } + } + chatRunState.deltaLastBroadcastLen.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); if (jobState === "done") {