Files
openclaw/src/gateway/server-runtime-subscriptions.ts
Val Alexander 119a01c829 fix(webchat): stabilize live transcript run state
Stabilize WebChat transcript/run-state truth for Codex and selected-session observers.

Summary:
- Mirror Codex inbound prompts at turn start without duplicating suppressed persisted prompts.
- Deliver hidden external-channel live chat/tool/agent updates only to exact selected-session subscribers.
- Repair Control UI selected-session subscription state, alias-aware run adoption, and accumulated stream dedupe.
- Add focused Codex, gateway/session-event, and Control UI regression coverage.

Verification:
- Current-head CI: 101 green, 0 pending; stale canceled entries are superseded automation from prior force-pushed heads.
- Local focused Vitest shards passed: Codex app-server 2 files / 233 tests, gateway/session 4 files / 116 tests, UI 7 files / 238 tests.
- `node scripts/run-tsgo.mjs -p test/tsconfig/tsconfig.core.test.json --incremental --tsBuildInfoFile .artifacts/tsgo-cache/core-test.tsbuildinfo`
- `node --import tsx scripts/check-no-extension-test-core-imports.ts`
- `git diff --check origin/main..HEAD`

Closes #83528.
Closes #82611.
Refs #83949.
2026-05-24 23:07:29 -05:00

109 lines
4.1 KiB
TypeScript

import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import type {
ChatRunState,
SessionEventSubscriberRegistry,
SessionMessageSubscriberRegistry,
ToolEventRecipientRegistry,
} from "./server-chat-state.js";
export function startGatewayEventSubscriptions(params: {
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
broadcastToConnIds: (
event: string,
payload: unknown,
connIds: ReadonlySet<string>,
opts?: { dropIfSlow?: boolean },
) => void;
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
agentRunSeq: Map<string, number>;
chatRunState: ChatRunState;
toolEventRecipients: ToolEventRecipientRegistry;
sessionEventSubscribers: SessionEventSubscriberRegistry;
sessionMessageSubscribers: SessionMessageSubscriberRegistry;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}) {
let agentEventHandlerPromise: Promise<
ReturnType<typeof import("./server-chat.js").createAgentEventHandler>
> | null = null;
const getAgentEventHandler = () => {
agentEventHandlerPromise ??= Promise.all([
import("./server-chat.js"),
import("./server-session-key.js"),
]).then(([{ createAgentEventHandler }, { resolveSessionKeyForRun }]) =>
createAgentEventHandler({
broadcast: params.broadcast,
broadcastToConnIds: params.broadcastToConnIds,
nodeSendToSession: params.nodeSendToSession,
agentRunSeq: params.agentRunSeq,
chatRunState: params.chatRunState,
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients: params.toolEventRecipients,
sessionEventSubscribers: params.sessionEventSubscribers,
sessionMessageSubscribers: params.sessionMessageSubscribers,
isChatSendRunActive: (runId) => {
const entry = params.chatAbortControllers.get(runId);
return entry !== undefined && entry.kind !== "agent";
},
}),
);
return agentEventHandlerPromise;
};
let transcriptUpdateHandlerPromise: Promise<
ReturnType<typeof import("./server-session-events.js").createTranscriptUpdateBroadcastHandler>
> | null = null;
const getTranscriptUpdateHandler = () => {
transcriptUpdateHandlerPromise ??= import("./server-session-events.js").then(
({ createTranscriptUpdateBroadcastHandler }) =>
createTranscriptUpdateBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
sessionMessageSubscribers: params.sessionMessageSubscribers,
}),
);
return transcriptUpdateHandlerPromise;
};
let lifecycleEventHandlerPromise: Promise<
ReturnType<typeof import("./server-session-events.js").createLifecycleEventBroadcastHandler>
> | null = null;
const getLifecycleEventHandler = () => {
lifecycleEventHandlerPromise ??= import("./server-session-events.js").then(
({ createLifecycleEventBroadcastHandler }) =>
createLifecycleEventBroadcastHandler({
broadcastToConnIds: params.broadcastToConnIds,
sessionEventSubscribers: params.sessionEventSubscribers,
}),
);
return lifecycleEventHandlerPromise;
};
const agentUnsub = onAgentEvent((evt) => {
void getAgentEventHandler().then((handler) => handler(evt));
});
const heartbeatUnsub = onHeartbeatEvent((evt) => {
params.broadcast("heartbeat", evt, { dropIfSlow: true });
});
const transcriptUnsub = onSessionTranscriptUpdate((evt) => {
void getTranscriptUpdateHandler().then((handler) => handler(evt));
});
const lifecycleUnsub = onSessionLifecycleEvent((evt) => {
void getLifecycleEventHandler().then((handler) => handler(evt));
});
return {
agentUnsub,
heartbeatUnsub,
transcriptUnsub,
lifecycleUnsub,
};
}