fix(btw): complete side-result delivery across clients

This commit is contained in:
Nimrod Gutman
2026-03-14 12:42:36 +02:00
parent 58cecae88e
commit 3e1737ec54
8 changed files with 170 additions and 9 deletions

View File

@@ -455,8 +455,9 @@ class ChatController(
val sid = root["sessionId"].asStringOrNull()
val thinkingLevel = root["thinkingLevel"].asStringOrNull()
val array = root["messages"].asArrayOrNull() ?: JsonArray(emptyList())
val sideResults = root["sideResults"].asArrayOrNull() ?: JsonArray(emptyList())
val messages =
val messages = (
array.mapNotNull { item ->
val obj = item.asObjectOrNull() ?: return@mapNotNull null
val role = obj["role"].asStringOrNull() ?: return@mapNotNull null
@@ -468,11 +469,26 @@ class ChatController(
content = content,
timestampMs = ts,
)
}
} + sideResults.mapNotNull(::parseSideResultMessage)
).sortedBy { it.timestampMs ?: Long.MAX_VALUE }
return ChatHistory(sessionKey = sessionKey, sessionId = sid, thinkingLevel = thinkingLevel, messages = messages)
}
private fun parseSideResultMessage(el: JsonElement): ChatMessage? {
val obj = el.asObjectOrNull() ?: return null
val text = obj["text"].asStringOrNull()?.trim().orEmpty()
if (text.isEmpty()) return null
val question = obj["question"].asStringOrNull()?.trim().orEmpty()
val label = if (question.isEmpty()) "BTW" else "BTW: $question"
return ChatMessage(
id = UUID.randomUUID().toString(),
role = "assistant",
content = listOf(ChatMessageContent(type = "text", text = "$label\n\n$text")),
timestampMs = obj["ts"].asLongOrNull(),
)
}
private fun parseMessageContent(el: JsonElement): ChatMessageContent? {
val obj = el.asObjectOrNull() ?: return null
val type = obj["type"].asStringOrNull() ?: "text"

View File

@@ -232,7 +232,30 @@ public struct OpenClawChatHistoryPayload: Codable, Sendable {
public let sessionKey: String
public let sessionId: String?
public let messages: [AnyCodable]?
public let sideResults: [OpenClawChatSideResult]?
public let thinkingLevel: String?
public init(
sessionKey: String,
sessionId: String?,
messages: [AnyCodable]?,
sideResults: [OpenClawChatSideResult]? = nil,
thinkingLevel: String?)
{
self.sessionKey = sessionKey
self.sessionId = sessionId
self.messages = messages
self.sideResults = sideResults
self.thinkingLevel = thinkingLevel
}
}
public struct OpenClawChatSideResult: Codable, Sendable {
public let kind: String?
public let question: String?
public let text: String?
public let ts: Double?
public let isError: Bool?
}
public struct OpenClawSessionPreviewItem: Codable, Hashable, Sendable {

View File

@@ -232,7 +232,9 @@ public final class OpenClawChatViewModel {
let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey)
self.messages = Self.reconcileMessageIDs(
previous: self.messages,
incoming: Self.decodeMessages(payload.messages ?? []))
incoming: Self.decodeHistoryMessages(
messages: payload.messages ?? [],
sideResults: payload.sideResults ?? []))
self.sessionId = payload.sessionId
if !self.prefersExplicitThinkingLevel,
let level = Self.normalizedThinkingLevel(payload.thinkingLevel)
@@ -257,6 +259,43 @@ public final class OpenClawChatViewModel {
return Self.dedupeMessages(decoded)
}
private static func decodeHistoryMessages(
messages rawMessages: [AnyCodable],
sideResults: [OpenClawChatSideResult]) -> [OpenClawChatMessage]
{
let decodedMessages = Self.decodeMessages(rawMessages)
let decodedSideResults = sideResults.compactMap(Self.decodeSideResultMessage)
return Self.dedupeMessages(decodedMessages + decodedSideResults)
}
private static func decodeSideResultMessage(_ sideResult: OpenClawChatSideResult) -> OpenClawChatMessage? {
let text = (sideResult.text ?? "").trimmingCharacters(in: .whitespacesAndNewlines)
guard !text.isEmpty else { return nil }
let question = (sideResult.question ?? "").trimmingCharacters(in: .whitespacesAndNewlines)
let label = question.isEmpty ? "BTW" : "BTW: \(question)"
let body = "\(label)\n\n\(text)"
return OpenClawChatMessage(
role: "assistant",
content: [
OpenClawChatMessageContent(
type: "text",
text: body,
thinking: nil,
thinkingSignature: nil,
mimeType: nil,
fileName: nil,
content: nil,
id: nil,
name: nil,
arguments: nil),
],
timestamp: sideResult.ts,
toolCallId: nil,
toolName: nil,
usage: nil,
stopReason: nil)
}
private static func stripInboundMetadata(from message: OpenClawChatMessage) -> OpenClawChatMessage {
guard message.role.lowercased() == "user" else {
return message
@@ -921,7 +960,9 @@ public final class OpenClawChatViewModel {
let payload = try await self.transport.requestHistory(sessionKey: self.sessionKey)
self.messages = Self.reconcileMessageIDs(
previous: self.messages,
incoming: Self.decodeMessages(payload.messages ?? []))
incoming: Self.decodeHistoryMessages(
messages: payload.messages ?? [],
sideResults: payload.sideResults ?? []))
self.sessionId = payload.sessionId
if !self.prefersExplicitThinkingLevel,
let level = Self.normalizedThinkingLevel(payload.thinkingLevel)

View File

@@ -14,12 +14,14 @@ private func chatTextMessage(role: String, text: String, timestamp: Double) -> A
private func historyPayload(
sessionKey: String = "main",
sessionId: String? = "sess-main",
messages: [AnyCodable] = []) -> OpenClawChatHistoryPayload
messages: [AnyCodable] = [],
sideResults: [OpenClawChatSideResult] = []) -> OpenClawChatHistoryPayload
{
OpenClawChatHistoryPayload(
sessionKey: sessionKey,
sessionId: sessionId,
messages: messages,
sideResults: sideResults,
thinkingLevel: "off")
}
@@ -1226,7 +1228,7 @@ extension TestChatTransportState {
#expect(await MainActor.run { callbackState.values } == ["medium"])
}
@Test func serverProvidedThinkingLevelsOutsideMenuArePreservedForSend() async throws {
@Test func serverProvidedThinkingLevelsOutsideMenuArePreservedForSend() async throws {
let history = OpenClawChatHistoryPayload(
sessionKey: "main",
sessionId: "sess-main",
@@ -1244,6 +1246,28 @@ extension TestChatTransportState {
}
}
@Test func mergesBTWSideResultsIntoHistoryRefresh() async throws {
let history = historyPayload(
messages: [chatTextMessage(role: "user", text: "main task", timestamp: 1000)],
sideResults: [
OpenClawChatSideResult(
kind: "btw",
question: "what is 17 * 19?",
text: "323",
ts: 2000,
isError: false),
])
let (_, vm) = await makeViewModel(historyResponses: [history])
try await loadAndWaitBootstrap(vm: vm, sessionId: "sess-main")
let messages = await MainActor.run { vm.messages }
#expect(messages.count == 2)
#expect(messages.last?.role == "assistant")
#expect(messages.last?.content.first?.text == "BTW: what is 17 * 19?\n\n323")
}
@Test func staleThinkingPatchCompletionReappliesLatestSelection() async throws {
let history = OpenClawChatHistoryPayload(
sessionKey: "main",

View File

@@ -4,7 +4,10 @@ import {
sendPayloadMediaSequence,
} from "../../../src/channels/plugins/outbound/direct-text-media.js";
import type { ChannelOutboundAdapter } from "../../../src/channels/plugins/types.js";
import type { OutboundSendDeps } from "../../../src/infra/outbound/deliver.js";
import {
resolveOutboundSendDep,
type OutboundSendDeps,
} from "../../../src/infra/outbound/deliver.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { markdownToTelegramHtmlChunks } from "./format.js";
import { parseTelegramReplyToMessageId, parseTelegramThreadId } from "./outbound-params.js";
@@ -30,7 +33,8 @@ function resolveTelegramSendContext(params: {
accountId?: string;
};
} {
const send = params.deps?.sendTelegram ?? sendMessageTelegram;
const send =
resolveOutboundSendDep<TelegramSendFn>(params.deps, "telegram") ?? sendMessageTelegram;
return {
send,
baseOpts: {

View File

@@ -389,6 +389,10 @@ export class AcpGatewayAgent implements Agent {
await this.handleChatEvent(evt);
return;
}
if (evt.event === "chat.side_result") {
await this.handleSideResultEvent(evt);
return;
}
if (evt.event === "agent") {
await this.handleAgentEvent(evt);
}
@@ -830,6 +834,36 @@ export class AcpGatewayAgent implements Agent {
}
}
private async handleSideResultEvent(evt: EventFrame): Promise<void> {
const payload = (evt.payload ?? {}) as Record<string, unknown>;
const sessionKey = payload.sessionKey as string | undefined;
const runId = payload.runId as string | undefined;
const text = payload.text as string | undefined;
if (!sessionKey) {
return;
}
const pending = this.findPendingBySessionKey(sessionKey, runId);
if (!pending) {
return;
}
const trimmed = text?.trim() ?? "";
if (trimmed.length > 0) {
await this.connection.sessionUpdate({
sessionId: pending.sessionId,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: trimmed },
},
});
pending.sentTextLength = trimmed.length;
pending.sentText = trimmed;
}
await this.finishPrompt(pending.sessionId, pending, "end_turn");
}
private async handleDeltaEvent(
sessionId: string,
messageData: Record<string, unknown>,

View File

@@ -937,7 +937,6 @@ function broadcastSideResult(params: {
...params.payload,
seq,
});
params.context.agentRunSeq.delete(params.payload.runId);
}
function broadcastChatError(params: {
@@ -1390,6 +1389,11 @@ export const chatHandlers: GatewayRequestHandlers = {
ts: Date.now(),
},
});
broadcastChatFinal({
context,
runId: clientRunId,
sessionKey: rawSessionKey,
});
} else {
const combinedReply = finalReplies
.map((part) => part.text?.trim() ?? "")

View File

@@ -514,6 +514,15 @@ describe("gateway server chat", () => {
o.payload?.runId === "idem-btw-1",
8000,
);
const finalPromise = onceMessage(
ws,
(o) =>
o.type === "event" &&
o.event === "chat" &&
o.payload?.state === "final" &&
o.payload?.runId === "idem-btw-1",
8000,
);
const res = await rpcReq(ws, "chat.send", {
sessionKey: "main",
@@ -523,6 +532,7 @@ describe("gateway server chat", () => {
expect(res.ok).toBe(true);
const sideResult = await sideResultPromise;
const finalEvent = await finalPromise;
expect(sideResult.payload).toMatchObject({
kind: "btw",
runId: "idem-btw-1",
@@ -530,6 +540,11 @@ describe("gateway server chat", () => {
question: "what is 17 * 19?",
text: "323",
});
expect(finalEvent.payload).toMatchObject({
runId: "idem-btw-1",
sessionKey: "main",
state: "final",
});
const historyRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
sessionKey: "main",