From e71d10fd4d744516b9e98e5e68bb35d0f0cf8e17 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 16 May 2026 13:58:44 +0100 Subject: [PATCH] fix(webchat): show manual compaction progress Add first-class session.operation start/end events for manual compaction and render the existing WebChat compaction indicator from those events. Co-authored-by: Conan Scott <271909525+Conan-Scott@users.noreply.github.com> --- CHANGELOG.md | 1 + .../OpenClawProtocol/GatewayModels.swift | 38 ++++++ docs/gateway/index.md | 5 +- docs/gateway/protocol.md | 5 +- src/gateway/protocol/index.ts | 2 + .../protocol/schema/protocol-schemas.ts | 2 + src/gateway/protocol/schema/sessions.ts | 13 ++ src/gateway/protocol/schema/types.ts | 1 + src/gateway/server-broadcast.ts | 1 + src/gateway/server-methods-list.ts | 1 + src/gateway/server-methods/sessions.ts | 82 +++++++++--- .../server.sessions.compaction.test.ts | 46 +++++++ ui/src/ui/app-gateway.sessions.node.test.ts | 24 ++++ ui/src/ui/app-gateway.ts | 16 ++- ui/src/ui/app-tool-stream.node.test.ts | 114 +++++++++++++++- ui/src/ui/app-tool-stream.ts | 122 +++++++++++++++++- 16 files changed, 449 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 517fcbbbe93..2d04ea9289f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Gateway/WebChat: route image attachments through a configured vision-capable `imageModel` plan before inlining images, and carry that image-model fallback chain through runtime retries. (#82524) Thanks @frankekn. +- WebChat: show progress while manual `/compact` is running by streaming a session operation event to subscribed Control UI clients. Fixes #82407. Thanks @Conan-Scott. - Codex app-server: limit canonical OpenAI Codex app-server attribution rewrites to local transcript and trajectory records, leaving runtime/tool routing on the selected OpenAI model metadata so OpenAI API-key backup profiles keep their billing path. - Android/chat: make bare and markdown URLs in chat messages tappable by preserving Compose URL annotations in rendered markdown. Fixes #82187. (#82392) Thanks @neeravmakwana. - Plugins/doctor: migrate legacy top-level plugin `tools` declarations into `contracts.tools`, so `openclaw doctor --fix` repairs local plugins for the manifest tool contract. (#81112) Thanks @100yenadmin. diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 6ad90c3e1f0..92539810dc0 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -1788,6 +1788,44 @@ public struct SessionCompactionCheckpoint: Codable, Sendable { } } +public struct SessionOperationEvent: Codable, Sendable { + public let operationid: String + public let operation: String + public let phase: AnyCodable + public let sessionkey: String + public let ts: Int + public let completed: Bool? + public let reason: String? + + public init( + operationid: String, + operation: String, + phase: AnyCodable, + sessionkey: String, + ts: Int, + completed: Bool?, + reason: String?) + { + self.operationid = operationid + self.operation = operation + self.phase = phase + self.sessionkey = sessionkey + self.ts = ts + self.completed = completed + self.reason = reason + } + + private enum CodingKeys: String, CodingKey { + case operationid = "operationId" + case operation + case phase + case sessionkey = "sessionKey" + case ts + case completed + case reason + } +} + public struct SessionsCompactionListParams: Codable, Sendable { public let key: String diff --git a/docs/gateway/index.md b/docs/gateway/index.md index d84bc1d0231..b57bc966d3d 100644 --- a/docs/gateway/index.md +++ b/docs/gateway/index.md @@ -317,8 +317,9 @@ Defaults include isolated state/config and base gateway port `19001`. a generated dump of every callable helper route. - Requests: `req(method, params)` → `res(ok/payload|error)`. - Common events include `connect.challenge`, `agent`, `chat`, - `session.message`, `session.tool`, `sessions.changed`, `presence`, `tick`, - `health`, `heartbeat`, pairing/approval lifecycle events, and `shutdown`. + `session.message`, `session.operation`, `session.tool`, `sessions.changed`, + `presence`, `tick`, `health`, `heartbeat`, pairing/approval lifecycle events, + and `shutdown`. Agent runs are two-stage: diff --git a/docs/gateway/protocol.md b/docs/gateway/protocol.md index 4c031d3f12f..0c146fec533 100644 --- a/docs/gateway/protocol.md +++ b/docs/gateway/protocol.md @@ -473,8 +473,9 @@ enumeration of `src/gateway/server-methods/*.ts`. events. In protocol v4, delta payloads carry `deltaText`; `message` remains the cumulative assistant snapshot. Non-prefix replacements set `replace=true` and use `deltaText` as the replacement text. -- `session.message` and `session.tool`: transcript/event-stream updates for a - subscribed session. +- `session.message`, `session.operation`, and `session.tool`: transcript, + in-flight session operation, and event-stream updates for a subscribed + session. - `sessions.changed`: session index or metadata changed. - `presence`: system presence snapshot updates. - `tick`: periodic keepalive / liveness event. diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 64f0b77fa41..57e296d61a7 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -310,6 +310,7 @@ import { SessionsCompactionListParamsSchema, type SessionsCompactionRestoreParams, SessionsCompactionRestoreParamsSchema, + type SessionOperationEvent, type SessionsCreateParams, SessionsCreateParamsSchema, type SessionsDeleteParams, @@ -1177,6 +1178,7 @@ export type { SessionsPreviewParams, SessionsDescribeParams, SessionsResolveParams, + SessionOperationEvent, SessionsPatchParams, SessionsPatchResult, SessionsResetParams, diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 501adf2414c..ef77729b222 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -233,6 +233,7 @@ import { SessionsCompactionRestoreParamsSchema, SessionsCompactionRestoreResultSchema, SessionCompactionCheckpointSchema, + SessionOperationEventSchema, SessionsCleanupParamsSchema, SessionsCreateParamsSchema, SessionsDeleteParamsSchema, @@ -329,6 +330,7 @@ export const ProtocolSchemas = { SessionsDescribeParams: SessionsDescribeParamsSchema, SessionsResolveParams: SessionsResolveParamsSchema, SessionCompactionCheckpoint: SessionCompactionCheckpointSchema, + SessionOperationEvent: SessionOperationEventSchema, SessionsCompactionListParams: SessionsCompactionListParamsSchema, SessionsCompactionGetParams: SessionsCompactionGetParamsSchema, SessionsCompactionBranchParams: SessionsCompactionBranchParamsSchema, diff --git a/src/gateway/protocol/schema/sessions.ts b/src/gateway/protocol/schema/sessions.ts index e3fc0cb8e31..de073bf5556 100644 --- a/src/gateway/protocol/schema/sessions.ts +++ b/src/gateway/protocol/schema/sessions.ts @@ -9,6 +9,19 @@ export const SessionCompactionCheckpointReasonSchema = Type.Union([ Type.Literal("timeout-retry"), ]); +export const SessionOperationEventSchema = Type.Object( + { + operationId: NonEmptyString, + operation: Type.Literal("compact"), + phase: Type.Union([Type.Literal("start"), Type.Literal("end")]), + sessionKey: NonEmptyString, + ts: Type.Integer({ minimum: 0 }), + completed: Type.Optional(Type.Boolean()), + reason: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + export const SessionCompactionTranscriptReferenceSchema = Type.Object( { sessionId: NonEmptyString, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 0f5df850f13..7691b1d4f33 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -55,6 +55,7 @@ export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">; export type SessionsDescribeParams = SchemaType<"SessionsDescribeParams">; export type SessionsResolveParams = SchemaType<"SessionsResolveParams">; export type SessionCompactionCheckpoint = SchemaType<"SessionCompactionCheckpoint">; +export type SessionOperationEvent = SchemaType<"SessionOperationEvent">; export type SessionsCompactionListParams = SchemaType<"SessionsCompactionListParams">; export type SessionsCompactionGetParams = SchemaType<"SessionsCompactionGetParams">; export type SessionsCompactionBranchParams = SchemaType<"SessionsCompactionBranchParams">; diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index bee0dd9cfad..15a428fe362 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -43,6 +43,7 @@ const EVENT_SCOPE_GUARDS: Record = { "node.pair.resolved": [PAIRING_SCOPE], "sessions.changed": [READ_SCOPE], "session.message": [READ_SCOPE], + "session.operation": [READ_SCOPE], "session.tool": [READ_SCOPE], }; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index d5493452347..5220f06a663 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -34,6 +34,7 @@ export const GATEWAY_EVENTS = [ "agent", "chat", "session.message", + "session.operation", "session.tool", "sessions.changed", "presence", diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 10e06b2304a..13b4dda78aa 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -56,6 +56,7 @@ import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, + type SessionOperationEvent, validateSessionsAbortParams, validateSessionsCleanupParams, validateSessionsCompactParams, @@ -329,6 +330,25 @@ function emitSessionsChanged( ); } +function emitSessionOperation( + context: Pick, + payload: Omit, +) { + const connIds = context.getSessionEventSubscriberConnIds(); + if (connIds.size === 0) { + return; + } + context.broadcastToConnIds( + "session.operation", + { + ...payload, + ts: Date.now(), + } satisfies SessionOperationEvent, + connIds, + { dropIfSlow: true }, + ); +} + function rejectWebchatSessionMutation(params: { action: "patch" | "delete" | "compact" | "restore"; client: GatewayClient | null; @@ -2142,24 +2162,52 @@ export const sessionsHandlers: GatewayRequestHandlers = { const workspaceDir = normalizeOptionalString(entry?.spawnedWorkspaceDir) || resolveAgentWorkspaceDir(cfg, target.agentId); - const result = await compactEmbeddedPiSession({ - sessionId, + const operationId = randomUUID(); + emitSessionOperation(context, { + operationId, + operation: "compact", + phase: "start", sessionKey: target.canonicalKey, - allowGatewaySubagentBinding: true, - sessionFile: filePath, - workspaceDir, - config: cfg, - provider: resolvedModel.provider, - model: resolvedModel.model, - agentHarnessId: entry?.sessionId === sessionId ? entry.agentHarnessId : undefined, - thinkLevel: normalizeThinkLevel(entry?.thinkingLevel), - reasoningLevel: normalizeReasoningLevel(entry?.reasoningLevel), - bashElevated: { - enabled: false, - allowed: false, - defaultLevel: "off", - }, - trigger: "manual", + }); + let result: Awaited>; + try { + result = await compactEmbeddedPiSession({ + sessionId, + sessionKey: target.canonicalKey, + allowGatewaySubagentBinding: true, + sessionFile: filePath, + workspaceDir, + config: cfg, + provider: resolvedModel.provider, + model: resolvedModel.model, + agentHarnessId: entry?.sessionId === sessionId ? entry.agentHarnessId : undefined, + thinkLevel: normalizeThinkLevel(entry?.thinkingLevel), + reasoningLevel: normalizeReasoningLevel(entry?.reasoningLevel), + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + trigger: "manual", + }); + } catch (err) { + emitSessionOperation(context, { + operationId, + operation: "compact", + phase: "end", + sessionKey: target.canonicalKey, + completed: false, + reason: formatErrorMessage(err), + }); + throw err; + } + emitSessionOperation(context, { + operationId, + operation: "compact", + phase: "end", + sessionKey: target.canonicalKey, + completed: result.ok && result.compacted, + reason: result.reason, }); if (result.ok && result.compacted) { diff --git a/src/gateway/server.sessions.compaction.test.ts b/src/gateway/server.sessions.compaction.test.ts index fdf537c21db..bb295dc786a 100644 --- a/src/gateway/server.sessions.compaction.test.ts +++ b/src/gateway/server.sessions.compaction.test.ts @@ -5,6 +5,7 @@ import { expect, test, vi } from "vitest"; import { withEnvAsync } from "../test-utils/env.js"; import { embeddedRunMock, + onceMessage, piSdkMock, rpcReq, startConnectedServerWithClient, @@ -249,6 +250,23 @@ test("sessions.compact without maxLines runs embedded manual compaction for chec }); const { ws } = await openClient(); + await rpcReq(ws, "sessions.subscribe", {}); + const startEventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.operation" && + (message.payload as { operation?: unknown; phase?: unknown })?.operation === "compact" && + (message.payload as { operation?: unknown; phase?: unknown })?.phase === "start", + ); + const endEventPromise = onceMessage( + ws, + (message) => + message.type === "event" && + message.event === "session.operation" && + (message.payload as { operation?: unknown; phase?: unknown })?.operation === "compact" && + (message.payload as { operation?: unknown; phase?: unknown })?.phase === "end", + ); const compacted = await rpcReq<{ ok: true; key: string; @@ -261,6 +279,34 @@ test("sessions.compact without maxLines runs embedded manual compaction for chec expect(compacted.ok).toBe(true); expect(compacted.payload?.key).toBe("agent:main:main"); expect(compacted.payload?.compacted).toBe(true); + const startEvent = await startEventPromise; + const endEvent = await endEventPromise; + const startPayload = startEvent.payload as { + operationId?: string; + sessionKey?: string; + ts?: number; + }; + const endPayload = endEvent.payload as { + operationId?: string; + sessionKey?: string; + completed?: boolean; + ts?: number; + }; + expect(startPayload).toMatchObject({ + operation: "compact", + phase: "start", + sessionKey: "agent:main:main", + }); + expect(endPayload).toMatchObject({ + operation: "compact", + phase: "end", + sessionKey: "agent:main:main", + completed: true, + }); + expect(startPayload.operationId).toBeTruthy(); + expect(endPayload.operationId).toBe(startPayload.operationId); + expect(typeof startPayload.ts).toBe("number"); + expect(typeof endPayload.ts).toBe("number"); expect(embeddedRunMock.compactEmbeddedPiSession).toHaveBeenCalledTimes(1); const compactionCall = embeddedRunMock.compactEmbeddedPiSession.mock.calls.at(0)?.[0] as | { diff --git a/ui/src/ui/app-gateway.sessions.node.test.ts b/ui/src/ui/app-gateway.sessions.node.test.ts index 9246359c794..3e4e6bd5867 100644 --- a/ui/src/ui/app-gateway.sessions.node.test.ts +++ b/ui/src/ui/app-gateway.sessions.node.test.ts @@ -5,6 +5,7 @@ const loadSessionsMock = vi.fn(); const loadChatHistoryMock = vi.fn(); const applySessionsChangedEventMock = vi.fn(); const handleChatEventMock = vi.fn(() => "idle"); +const handleSessionOperationEventMock = vi.fn(); vi.mock("./app-chat.ts", () => ({ CHAT_SESSIONS_ACTIVE_MINUTES: 10, @@ -21,6 +22,7 @@ vi.mock("./app-settings.ts", () => ({ })); vi.mock("./app-tool-stream.ts", () => ({ handleAgentEvent: vi.fn(), + handleSessionOperationEvent: handleSessionOperationEventMock, resetToolStream: vi.fn(), })); vi.mock("./controllers/agents.ts", () => ({ @@ -406,6 +408,28 @@ describe("handleGatewayEvent session.message", () => { }); }); +describe("handleGatewayEvent session.operation", () => { + it("routes session operation events to the tool stream state", () => { + handleSessionOperationEventMock.mockReset(); + const host = createHost(); + const payload = { + operationId: "operation-1", + operation: "compact", + phase: "start", + sessionKey: "agent:main:main", + }; + + handleGatewayEvent(host, { + type: "event", + event: "session.operation", + payload, + seq: 1, + }); + + expect(handleSessionOperationEventMock).toHaveBeenCalledWith(host, payload); + }); +}); + describe("addExecApproval", () => { it("keeps the newest approval at the front of the queue", () => { const queue = addExecApproval( diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts index 3426eefbf31..eadf89d10e0 100644 --- a/ui/src/ui/app-gateway.ts +++ b/ui/src/ui/app-gateway.ts @@ -18,7 +18,13 @@ import { setLastActiveSessionKey, syncUrlWithSessionKey, } from "./app-settings.ts"; -import { handleAgentEvent, resetToolStream, type AgentEventPayload } from "./app-tool-stream.ts"; +import { + handleAgentEvent, + handleSessionOperationEvent, + resetToolStream, + type AgentEventPayload, + type SessionOperationEventPayload, +} from "./app-tool-stream.ts"; import { shouldReloadHistoryForFinalEvent } from "./chat-event-reload.ts"; import { reconcileChatRunLifecycle } from "./chat/run-lifecycle.ts"; import { parseChatSideResult, type ChatSideResult } from "./chat/side-result.ts"; @@ -842,6 +848,14 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) { return; } + if (evt.event === "session.operation") { + handleSessionOperationEvent( + host as unknown as Parameters[0], + evt.payload as SessionOperationEventPayload | undefined, + ); + return; + } + if (evt.event === "presence") { const payload = evt.payload as { presence?: PresenceEntry[] } | undefined; if (payload?.presence && Array.isArray(payload.presence)) { diff --git a/ui/src/ui/app-tool-stream.node.test.ts b/ui/src/ui/app-tool-stream.node.test.ts index c93b3c683c9..ccb3a147def 100644 --- a/ui/src/ui/app-tool-stream.node.test.ts +++ b/ui/src/ui/app-tool-stream.node.test.ts @@ -1,6 +1,11 @@ // @vitest-environment node import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { handleAgentEvent, type FallbackStatus, type ToolStreamEntry } from "./app-tool-stream.ts"; +import { + handleAgentEvent, + handleSessionOperationEvent, + type FallbackStatus, + type ToolStreamEntry, +} from "./app-tool-stream.ts"; type ToolStreamHost = Parameters[0]; type AgentEvent = NonNullable[1]>; @@ -353,6 +358,113 @@ describe("app-tool-stream fallback lifecycle handling", () => { vi.useRealTimers(); }); + it("shows manual session operation compaction progress while idle", () => { + useToolStreamFakeTimers(); + const host = createHost({ + sessionKey: "main", + hello: { + snapshot: { + sessionDefaults: { + defaultAgentId: "main", + mainKey: "main", + mainSessionKey: "agent:main:main", + }, + }, + }, + }); + + handleSessionOperationEvent(host, { + operationId: "operation-1", + operation: "compact", + phase: "start", + sessionKey: "agent:main:main", + ts: TOOL_STREAM_TEST_NOW, + }); + + expect(host.compactionStatus).toEqual({ + phase: "active", + runId: "operation-1", + startedAt: TOOL_STREAM_TEST_NOW, + completedAt: null, + }); + + handleSessionOperationEvent(host, { + operationId: "operation-1", + operation: "compact", + phase: "end", + sessionKey: "agent:main:main", + ts: TOOL_STREAM_TEST_NOW, + completed: true, + }); + + expect(host.compactionStatus).toEqual({ + phase: "complete", + runId: "operation-1", + startedAt: TOOL_STREAM_TEST_NOW, + completedAt: TOOL_STREAM_TEST_NOW, + }); + + vi.useRealTimers(); + }); + + it("ignores manual session operation compaction for other sessions", () => { + useToolStreamFakeTimers(); + const host = createHost({ sessionKey: "agent:main:main" }); + + handleSessionOperationEvent(host, { + operationId: "operation-1", + operation: "compact", + phase: "start", + sessionKey: "agent:other:main", + ts: TOOL_STREAM_TEST_NOW, + }); + + expect(host.compactionStatus).toBeNull(); + expect(host.compactionClearTimer).toBeNull(); + + vi.useRealTimers(); + }); + + it("ignores stale manual session operation completion after a newer start", () => { + useToolStreamFakeTimers(); + const host = createHost({ sessionKey: "agent:main:main" }); + + handleSessionOperationEvent(host, { + operationId: "operation-1", + operation: "compact", + phase: "start", + sessionKey: "agent:main:main", + ts: TOOL_STREAM_TEST_NOW, + }); + handleSessionOperationEvent(host, { + operationId: "operation-2", + operation: "compact", + phase: "start", + sessionKey: "agent:main:main", + ts: TOOL_STREAM_TEST_NOW, + }); + handleSessionOperationEvent(host, { + operationId: "operation-1", + operation: "compact", + phase: "end", + sessionKey: "agent:main:main", + ts: TOOL_STREAM_TEST_NOW, + completed: true, + }); + + expect(host.compactionStatus).toEqual({ + phase: "active", + runId: "operation-2", + startedAt: TOOL_STREAM_TEST_NOW, + completedAt: null, + }); + vi.advanceTimersByTime(5 * 60_000); + expect(host.compactionStatus).toBeNull(); + expect(host.compactionClearTimer).toBeNull(); + + vi.useRealTimers(); + }); + it("treats lifecycle error as terminal for retry-pending compaction", () => { useToolStreamFakeTimers(); const host = createHost(); diff --git a/ui/src/ui/app-tool-stream.ts b/ui/src/ui/app-tool-stream.ts index 59cce5baca0..e0e68c45003 100644 --- a/ui/src/ui/app-tool-stream.ts +++ b/ui/src/ui/app-tool-stream.ts @@ -1,6 +1,7 @@ import { createChatModelOverride } from "./chat-model-ref.ts"; import type { ChatModelOverride } from "./chat-model-ref.types.ts"; import { formatUnknownText, truncateText } from "./format.ts"; +import { buildAgentMainSessionKey, DEFAULT_AGENT_ID, DEFAULT_MAIN_KEY } from "./session-key.ts"; import { normalizeLowercaseStringOrEmpty } from "./string-coerce.ts"; const TOOL_STREAM_LIMIT = 50; @@ -16,6 +17,16 @@ export type AgentEventPayload = { data: Record; }; +export type SessionOperationEventPayload = { + operationId?: string; + operation?: string; + phase?: string; + sessionKey?: string; + ts?: number; + completed?: boolean; + reason?: string; +}; + export type ToolStreamEntry = { toolCallId: string; runId: string; @@ -30,6 +41,11 @@ export type ToolStreamEntry = { type ToolStreamHost = { sessionKey: string; + hello?: { + snapshot?: { + sessionDefaults?: SessionDefaultsSnapshot; + }; + } | null; chatRunId: string | null; chatStream: string | null; chatStreamStartedAt: number | null; @@ -41,6 +57,12 @@ type ToolStreamHost = { chatModelOverrides?: Record; }; +type SessionDefaultsSnapshot = { + defaultAgentId?: string; + mainKey?: string; + mainSessionKey?: string; +}; + function toTrimmedString(value: unknown): string | null { if (typeof value !== "string") { return null; @@ -219,6 +241,49 @@ function syncSessionStatusModelOverride(host: ToolStreamHost, data: Record normalizeLowercaseStringOrEmpty(entry)), + ); + const normalizedRaw = normalizeLowercaseStringOrEmpty(raw); + return aliases.has(normalizedRaw) + ? normalizeLowercaseStringOrEmpty(canonicalMain) + : normalizedRaw; +} + function buildToolStreamMessage(entry: ToolStreamEntry): Record { const content: Array> = []; content.push({ @@ -357,6 +422,57 @@ function setCompactionComplete(host: CompactionHost, runId: string) { scheduleCompactionClear(host, COMPACTION_TOAST_DURATION_MS, { phase: "complete", runId }); } +export function handleSessionOperationEvent( + host: ToolStreamHost, + payload?: SessionOperationEventPayload, +) { + if (!payload || payload.operation !== "compact") { + return; + } + const sessionKey = toTrimmedString(payload.sessionKey); + if ( + !sessionKey || + normalizeSessionKeyForEventComparison(host, sessionKey) !== + normalizeSessionKeyForEventComparison(host, host.sessionKey) + ) { + return; + } + + const operationId = toTrimmedString(payload.operationId) ?? `session-compact:${sessionKey}`; + const compactionHost = host as CompactionHost; + + if (payload.phase === "start") { + clearCompactionTimer(compactionHost); + compactionHost.compactionStatus = { + phase: "active", + runId: operationId, + startedAt: Date.now(), + completedAt: null, + }; + scheduleCompactionClear(compactionHost, COMPACTION_ACTIVE_STALE_TIMEOUT_MS, { + phase: "active", + runId: operationId, + }); + return; + } + + if (payload.phase !== "end") { + return; + } + if ( + compactionHost.compactionStatus?.runId && + compactionHost.compactionStatus.runId !== operationId + ) { + return; + } + clearCompactionTimer(compactionHost); + if (payload.completed === true) { + setCompactionComplete(compactionHost, operationId); + return; + } + compactionHost.compactionStatus = null; +} + export function handleCompactionEvent(host: CompactionHost, payload: AgentEventPayload) { const data = payload.data ?? {}; const phase = typeof data.phase === "string" ? data.phase : ""; @@ -432,7 +548,11 @@ function resolveAcceptedSession( }, ): { accepted: boolean; sessionKey?: string } { const sessionKey = typeof payload.sessionKey === "string" ? payload.sessionKey : undefined; - if (sessionKey && sessionKey !== host.sessionKey) { + if ( + sessionKey && + normalizeSessionKeyForEventComparison(host, sessionKey) !== + normalizeSessionKeyForEventComparison(host, host.sessionKey) + ) { return { accepted: false }; } if (!host.chatRunId && options?.allowSessionScopedWhenIdle && sessionKey) {