Files
openclaw/src/gateway/server-runtime-subscriptions.ts
clawsweeper[bot] 22e8cd2a1d fix(gateway): clear completed session active runs (#87810)
Summary:
- This PR adds an internal gateway active-run projection flag, clears it during terminal lifecycle handling be ... ons.list on that flag, adds gateway regression coverage, and tightens memory-wiki confidence normalization.
- PR surface: Source +29, Tests +131. Total +160 across 7 files.
- Reproducibility: yes. Source inspection shows current main can broadcast terminal sessions.changed before ch ...  the abort-controller entry, and the before/after recording supports the visible stuck In progress symptom.

Automerge notes:
- PR branch already contained follow-up commit before automerge: fix(gateway): preserve chat retry guard after terminal state
- PR branch already contained follow-up commit before automerge: fix(gateway): clear completed session active runs

Validation:
- ClawSweeper review passed for head 9b132bdc2b.
- Required merge gates passed before the squash merge.

Prepared head SHA: 9b132bdc2b
Review: https://github.com/openclaw/openclaw/pull/87810#issuecomment-4569094800

Co-authored-by: scotthuang <scotthuang@tencent.com>
Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
Approved-by: takhoffman
Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
2026-05-29 05:03:10 +00:00

120 lines
4.6 KiB
TypeScript

import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
import type { ChatAbortControllerEntry } from "./chat-abort.js";
import type {
ChatRunState,
SessionEventSubscriberRegistry,
SessionMessageSubscriberRegistry,
ToolEventRecipientRegistry,
} from "./server-chat-state.js";
/**
 * Registers the gateway's listeners for agent, heartbeat, session transcript
 * and session lifecycle events, and routes each event to the matching
 * broadcast path.
 *
 * The agent, transcript and lifecycle handlers live in modules that are
 * loaded with dynamic `import()` the first time the corresponding event
 * fires; the resulting handler promise is then reused for every later event.
 * Heartbeat events need no handler module and are broadcast directly.
 *
 * NOTE(review): events are dispatched fire-and-forget. A rejected module load
 * or a throwing handler is not caught here, and a rejected load promise stays
 * cached (it is never retried). Confirm that a process-level rejection
 * handler covers this.
 *
 * @param params Broadcast functions and shared gateway state handed to the
 *   lazily created handlers.
 * @returns The values returned by each `on*` registration call, keyed as
 *   `agentUnsub`, `heartbeatUnsub`, `transcriptUnsub` and `lifecycleUnsub`.
 */
export function startGatewayEventSubscriptions(params: {
  broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
  broadcastToConnIds: (
    event: string,
    payload: unknown,
    connIds: ReadonlySet<string>,
    opts?: { dropIfSlow?: boolean },
  ) => void;
  nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
  agentRunSeq: Map<string, number>;
  chatRunState: ChatRunState;
  toolEventRecipients: ToolEventRecipientRegistry;
  sessionEventSubscribers: SessionEventSubscriberRegistry;
  sessionMessageSubscribers: SessionMessageSubscriberRegistry;
  chatAbortControllers: Map<string, ChatAbortControllerEntry>;
}) {
  // Wraps a loader so it runs at most once; every call returns the same
  // promise, including when that promise ends up rejected.
  function memoizeAsync<T>(load: () => Promise<T>): () => Promise<T> {
    let cached: Promise<T> | null = null;
    return () => {
      if (cached === null) {
        cached = load();
      }
      return cached;
    };
  }

  // Flips the active-run projection flag off for one tracked run id, if the
  // abort-controller map has an entry for it.
  const markRunInactive = (trackedRunId: string): void => {
    const tracked = params.chatAbortControllers.get(trackedRunId);
    if (tracked) {
      tracked.projectSessionActive = false;
    }
  };

  const getAgentEventHandler = memoizeAsync(() =>
    Promise.all([import("./server-chat.js"), import("./server-session-key.js")]).then(
      ([{ createAgentEventHandler }, { resolveSessionKeyForRun }]) =>
        createAgentEventHandler({
          broadcast: params.broadcast,
          broadcastToConnIds: params.broadcastToConnIds,
          nodeSendToSession: params.nodeSendToSession,
          agentRunSeq: params.agentRunSeq,
          chatRunState: params.chatRunState,
          resolveSessionKeyForRun,
          clearAgentRunContext,
          toolEventRecipients: params.toolEventRecipients,
          sessionEventSubscribers: params.sessionEventSubscribers,
          sessionMessageSubscribers: params.sessionMessageSubscribers,
          clearTrackedActiveRun: ({ runId, clientRunId }) => {
            // Chat abort entries can hold the requested key while chat run
            // state holds the canonical key; the run ids are the scoped
            // match, so both ids are tried (once only when they coincide).
            markRunInactive(runId);
            if (clientRunId !== runId) {
              markRunInactive(clientRunId);
            }
          },
          isChatSendRunActive: (runId) => {
            const tracked = params.chatAbortControllers.get(runId);
            if (tracked === undefined) {
              return false;
            }
            // Entries of kind "agent" do not count as chat-send runs.
            return tracked.kind !== "agent";
          },
        }),
    ),
  );

  const getTranscriptUpdateHandler = memoizeAsync(() =>
    import("./server-session-events.js").then(({ createTranscriptUpdateBroadcastHandler }) =>
      createTranscriptUpdateBroadcastHandler({
        broadcastToConnIds: params.broadcastToConnIds,
        sessionEventSubscribers: params.sessionEventSubscribers,
        sessionMessageSubscribers: params.sessionMessageSubscribers,
      }),
    ),
  );

  const getLifecycleEventHandler = memoizeAsync(() =>
    import("./server-session-events.js").then(({ createLifecycleEventBroadcastHandler }) =>
      createLifecycleEventBroadcastHandler({
        broadcastToConnIds: params.broadcastToConnIds,
        sessionEventSubscribers: params.sessionEventSubscribers,
      }),
    ),
  );

  // Property initialisers run top to bottom, so listeners are registered in
  // the order agent -> heartbeat -> transcript -> lifecycle.
  return {
    agentUnsub: onAgentEvent((event) => {
      void getAgentEventHandler().then((handle) => handle(event));
    }),
    heartbeatUnsub: onHeartbeatEvent((event) => {
      // Heartbeats are sent with dropIfSlow so the broadcaster may skip slow
      // recipients.
      params.broadcast("heartbeat", event, { dropIfSlow: true });
    }),
    transcriptUnsub: onSessionTranscriptUpdate((event) => {
      void getTranscriptUpdateHandler().then((handle) => handle(event));
    }),
    lifecycleUnsub: onSessionLifecycleEvent((event) => {
      void getLifecycleEventHandler().then((handle) => handle(event));
    }),
  };
}