fix(webchat): show manual compaction progress

Add first-class session.operation start/end events for manual compaction and render the existing WebChat compaction indicator from those events.

Co-authored-by: Conan Scott <271909525+Conan-Scott@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-05-16 13:58:44 +01:00
committed by GitHub
parent f410a95081
commit e71d10fd4d
16 changed files with 449 additions and 24 deletions

View File

@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Gateway/WebChat: route image attachments through a configured vision-capable `imageModel` plan before inlining images, and carry that image-model fallback chain through runtime retries. (#82524) Thanks @frankekn.
- WebChat: show progress while manual `/compact` is running by streaming a session operation event to subscribed Control UI clients. Fixes #82407. Thanks @Conan-Scott.
- Codex app-server: limit canonical OpenAI Codex app-server attribution rewrites to local transcript and trajectory records, leaving runtime/tool routing on the selected OpenAI model metadata so OpenAI API-key backup profiles keep their billing path.
- Android/chat: make bare and markdown URLs in chat messages tappable by preserving Compose URL annotations in rendered markdown. Fixes #82187. (#82392) Thanks @neeravmakwana.
- Plugins/doctor: migrate legacy top-level plugin `tools` declarations into `contracts.tools`, so `openclaw doctor --fix` repairs local plugins for the manifest tool contract. (#81112) Thanks @100yenadmin.

View File

@@ -1788,6 +1788,44 @@ public struct SessionCompactionCheckpoint: Codable, Sendable {
}
}
public struct SessionOperationEvent: Codable, Sendable {
public let operationid: String
public let operation: String
public let phase: AnyCodable
public let sessionkey: String
public let ts: Int
public let completed: Bool?
public let reason: String?
public init(
operationid: String,
operation: String,
phase: AnyCodable,
sessionkey: String,
ts: Int,
completed: Bool?,
reason: String?)
{
self.operationid = operationid
self.operation = operation
self.phase = phase
self.sessionkey = sessionkey
self.ts = ts
self.completed = completed
self.reason = reason
}
private enum CodingKeys: String, CodingKey {
case operationid = "operationId"
case operation
case phase
case sessionkey = "sessionKey"
case ts
case completed
case reason
}
}
public struct SessionsCompactionListParams: Codable, Sendable {
public let key: String

View File

@@ -317,8 +317,9 @@ Defaults include isolated state/config and base gateway port `19001`.
a generated dump of every callable helper route.
- Requests: `req(method, params)``res(ok/payload|error)`.
- Common events include `connect.challenge`, `agent`, `chat`,
`session.message`, `session.tool`, `sessions.changed`, `presence`, `tick`,
`health`, `heartbeat`, pairing/approval lifecycle events, and `shutdown`.
`session.message`, `session.operation`, `session.tool`, `sessions.changed`,
`presence`, `tick`, `health`, `heartbeat`, pairing/approval lifecycle events,
and `shutdown`.
Agent runs are two-stage:

View File

@@ -473,8 +473,9 @@ enumeration of `src/gateway/server-methods/*.ts`.
events. In protocol v4, delta payloads carry `deltaText`; `message` remains
the cumulative assistant snapshot. Non-prefix replacements set `replace=true`
and use `deltaText` as the replacement text.
- `session.message` and `session.tool`: transcript/event-stream updates for a
subscribed session.
- `session.message`, `session.operation`, and `session.tool`: transcript,
in-flight session operation, and event-stream updates for a subscribed
session.
- `sessions.changed`: session index or metadata changed.
- `presence`: system presence snapshot updates.
- `tick`: periodic keepalive / liveness event.

View File

@@ -310,6 +310,7 @@ import {
SessionsCompactionListParamsSchema,
type SessionsCompactionRestoreParams,
SessionsCompactionRestoreParamsSchema,
type SessionOperationEvent,
type SessionsCreateParams,
SessionsCreateParamsSchema,
type SessionsDeleteParams,
@@ -1177,6 +1178,7 @@ export type {
SessionsPreviewParams,
SessionsDescribeParams,
SessionsResolveParams,
SessionOperationEvent,
SessionsPatchParams,
SessionsPatchResult,
SessionsResetParams,

View File

@@ -233,6 +233,7 @@ import {
SessionsCompactionRestoreParamsSchema,
SessionsCompactionRestoreResultSchema,
SessionCompactionCheckpointSchema,
SessionOperationEventSchema,
SessionsCleanupParamsSchema,
SessionsCreateParamsSchema,
SessionsDeleteParamsSchema,
@@ -329,6 +330,7 @@ export const ProtocolSchemas = {
SessionsDescribeParams: SessionsDescribeParamsSchema,
SessionsResolveParams: SessionsResolveParamsSchema,
SessionCompactionCheckpoint: SessionCompactionCheckpointSchema,
SessionOperationEvent: SessionOperationEventSchema,
SessionsCompactionListParams: SessionsCompactionListParamsSchema,
SessionsCompactionGetParams: SessionsCompactionGetParamsSchema,
SessionsCompactionBranchParams: SessionsCompactionBranchParamsSchema,

View File

@@ -9,6 +9,19 @@ export const SessionCompactionCheckpointReasonSchema = Type.Union([
Type.Literal("timeout-retry"),
]);
export const SessionOperationEventSchema = Type.Object(
{
operationId: NonEmptyString,
operation: Type.Literal("compact"),
phase: Type.Union([Type.Literal("start"), Type.Literal("end")]),
sessionKey: NonEmptyString,
ts: Type.Integer({ minimum: 0 }),
completed: Type.Optional(Type.Boolean()),
reason: Type.Optional(Type.String()),
},
{ additionalProperties: false },
);
export const SessionCompactionTranscriptReferenceSchema = Type.Object(
{
sessionId: NonEmptyString,

View File

@@ -55,6 +55,7 @@ export type SessionsPreviewParams = SchemaType<"SessionsPreviewParams">;
export type SessionsDescribeParams = SchemaType<"SessionsDescribeParams">;
export type SessionsResolveParams = SchemaType<"SessionsResolveParams">;
export type SessionCompactionCheckpoint = SchemaType<"SessionCompactionCheckpoint">;
export type SessionOperationEvent = SchemaType<"SessionOperationEvent">;
export type SessionsCompactionListParams = SchemaType<"SessionsCompactionListParams">;
export type SessionsCompactionGetParams = SchemaType<"SessionsCompactionGetParams">;
export type SessionsCompactionBranchParams = SchemaType<"SessionsCompactionBranchParams">;

View File

@@ -43,6 +43,7 @@ const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
"node.pair.resolved": [PAIRING_SCOPE],
"sessions.changed": [READ_SCOPE],
"session.message": [READ_SCOPE],
"session.operation": [READ_SCOPE],
"session.tool": [READ_SCOPE],
};

View File

@@ -34,6 +34,7 @@ export const GATEWAY_EVENTS = [
"agent",
"chat",
"session.message",
"session.operation",
"session.tool",
"sessions.changed",
"presence",

View File

@@ -56,6 +56,7 @@ import { GATEWAY_CLIENT_IDS } from "../protocol/client-info.js";
import {
ErrorCodes,
errorShape,
type SessionOperationEvent,
validateSessionsAbortParams,
validateSessionsCleanupParams,
validateSessionsCompactParams,
@@ -329,6 +330,25 @@ function emitSessionsChanged(
);
}
function emitSessionOperation(
context: Pick<GatewayRequestContext, "broadcastToConnIds" | "getSessionEventSubscriberConnIds">,
payload: Omit<SessionOperationEvent, "ts">,
) {
const connIds = context.getSessionEventSubscriberConnIds();
if (connIds.size === 0) {
return;
}
context.broadcastToConnIds(
"session.operation",
{
...payload,
ts: Date.now(),
} satisfies SessionOperationEvent,
connIds,
{ dropIfSlow: true },
);
}
function rejectWebchatSessionMutation(params: {
action: "patch" | "delete" | "compact" | "restore";
client: GatewayClient | null;
@@ -2142,24 +2162,52 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const workspaceDir =
normalizeOptionalString(entry?.spawnedWorkspaceDir) ||
resolveAgentWorkspaceDir(cfg, target.agentId);
const result = await compactEmbeddedPiSession({
sessionId,
const operationId = randomUUID();
emitSessionOperation(context, {
operationId,
operation: "compact",
phase: "start",
sessionKey: target.canonicalKey,
allowGatewaySubagentBinding: true,
sessionFile: filePath,
workspaceDir,
config: cfg,
provider: resolvedModel.provider,
model: resolvedModel.model,
agentHarnessId: entry?.sessionId === sessionId ? entry.agentHarnessId : undefined,
thinkLevel: normalizeThinkLevel(entry?.thinkingLevel),
reasoningLevel: normalizeReasoningLevel(entry?.reasoningLevel),
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
trigger: "manual",
});
let result: Awaited<ReturnType<typeof compactEmbeddedPiSession>>;
try {
result = await compactEmbeddedPiSession({
sessionId,
sessionKey: target.canonicalKey,
allowGatewaySubagentBinding: true,
sessionFile: filePath,
workspaceDir,
config: cfg,
provider: resolvedModel.provider,
model: resolvedModel.model,
agentHarnessId: entry?.sessionId === sessionId ? entry.agentHarnessId : undefined,
thinkLevel: normalizeThinkLevel(entry?.thinkingLevel),
reasoningLevel: normalizeReasoningLevel(entry?.reasoningLevel),
bashElevated: {
enabled: false,
allowed: false,
defaultLevel: "off",
},
trigger: "manual",
});
} catch (err) {
emitSessionOperation(context, {
operationId,
operation: "compact",
phase: "end",
sessionKey: target.canonicalKey,
completed: false,
reason: formatErrorMessage(err),
});
throw err;
}
emitSessionOperation(context, {
operationId,
operation: "compact",
phase: "end",
sessionKey: target.canonicalKey,
completed: result.ok && result.compacted,
reason: result.reason,
});
if (result.ok && result.compacted) {

View File

@@ -5,6 +5,7 @@ import { expect, test, vi } from "vitest";
import { withEnvAsync } from "../test-utils/env.js";
import {
embeddedRunMock,
onceMessage,
piSdkMock,
rpcReq,
startConnectedServerWithClient,
@@ -249,6 +250,23 @@ test("sessions.compact without maxLines runs embedded manual compaction for chec
});
const { ws } = await openClient();
await rpcReq(ws, "sessions.subscribe", {});
const startEventPromise = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.operation" &&
(message.payload as { operation?: unknown; phase?: unknown })?.operation === "compact" &&
(message.payload as { operation?: unknown; phase?: unknown })?.phase === "start",
);
const endEventPromise = onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "session.operation" &&
(message.payload as { operation?: unknown; phase?: unknown })?.operation === "compact" &&
(message.payload as { operation?: unknown; phase?: unknown })?.phase === "end",
);
const compacted = await rpcReq<{
ok: true;
key: string;
@@ -261,6 +279,34 @@ test("sessions.compact without maxLines runs embedded manual compaction for chec
expect(compacted.ok).toBe(true);
expect(compacted.payload?.key).toBe("agent:main:main");
expect(compacted.payload?.compacted).toBe(true);
const startEvent = await startEventPromise;
const endEvent = await endEventPromise;
const startPayload = startEvent.payload as {
operationId?: string;
sessionKey?: string;
ts?: number;
};
const endPayload = endEvent.payload as {
operationId?: string;
sessionKey?: string;
completed?: boolean;
ts?: number;
};
expect(startPayload).toMatchObject({
operation: "compact",
phase: "start",
sessionKey: "agent:main:main",
});
expect(endPayload).toMatchObject({
operation: "compact",
phase: "end",
sessionKey: "agent:main:main",
completed: true,
});
expect(startPayload.operationId).toBeTruthy();
expect(endPayload.operationId).toBe(startPayload.operationId);
expect(typeof startPayload.ts).toBe("number");
expect(typeof endPayload.ts).toBe("number");
expect(embeddedRunMock.compactEmbeddedPiSession).toHaveBeenCalledTimes(1);
const compactionCall = embeddedRunMock.compactEmbeddedPiSession.mock.calls.at(0)?.[0] as
| {

View File

@@ -5,6 +5,7 @@ const loadSessionsMock = vi.fn();
const loadChatHistoryMock = vi.fn();
const applySessionsChangedEventMock = vi.fn();
const handleChatEventMock = vi.fn(() => "idle");
const handleSessionOperationEventMock = vi.fn();
vi.mock("./app-chat.ts", () => ({
CHAT_SESSIONS_ACTIVE_MINUTES: 10,
@@ -21,6 +22,7 @@ vi.mock("./app-settings.ts", () => ({
}));
vi.mock("./app-tool-stream.ts", () => ({
handleAgentEvent: vi.fn(),
handleSessionOperationEvent: handleSessionOperationEventMock,
resetToolStream: vi.fn(),
}));
vi.mock("./controllers/agents.ts", () => ({
@@ -406,6 +408,28 @@ describe("handleGatewayEvent session.message", () => {
});
});
describe("handleGatewayEvent session.operation", () => {
it("routes session operation events to the tool stream state", () => {
handleSessionOperationEventMock.mockReset();
const host = createHost();
const payload = {
operationId: "operation-1",
operation: "compact",
phase: "start",
sessionKey: "agent:main:main",
};
handleGatewayEvent(host, {
type: "event",
event: "session.operation",
payload,
seq: 1,
});
expect(handleSessionOperationEventMock).toHaveBeenCalledWith(host, payload);
});
});
describe("addExecApproval", () => {
it("keeps the newest approval at the front of the queue", () => {
const queue = addExecApproval(

View File

@@ -18,7 +18,13 @@ import {
setLastActiveSessionKey,
syncUrlWithSessionKey,
} from "./app-settings.ts";
import { handleAgentEvent, resetToolStream, type AgentEventPayload } from "./app-tool-stream.ts";
import {
handleAgentEvent,
handleSessionOperationEvent,
resetToolStream,
type AgentEventPayload,
type SessionOperationEventPayload,
} from "./app-tool-stream.ts";
import { shouldReloadHistoryForFinalEvent } from "./chat-event-reload.ts";
import { reconcileChatRunLifecycle } from "./chat/run-lifecycle.ts";
import { parseChatSideResult, type ChatSideResult } from "./chat/side-result.ts";
@@ -842,6 +848,14 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) {
return;
}
if (evt.event === "session.operation") {
handleSessionOperationEvent(
host as unknown as Parameters<typeof handleSessionOperationEvent>[0],
evt.payload as SessionOperationEventPayload | undefined,
);
return;
}
if (evt.event === "presence") {
const payload = evt.payload as { presence?: PresenceEntry[] } | undefined;
if (payload?.presence && Array.isArray(payload.presence)) {

View File

@@ -1,6 +1,11 @@
// @vitest-environment node
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { handleAgentEvent, type FallbackStatus, type ToolStreamEntry } from "./app-tool-stream.ts";
import {
handleAgentEvent,
handleSessionOperationEvent,
type FallbackStatus,
type ToolStreamEntry,
} from "./app-tool-stream.ts";
type ToolStreamHost = Parameters<typeof handleAgentEvent>[0];
type AgentEvent = NonNullable<Parameters<typeof handleAgentEvent>[1]>;
@@ -353,6 +358,113 @@ describe("app-tool-stream fallback lifecycle handling", () => {
vi.useRealTimers();
});
it("shows manual session operation compaction progress while idle", () => {
useToolStreamFakeTimers();
const host = createHost({
sessionKey: "main",
hello: {
snapshot: {
sessionDefaults: {
defaultAgentId: "main",
mainKey: "main",
mainSessionKey: "agent:main:main",
},
},
},
});
handleSessionOperationEvent(host, {
operationId: "operation-1",
operation: "compact",
phase: "start",
sessionKey: "agent:main:main",
ts: TOOL_STREAM_TEST_NOW,
});
expect(host.compactionStatus).toEqual({
phase: "active",
runId: "operation-1",
startedAt: TOOL_STREAM_TEST_NOW,
completedAt: null,
});
handleSessionOperationEvent(host, {
operationId: "operation-1",
operation: "compact",
phase: "end",
sessionKey: "agent:main:main",
ts: TOOL_STREAM_TEST_NOW,
completed: true,
});
expect(host.compactionStatus).toEqual({
phase: "complete",
runId: "operation-1",
startedAt: TOOL_STREAM_TEST_NOW,
completedAt: TOOL_STREAM_TEST_NOW,
});
vi.useRealTimers();
});
it("ignores manual session operation compaction for other sessions", () => {
useToolStreamFakeTimers();
const host = createHost({ sessionKey: "agent:main:main" });
handleSessionOperationEvent(host, {
operationId: "operation-1",
operation: "compact",
phase: "start",
sessionKey: "agent:other:main",
ts: TOOL_STREAM_TEST_NOW,
});
expect(host.compactionStatus).toBeNull();
expect(host.compactionClearTimer).toBeNull();
vi.useRealTimers();
});
it("ignores stale manual session operation completion after a newer start", () => {
useToolStreamFakeTimers();
const host = createHost({ sessionKey: "agent:main:main" });
handleSessionOperationEvent(host, {
operationId: "operation-1",
operation: "compact",
phase: "start",
sessionKey: "agent:main:main",
ts: TOOL_STREAM_TEST_NOW,
});
handleSessionOperationEvent(host, {
operationId: "operation-2",
operation: "compact",
phase: "start",
sessionKey: "agent:main:main",
ts: TOOL_STREAM_TEST_NOW,
});
handleSessionOperationEvent(host, {
operationId: "operation-1",
operation: "compact",
phase: "end",
sessionKey: "agent:main:main",
ts: TOOL_STREAM_TEST_NOW,
completed: true,
});
expect(host.compactionStatus).toEqual({
phase: "active",
runId: "operation-2",
startedAt: TOOL_STREAM_TEST_NOW,
completedAt: null,
});
vi.advanceTimersByTime(5 * 60_000);
expect(host.compactionStatus).toBeNull();
expect(host.compactionClearTimer).toBeNull();
vi.useRealTimers();
});
it("treats lifecycle error as terminal for retry-pending compaction", () => {
useToolStreamFakeTimers();
const host = createHost();

View File

@@ -1,6 +1,7 @@
import { createChatModelOverride } from "./chat-model-ref.ts";
import type { ChatModelOverride } from "./chat-model-ref.types.ts";
import { formatUnknownText, truncateText } from "./format.ts";
import { buildAgentMainSessionKey, DEFAULT_AGENT_ID, DEFAULT_MAIN_KEY } from "./session-key.ts";
import { normalizeLowercaseStringOrEmpty } from "./string-coerce.ts";
const TOOL_STREAM_LIMIT = 50;
@@ -16,6 +17,16 @@ export type AgentEventPayload = {
data: Record<string, unknown>;
};
export type SessionOperationEventPayload = {
operationId?: string;
operation?: string;
phase?: string;
sessionKey?: string;
ts?: number;
completed?: boolean;
reason?: string;
};
export type ToolStreamEntry = {
toolCallId: string;
runId: string;
@@ -30,6 +41,11 @@ export type ToolStreamEntry = {
type ToolStreamHost = {
sessionKey: string;
hello?: {
snapshot?: {
sessionDefaults?: SessionDefaultsSnapshot;
};
} | null;
chatRunId: string | null;
chatStream: string | null;
chatStreamStartedAt: number | null;
@@ -41,6 +57,12 @@ type ToolStreamHost = {
chatModelOverrides?: Record<string, ChatModelOverride | null>;
};
type SessionDefaultsSnapshot = {
defaultAgentId?: string;
mainKey?: string;
mainSessionKey?: string;
};
function toTrimmedString(value: unknown): string | null {
if (typeof value !== "string") {
return null;
@@ -219,6 +241,49 @@ function syncSessionStatusModelOverride(host: ToolStreamHost, data: Record<strin
};
}
function readSessionDefaults(host: ToolStreamHost): SessionDefaultsSnapshot | undefined {
return host.hello?.snapshot?.sessionDefaults;
}
function resolveDefaultMainSessionKey(host: ToolStreamHost): string {
const defaults = readSessionDefaults(host);
const configuredMain = toTrimmedString(defaults?.mainSessionKey);
if (configuredMain) {
return configuredMain;
}
return buildAgentMainSessionKey({
agentId: toTrimmedString(defaults?.defaultAgentId) ?? DEFAULT_AGENT_ID,
mainKey: toTrimmedString(defaults?.mainKey) ?? DEFAULT_MAIN_KEY,
});
}
function normalizeSessionKeyForEventComparison(
host: ToolStreamHost,
value?: string,
): string | null {
const raw = toTrimmedString(value);
if (!raw) {
return null;
}
const defaults = readSessionDefaults(host);
const mainKey = toTrimmedString(defaults?.mainKey) ?? DEFAULT_MAIN_KEY;
const defaultAgentId = toTrimmedString(defaults?.defaultAgentId) ?? DEFAULT_AGENT_ID;
const canonicalMain = resolveDefaultMainSessionKey(host);
const aliases = new Set(
[
DEFAULT_MAIN_KEY,
mainKey,
canonicalMain,
buildAgentMainSessionKey({ agentId: defaultAgentId, mainKey: DEFAULT_MAIN_KEY }),
buildAgentMainSessionKey({ agentId: defaultAgentId, mainKey }),
].map((entry) => normalizeLowercaseStringOrEmpty(entry)),
);
const normalizedRaw = normalizeLowercaseStringOrEmpty(raw);
return aliases.has(normalizedRaw)
? normalizeLowercaseStringOrEmpty(canonicalMain)
: normalizedRaw;
}
function buildToolStreamMessage(entry: ToolStreamEntry): Record<string, unknown> {
const content: Array<Record<string, unknown>> = [];
content.push({
@@ -357,6 +422,57 @@ function setCompactionComplete(host: CompactionHost, runId: string) {
scheduleCompactionClear(host, COMPACTION_TOAST_DURATION_MS, { phase: "complete", runId });
}
export function handleSessionOperationEvent(
host: ToolStreamHost,
payload?: SessionOperationEventPayload,
) {
if (!payload || payload.operation !== "compact") {
return;
}
const sessionKey = toTrimmedString(payload.sessionKey);
if (
!sessionKey ||
normalizeSessionKeyForEventComparison(host, sessionKey) !==
normalizeSessionKeyForEventComparison(host, host.sessionKey)
) {
return;
}
const operationId = toTrimmedString(payload.operationId) ?? `session-compact:${sessionKey}`;
const compactionHost = host as CompactionHost;
if (payload.phase === "start") {
clearCompactionTimer(compactionHost);
compactionHost.compactionStatus = {
phase: "active",
runId: operationId,
startedAt: Date.now(),
completedAt: null,
};
scheduleCompactionClear(compactionHost, COMPACTION_ACTIVE_STALE_TIMEOUT_MS, {
phase: "active",
runId: operationId,
});
return;
}
if (payload.phase !== "end") {
return;
}
if (
compactionHost.compactionStatus?.runId &&
compactionHost.compactionStatus.runId !== operationId
) {
return;
}
clearCompactionTimer(compactionHost);
if (payload.completed === true) {
setCompactionComplete(compactionHost, operationId);
return;
}
compactionHost.compactionStatus = null;
}
export function handleCompactionEvent(host: CompactionHost, payload: AgentEventPayload) {
const data = payload.data ?? {};
const phase = typeof data.phase === "string" ? data.phase : "";
@@ -432,7 +548,11 @@ function resolveAcceptedSession(
},
): { accepted: boolean; sessionKey?: string } {
const sessionKey = typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
if (sessionKey && sessionKey !== host.sessionKey) {
if (
sessionKey &&
normalizeSessionKeyForEventComparison(host, sessionKey) !==
normalizeSessionKeyForEventComparison(host, host.sessionKey)
) {
return { accepted: false };
}
if (!host.chatRunId && options?.allowSessionScopedWhenIdle && sessionKey) {