fix(gateway): fire typed session_end on shutdown/restart for active sessions (#57790)

`session_end` was only fired when a session was replaced, reset, deleted, or
compacted -- the gateway shutdown/restart paths closed the process without
enumerating active sessions, so downstream `session_end` plugins
(e.g. claude-mem) accumulated ghost rows in `active` state across restarts.
The issue reporter saw 11 orphaned sessions cause 63 timeouts/day through
agent pool exhaustion.

Add an in-memory active-session tracker
(`src/gateway/active-sessions-shutdown-tracker.ts`) populated by
`emitGatewaySessionStartPluginHook` and forgotten unconditionally by
`emitGatewaySessionEndPluginHook` (even when no plugin listens), so any
session that has already been finalized through the normal lifecycle is
never re-fired by the shutdown drain. The close handler then calls a new
`drainActiveSessionsForShutdown({ reason })` in `session-reset-service.ts`
between the `gateway:shutdown`/`gateway:pre-restart` lifecycle hooks and
the subsystem teardown steps; the drain races a bounded 2 s total timeout
so a slow plugin cannot block SIGTERM/SIGINT, surfacing the timeout as a
`session-end-drain` warning on the shutdown result.
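
The bounded race described above can be sketched in isolation. This is a
minimal illustration under stated assumptions, not the gateway's code:
`raceWithTimeout` is a hypothetical helper name, and the work being raced is
passed in as an arbitrary promise.

```typescript
// Hypothetical helper (not part of the gateway): race async work against a
// total time budget and report which side finished first.
type RaceOutcome = "ok" | "timeout";

async function raceWithTimeout(
  work: Promise<void>,
  totalTimeoutMs: number,
): Promise<RaceOutcome> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), totalTimeoutMs);
  });
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([work.then(() => "ok" as const), timeout]);
  } finally {
    // Always clear the timer so a fast drain leaves no pending timeout behind.
    if (timer !== undefined) {
      clearTimeout(timer);
    }
  }
}
```

A caller would treat `"timeout"` as a warning and continue shutting down
rather than as a failure, which matches the `session-end-drain` warning
behavior described above.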

Extend `PluginHookSessionEndReason` with `"shutdown"` and `"restart"` so
plugins can distinguish a graceful close from a planned restart; the close
handler picks `restart` when `restartExpectedMs` is set and `shutdown`
otherwise. Update `emitGatewaySessionStartPluginHook` to also accept
`storePath`, `sessionFile`, and `agentId` so the shutdown drain can build
the same `session_end` payload shape the normal lifecycle path emits, and
update the existing call sites in `session-reset-service.ts` and
`server-methods/sessions.ts` to pass those fields through.
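
As a rough illustration of the reason selection, the rule can be written as a
small pure function. `pickDrainReason` is a hypothetical name, and the
finite-number check is an assumption for this sketch; the close handler itself
compares its own `restartExpectedMs` value against `null`.

```typescript
type DrainReason = "shutdown" | "restart";

// Hypothetical helper: choose "restart" only when a usable restart delay was
// supplied, otherwise fall back to a plain "shutdown".
function pickDrainReason(restartExpectedMs?: number | null): DrainReason {
  return typeof restartExpectedMs === "number" && Number.isFinite(restartExpectedMs)
    ? "restart"
    : "shutdown";
}
```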

Tests:

- `src/gateway/active-sessions-shutdown-tracker.test.ts` (new) -- tracker
  insert/forget/clear semantics, idempotent re-noting, empty-id guard,
  snapshot isolation.
- `src/gateway/drain-active-sessions-for-shutdown.test.ts` (new) -- drain
  fires `session_end` with the right reason for every tracked session,
  skips sessions already finalized via reset/delete/compaction, and still
  forgets sessions even when no `session_end` plugin is registered.
- `src/gateway/server-close.test.ts` -- four new cases covering the
  shutdown/restart drain wiring, the bounded timeout warning, and the
  drain-skipped-when-no-helper case.
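
The tracker semantics exercised by the first test file can be illustrated
with a small stand-in. This is a sketch only: `createTracker` and its `Entry`
type are hypothetical and simplified, whereas the real module keeps
module-level state and a richer entry shape.

```typescript
// Hypothetical, simplified stand-in for the shutdown tracker.
type Entry = { sessionId: string; sessionFile?: string };

function createTracker() {
  const tracked = new Map<string, Entry>();
  return {
    // Re-noting the same id replaces the entry; empty ids are ignored.
    note(entry: Entry): void {
      if (!entry.sessionId) return;
      tracked.set(entry.sessionId, entry);
    },
    // Forgetting an unknown or missing id is a no-op.
    forget(sessionId: string | undefined): void {
      if (!sessionId) return;
      tracked.delete(sessionId);
    },
    // Returns a fresh array, so callers cannot mutate the tracker through it.
    list(): Entry[] {
      return Array.from(tracked.values());
    },
  };
}

const tracker = createTracker();
tracker.note({ sessionId: "session-A", sessionFile: "/tmp/old.jsonl" });
tracker.note({ sessionId: "session-A", sessionFile: "/tmp/new.jsonl" });
tracker.note({ sessionId: "" }); // ignored by the empty-id guard
tracker.list().length = 0; // truncates only the snapshot array
```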

Docs:

- `docs/plugins/hooks.md` documents the new `shutdown`/`restart` values
  on `PluginHookSessionEndReason`.
- `docs/automation/hooks.md` documents the post-`gateway:shutdown`
  `session_end` drain step and its bounded execution guarantee.
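
On the consuming side, a plugin that stores session rows might react to the
new reasons along these lines. This is a hedged sketch: the event and row
shapes and the `finalizeSessionRow` function are hypothetical, and the real
plugin payload carries more fields than shown here.

```typescript
// Hypothetical event and row shapes for illustration only.
type SessionEndEvent = { sessionId: string; reason: string };
type SessionRow = { status: "active" | "closed"; closedReason?: string };

// Close the row for a finished session. Idempotent: a repeated or late event
// for an already-closed row leaves the first recorded reason intact.
function finalizeSessionRow(
  rows: Map<string, SessionRow>,
  event: SessionEndEvent,
): boolean {
  const row = rows.get(event.sessionId);
  if (!row || row.status === "closed") {
    return false;
  }
  row.status = "closed";
  row.closedReason = event.reason;
  return true;
}

const sessionRows = new Map<string, SessionRow>([
  ["sess-A", { status: "active" }],
  ["sess-B", { status: "active" }],
]);
// sess-A ends through the normal lifecycle; sess-B is closed by the drain.
finalizeSessionRow(sessionRows, { sessionId: "sess-A", reason: "reset" });
finalizeSessionRow(sessionRows, { sessionId: "sess-B", reason: "shutdown" });
```

Keeping the handler idempotent is a defensive choice for the plugin; the
gateway-side tracker is what prevents double-firing in the first place.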

Fixes #57790.
pandadev66
2026-05-11 16:24:57 +02:00
committed by Peter Steinberger
parent 8a8cb6fb30
commit dfa1a11676
13 changed files with 549 additions and 3 deletions


@@ -62,6 +62,7 @@ Docs: https://docs.openclaw.ai
- Plugins/update: include beta-channel fallback details in plugin update outcomes when `@beta` is unavailable and OpenClaw uses the recorded default/latest plugin spec, making mixed beta/latest plugin cohorts visible in update summaries. Fixes #80689. Thanks @BKF-Gitty.
- Control UI/performance: scope Nodes polling to the active Nodes tab, debounce stale session-list reconciliation, and bound chat-side session refreshes so long-running dashboards avoid background reload churn. Thanks @BunsDev.
- Plugins/channels: explain bundled channel entry files that reach the legacy plugin loader as setup-runtime loader mismatches instead of generic missing-register failures. Thanks @chinar-amrutkar.
- Plugins/session-end: fire a typed `session_end` plugin hook with reason `shutdown` (or `restart` when a restart is expected) for every session that was still active when the gateway process stops. Previously SIGTERM/SIGINT/restart paths closed the gateway without enumerating active sessions, leaving downstream `session_end` plugins (e.g. claude-mem) with ghost rows accumulating across restarts. The new shutdown finalizer drains an in-memory tracker that is populated by `session_start` and forgotten by replace / reset / delete / compaction emitters, so previously-finalized sessions are never double-fired. The drain is bounded to a 2 s total budget so a slow plugin cannot block process exit. Adds `"shutdown"` and `"restart"` to `PluginHookSessionEndReason`. Fixes #57790. Thanks @pandadev66.
- Bonjour/Gateway: treat active ciao probing and fresh name-conflict renames as in-progress so the mDNS watchdog waits for probe settlement before retrying, preventing rapid re-advertise loops on Windows, WSL, and other multicast-hostile hosts. (#74778) Refs #74242. Thanks @fuller-stack-dev.
- Providers/MiniMax: send a minimal Anthropic-compatible user fallback when message conversion filters a turn to an empty payload, so MiniMax M2.7 no longer returns `chat content is empty` after tool-heavy sessions. Fixes #74589. Thanks @neeravmakwana and @DerekEXS.
- Tools/media: preserve implicit allow-all semantics from `tools.alsoAllow`-only policies when preconstructing built-in media generation and PDF tools, so configured media tools become live without forcing `tools.allow: ["*", ...]`. Fixes #77841. Thanks @trialanderrorstudios.


@@ -135,6 +135,8 @@ plugin hook `before_agent_finalize` instead. See [Plugin hooks](/plugins/hooks).
**Gateway lifecycle events**: `gateway:shutdown` includes `reason` and `restartExpectedMs` and fires when gateway shutdown begins. `gateway:pre-restart` includes the same context but only fires when shutdown is part of an expected restart and a finite `restartExpectedMs` value is supplied. During shutdown, each lifecycle hook wait is best-effort and bounded so shutdown continues if a handler stalls.
Between the `gateway:shutdown` (or `gateway:pre-restart`) event and the rest of the shutdown sequence, the gateway also fires a typed `session_end` plugin hook for every session that was still active when the process stopped. The event's `reason` is `shutdown` for a plain SIGTERM/SIGINT stop and `restart` when the close was scheduled as part of an expected restart. This drain is bounded so a slow `session_end` handler cannot block process exit, and sessions that have already been finalized through replace / reset / delete / compaction are skipped to avoid double-firing.
## Hook discovery
Hooks are discovered from these directories, in order of increasing override precedence:


@@ -134,7 +134,7 @@ observation-only.
**Sessions and compaction**
- `session_start` / `session_end` - track session lifecycle boundaries
- `session_start` / `session_end` - track session lifecycle boundaries. The event's `reason` is one of `new`, `reset`, `idle`, `daily`, `compaction`, `deleted`, `shutdown`, `restart`, or `unknown`. The `shutdown` and `restart` values fire from the gateway shutdown finalizer when the process is stopped or restarted while sessions are still active, so downstream plugins (such as memory or transcript stores) can finalize ghost rows that would otherwise be left in an open state across restarts. The finalizer is bounded so a slow plugin cannot block SIGTERM/SIGINT.
- `before_compaction` / `after_compaction` - observe or annotate compaction cycles
- `before_reset` - observe session-reset events (`/reset`, programmatic resets)


@@ -0,0 +1,128 @@
import { afterEach, describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
clearActiveSessionsForShutdownTracker,
forgetActiveSessionForShutdown,
listActiveSessionsForShutdown,
noteActiveSessionForShutdown,
} from "./active-sessions-shutdown-tracker.js";
// Regression coverage for #57790: the in-memory active-session tracker that
// the close handler drains on shutdown / restart must be keyed by sessionId,
// must not double-track the same session, and must forget sessions that have
// already been finalized through replace / reset / delete / compaction so
// the shutdown drain never double-fires `session_end` for them.
const cfg: OpenClawConfig = {};
afterEach(() => {
clearActiveSessionsForShutdownTracker();
});
describe("active-sessions-shutdown-tracker", () => {
it("returns an empty list when no sessions have been noted", () => {
expect(listActiveSessionsForShutdown()).toEqual([]);
});
it("notes sessions keyed by sessionId so re-noting the same id replaces the entry", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "session-A",
storePath: "/tmp/store.json",
sessionFile: "/tmp/old.jsonl",
agentId: "main",
});
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "session-A",
storePath: "/tmp/store.json",
sessionFile: "/tmp/new.jsonl",
agentId: "main",
});
const entries = listActiveSessionsForShutdown();
expect(entries).toHaveLength(1);
expect(entries[0].sessionId).toBe("session-A");
expect(entries[0].sessionFile).toBe("/tmp/new.jsonl");
});
it("ignores empty sessionId notes", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "",
storePath: "/tmp/store.json",
});
expect(listActiveSessionsForShutdown()).toEqual([]);
});
it("forgets a session by id so a subsequent drain does not see it", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "session-A",
storePath: "/tmp/store.json",
});
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:other",
sessionId: "session-B",
storePath: "/tmp/store.json",
});
forgetActiveSessionForShutdown("session-A");
const entries = listActiveSessionsForShutdown();
expect(entries.map((entry) => entry.sessionId)).toEqual(["session-B"]);
});
it("treats forget on an unknown sessionId as a no-op", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "session-A",
storePath: "/tmp/store.json",
});
forgetActiveSessionForShutdown("does-not-exist");
forgetActiveSessionForShutdown(undefined);
expect(listActiveSessionsForShutdown()).toHaveLength(1);
});
it("returns a snapshot list so callers do not mutate the underlying tracker", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:main",
sessionId: "session-A",
storePath: "/tmp/store.json",
});
const snapshot = listActiveSessionsForShutdown();
snapshot.length = 0;
expect(listActiveSessionsForShutdown()).toHaveLength(1);
});
it("clears the entire tracker for test isolation", () => {
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:a",
sessionId: "session-A",
storePath: "/tmp/store.json",
});
noteActiveSessionForShutdown({
cfg,
sessionKey: "agent:main:b",
sessionId: "session-B",
storePath: "/tmp/store.json",
});
clearActiveSessionsForShutdownTracker();
expect(listActiveSessionsForShutdown()).toEqual([]);
});
});


@@ -0,0 +1,47 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
// Module-level tracker of sessions that have received `session_start` but not
// yet a paired `session_end`. The close handler drains this set on gateway
// shutdown / restart so downstream `session_end` plugins (e.g. claude-mem)
// can finalize sessions that were active when the process stopped, instead
// of leaving ghost rows in `active` state across restarts (see #57790).
//
// Membership is keyed by `sessionId`. The existing session lifecycle paths
// (`emitGatewaySessionStartPluginHook` /
// `emitGatewaySessionEndPluginHook` in `session-reset-service.ts`) call into
// this tracker so a session that has already been finalized by replace /
// reset / delete / compaction is forgotten before the shutdown drain ever
// runs. That is what keeps the shutdown finalizer from double-firing.
export type ActiveSessionForShutdown = {
  cfg: OpenClawConfig;
  sessionKey: string;
  sessionId: string;
  storePath: string;
  sessionFile?: string;
  agentId?: string;
};

const trackedSessions = new Map<string, ActiveSessionForShutdown>();

export function noteActiveSessionForShutdown(entry: ActiveSessionForShutdown): void {
  if (!entry.sessionId) {
    return;
  }
  trackedSessions.set(entry.sessionId, entry);
}

export function forgetActiveSessionForShutdown(sessionId: string | undefined): void {
  if (!sessionId) {
    return;
  }
  trackedSessions.delete(sessionId);
}

export function listActiveSessionsForShutdown(): ActiveSessionForShutdown[] {
  return Array.from(trackedSessions.values());
}

export function clearActiveSessionsForShutdownTracker(): void {
  trackedSessions.clear();
}


@@ -0,0 +1,168 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
// Regression coverage for #57790: the bounded shutdown drain must fire a
// typed `session_end` for every session the tracker has noted, must skip
// sessions that have already been finalized through replace / reset /
// delete / compaction (so we never double-fire), must respect the
// configured total timeout, and must propagate the reason ("shutdown" or
// "restart") into the plugin hook payload.
const runSessionEndMock = vi.fn(async () => undefined);
const hasHooksMock = vi.fn((name: string) => name === "session_end");
const getGlobalHookRunnerMock = vi.fn(() => ({
hasHooks: hasHooksMock,
runSessionEnd: runSessionEndMock,
runSessionStart: vi.fn(async () => undefined),
}));
vi.mock("../plugins/hook-runner-global.js", () => ({
getGlobalHookRunner: getGlobalHookRunnerMock,
}));
vi.mock("./session-transcript-files.fs.js", () => ({
resolveStableSessionEndTranscript: vi.fn(() => ({
sessionFile: undefined,
transcriptArchived: false,
})),
archiveSessionTranscriptsDetailed: vi.fn(() => []),
}));
vi.mock("../auto-reply/reply/session-hooks.js", () => ({
buildSessionEndHookPayload: vi.fn(
(params: { sessionId: string; reason: string; sessionKey: string }) => ({
event: { sessionId: params.sessionId, reason: params.reason, sessionKey: params.sessionKey },
context: { sessionId: params.sessionId, reason: params.reason },
}),
),
buildSessionStartHookPayload: vi.fn(() => ({ event: {}, context: {} })),
}));
const {
drainActiveSessionsForShutdown,
emitGatewaySessionEndPluginHook,
emitGatewaySessionStartPluginHook,
} = await import("./session-reset-service.js");
const { clearActiveSessionsForShutdownTracker, listActiveSessionsForShutdown } =
await import("./active-sessions-shutdown-tracker.js");
const cfg: OpenClawConfig = {};
beforeEach(() => {
clearActiveSessionsForShutdownTracker();
runSessionEndMock.mockClear();
hasHooksMock.mockClear();
hasHooksMock.mockImplementation((name: string) => name === "session_end");
});
afterEach(() => {
clearActiveSessionsForShutdownTracker();
});
describe("drainActiveSessionsForShutdown", () => {
it("returns an empty result and skips hook emission when no sessions are tracked", async () => {
const result = await drainActiveSessionsForShutdown({ reason: "shutdown" });
expect(result).toEqual({ emittedSessionIds: [], timedOut: false });
expect(runSessionEndMock).not.toHaveBeenCalled();
});
it("fires session_end with reason=shutdown for every tracked session and clears them", async () => {
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
});
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:other",
sessionId: "sess-B",
storePath: "/tmp/store.json",
});
const result = await drainActiveSessionsForShutdown({ reason: "shutdown" });
expect(result.timedOut).toBe(false);
expect(result.emittedSessionIds.sort()).toEqual(["sess-A", "sess-B"]);
expect(runSessionEndMock).toHaveBeenCalledTimes(2);
const reasons = runSessionEndMock.mock.calls.map(
([event]) => (event as { reason?: string }).reason,
);
expect(reasons.every((reason) => reason === "shutdown")).toBe(true);
// After the drain, the tracker forgets every emitted session (the emit
// helper calls `forgetActiveSessionForShutdown`), so a second drain is a
// no-op and we never double-fire on restart loops.
expect(listActiveSessionsForShutdown()).toEqual([]);
});
it("propagates reason=restart when called for a restart shutdown", async () => {
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
});
await drainActiveSessionsForShutdown({ reason: "restart" });
expect(runSessionEndMock).toHaveBeenCalledTimes(1);
expect((runSessionEndMock.mock.calls[0][0] as { reason?: string }).reason).toBe("restart");
});
it("does not double-fire for a session already finalized by reset/delete/compaction", async () => {
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
});
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:other",
sessionId: "sess-B",
storePath: "/tmp/store.json",
});
// Simulate sess-A being finalized through the normal reset path before
// the gateway is shut down: the matching `session_end` is fired with
// reason="reset" and the tracker forgets it.
emitGatewaySessionEndPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
reason: "reset",
});
runSessionEndMock.mockClear();
await drainActiveSessionsForShutdown({ reason: "shutdown" });
expect(runSessionEndMock).toHaveBeenCalledTimes(1);
expect((runSessionEndMock.mock.calls[0][0] as { sessionId?: string }).sessionId).toBe("sess-B");
});
it("still records the session as forgotten when no `session_end` plugins are registered", async () => {
hasHooksMock.mockImplementation(() => false);
emitGatewaySessionStartPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
});
// session_end fires while no plugin listens: hook is not run, but the
// shutdown tracker must still forget the session so the later drain
// does not pick it up.
emitGatewaySessionEndPluginHook({
cfg,
sessionKey: "agent:main:main",
sessionId: "sess-A",
storePath: "/tmp/store.json",
reason: "deleted",
});
expect(listActiveSessionsForShutdown()).toEqual([]);
const result = await drainActiveSessionsForShutdown({ reason: "shutdown" });
expect(result.emittedSessionIds).toEqual([]);
});
});


@@ -1 +1,2 @@
export * from "./server-close.js";
export { drainActiveSessionsForShutdown } from "./session-reset-service.js";


@@ -186,6 +186,68 @@ describe("createGatewayCloseHandler", () => {
).toBe(true);
});
it("drains the active-session tracker with reason=shutdown on SIGTERM/SIGINT close", async () => {
const drainActiveSessionsForShutdown = vi.fn(async () => ({
emittedSessionIds: ["session-A", "session-B"],
timedOut: false,
}));
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({ drainActiveSessionsForShutdown }),
);
await close({ reason: "SIGTERM" });
expect(drainActiveSessionsForShutdown).toHaveBeenCalledTimes(1);
expect(drainActiveSessionsForShutdown.mock.calls[0][0]).toMatchObject({
reason: "shutdown",
});
});
it("drains the active-session tracker with reason=restart when restartExpectedMs is set", async () => {
const drainActiveSessionsForShutdown = vi.fn(async () => ({
emittedSessionIds: ["session-A"],
timedOut: false,
}));
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({ drainActiveSessionsForShutdown }),
);
await close({ reason: "gateway restarting", restartExpectedMs: 1234 });
expect(drainActiveSessionsForShutdown).toHaveBeenCalledTimes(1);
expect(drainActiveSessionsForShutdown.mock.calls[0][0]).toMatchObject({
reason: "restart",
});
});
it("records a warning and continues shutdown when the session-end drain reports a timeout", async () => {
const drainActiveSessionsForShutdown = vi.fn(async () => ({
emittedSessionIds: ["session-A"],
timedOut: true,
}));
const close = createGatewayCloseHandler(
createGatewayCloseTestDeps({ drainActiveSessionsForShutdown }),
);
const result = await close({ reason: "SIGTERM" });
expect(drainActiveSessionsForShutdown).toHaveBeenCalledTimes(1);
expect(result.warnings).toContain("session-end-drain");
expect(
mocks.logWarn.mock.calls.some(([message]) =>
String(message).includes("session-end-drain timed out"),
),
).toBe(true);
});
it("skips the session-end drain step when no drain helper is provided", async () => {
const close = createGatewayCloseHandler(createGatewayCloseTestDeps());
const result = await close({ reason: "SIGTERM" });
expect(result.warnings).not.toContain("session-end-drain");
});
it("continues restart shutdown and records a warning when gateway pre-restart hook stalls", async () => {
vi.useFakeTimers();
mocks.triggerInternalHook.mockImplementation((event: InternalHookEvent) => {


@@ -13,6 +13,7 @@ import { normalizeOptionalString } from "../shared/string-coerce.js";
const shutdownLog = createSubsystemLogger("gateway/shutdown");
const GATEWAY_SHUTDOWN_HOOK_TIMEOUT_MS = 1_000;
const GATEWAY_PRE_RESTART_HOOK_TIMEOUT_MS = 1_000;
const ACTIVE_SESSIONS_SHUTDOWN_DRAIN_TIMEOUT_MS = 2_000;
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
const HTTP_CLOSE_GRACE_MS = 1_000;
@@ -197,6 +198,10 @@ export function createGatewayCloseHandler(params: {
wss: WebSocketServer;
httpServer: HttpServer;
httpServers?: HttpServer[];
drainActiveSessionsForShutdown?: (params: {
reason: "shutdown" | "restart";
totalTimeoutMs?: number;
}) => Promise<{ emittedSessionIds: string[]; timedOut: boolean }>;
}) {
return async (opts?: {
reason?: string;
@@ -256,6 +261,26 @@ export function createGatewayCloseHandler(params: {
warnings,
);
}
if (params.drainActiveSessionsForShutdown) {
await shutdownStep(
"session-end-drain",
async () => {
const drainReason: "shutdown" | "restart" =
restartExpectedMs !== null ? "restart" : "shutdown";
const result = await params.drainActiveSessionsForShutdown!({
reason: drainReason,
totalTimeoutMs: ACTIVE_SESSIONS_SHUTDOWN_DRAIN_TIMEOUT_MS,
});
if (result.timedOut) {
shutdownLog.warn(
`session-end-drain timed out after ${ACTIVE_SESSIONS_SHUTDOWN_DRAIN_TIMEOUT_MS}ms after ${result.emittedSessionIds.length} sessions; continuing shutdown`,
);
recordShutdownWarning(warnings, "session-end-drain");
}
},
warnings,
);
}
if (params.bonjourStop) {
await shutdownStep("bonjour", () => params.bonjourStop!(), warnings);
}


@@ -1328,6 +1328,9 @@ export const sessionsHandlers: GatewayRequestHandlers = {
sessionKey: target.canonicalKey,
sessionId: createdEntry.sessionId,
resumedFrom: parentEntry?.sessionId,
storePath: target.storePath,
sessionFile: createdEntry.sessionFile,
agentId: target.agentId,
});
}
},


@@ -935,7 +935,8 @@ export async function startGatewayServer(
const createCloseHandler =
() => async (opts?: { reason?: string; restartExpectedMs?: number | null }) => {
const channelIds = listLoadedChannelPlugins().map((plugin) => plugin.id as ChannelId);
-const { createGatewayCloseHandler } = await loadGatewayCloseModule();
+const { createGatewayCloseHandler, drainActiveSessionsForShutdown } =
+await loadGatewayCloseModule();
await createGatewayCloseHandler({
bonjourStop: runtimeState.bonjourStop,
tailscaleCleanup: runtimeState.tailscaleCleanup,
@@ -963,6 +964,7 @@ export async function startGatewayServer(
wss,
httpServer,
httpServers,
drainActiveSessionsForShutdown,
})(opts);
};
let clearFallbackGatewayContextForServer = () => {};


@@ -37,6 +37,11 @@ import {
normalizeAgentId,
parseAgentSessionKey,
} from "../routing/session-key.js";
import {
forgetActiveSessionForShutdown,
listActiveSessionsForShutdown,
noteActiveSessionForShutdown,
} from "./active-sessions-shutdown-tracker.js";
import { ErrorCodes, errorShape } from "./protocol/index.js";
import {
archiveSessionTranscriptsDetailed,
@@ -102,7 +107,16 @@ export function emitGatewaySessionEndPluginHook(params: {
storePath: string;
sessionFile?: string;
agentId?: string;
-reason: "new" | "reset" | "idle" | "daily" | "compaction" | "deleted" | "unknown";
+reason:
+| "new"
+| "reset"
+| "idle"
+| "daily"
+| "compaction"
+| "deleted"
+| "shutdown"
+| "restart"
+| "unknown";
archivedTranscripts?: ArchivedSessionTranscript[];
nextSessionId?: string;
nextSessionKey?: string;
@@ -110,6 +124,10 @@ export function emitGatewaySessionEndPluginHook(params: {
if (!params.sessionId) {
return;
}
// Drop this session from the shutdown finalizer's tracked set unconditionally
// -- even when no plugin hooks are registered for `session_end`, the session
// is being closed here and must not be re-finalized by a later shutdown drain.
forgetActiveSessionForShutdown(params.sessionId);
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("session_end")) {
return;
@@ -141,10 +159,29 @@ export function emitGatewaySessionStartPluginHook(params: {
sessionKey: string;
sessionId?: string;
resumedFrom?: string;
storePath?: string;
sessionFile?: string;
agentId?: string;
}): void {
if (!params.sessionId) {
return;
}
// Track the session for the shutdown finalizer even when no plugin hooks are
// registered locally, so a later restart still emits a typed `session_end`
// for sessions that opened while a `session_end` plugin was attached. The
// tracker is keyed by `sessionId`, so a session that is subsequently closed
// via reset / delete / compaction is forgotten before the shutdown drain
// ever runs (see #57790).
if (params.storePath) {
noteActiveSessionForShutdown({
cfg: params.cfg,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
storePath: params.storePath,
sessionFile: params.sessionFile,
agentId: params.agentId,
});
}
const hookRunner = getGlobalHookRunner();
if (!hookRunner?.hasHooks("session_start")) {
return;
@@ -160,6 +197,71 @@ export function emitGatewaySessionStartPluginHook(params: {
});
}
const SHUTDOWN_DRAIN_DEFAULT_TOTAL_TIMEOUT_MS = 2_000;

export type DrainActiveSessionsForShutdownResult = {
  emittedSessionIds: string[];
  timedOut: boolean;
};

/**
 * Emit a typed `session_end` for every session that received `session_start`
 * but did not yet receive a paired `session_end`. The bounded total timeout
 * mirrors the gateway lifecycle hook timeout so a slow plugin cannot block
 * SIGTERM/SIGINT past the runtime's overall shutdown grace window.
 *
 * Sessions that have already been finalized through replace / reset / delete /
 * compaction are forgotten from the tracker by `emitGatewaySessionEndPluginHook`
 * before this drain runs, so they will not be double-fired here.
 */
export async function drainActiveSessionsForShutdown(params: {
  reason: "shutdown" | "restart";
  totalTimeoutMs?: number;
}): Promise<DrainActiveSessionsForShutdownResult> {
  const tracked = listActiveSessionsForShutdown();
  if (tracked.length === 0) {
    return { emittedSessionIds: [], timedOut: false };
  }
  const totalTimeoutMs = Math.max(
    100,
    Math.floor(params.totalTimeoutMs ?? SHUTDOWN_DRAIN_DEFAULT_TOTAL_TIMEOUT_MS),
  );
  const emittedSessionIds: string[] = [];
  const drain = (async () => {
    for (const entry of tracked) {
      emitGatewaySessionEndPluginHook({
        cfg: entry.cfg,
        sessionKey: entry.sessionKey,
        sessionId: entry.sessionId,
        storePath: entry.storePath,
        sessionFile: entry.sessionFile,
        agentId: entry.agentId,
        reason: params.reason,
      });
      emittedSessionIds.push(entry.sessionId);
    }
  })();
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), totalTimeoutMs);
    timer.unref?.();
  });
  try {
    const result = await Promise.race([drain.then(() => "ok" as const), timeout]);
    if (result === "timeout") {
      logVerbose(
        `shutdown session-end drain timed out after ${totalTimeoutMs}ms with ${tracked.length - emittedSessionIds.length} sessions remaining`,
      );
      return { emittedSessionIds, timedOut: true };
    }
    return { emittedSessionIds, timedOut: false };
  } finally {
    if (timer) {
      clearTimeout(timer);
    }
  }
}

export async function emitSessionUnboundLifecycleEvent(params: {
targetSessionKey: string;
reason: "session-reset" | "session-delete";
@@ -682,6 +784,9 @@ export async function performGatewaySessionReset(params: {
sessionKey: target.canonicalKey ?? params.key,
sessionId: next.sessionId,
resumedFrom: oldSessionId,
storePath,
sessionFile: next.sessionFile as string | undefined,
agentId: target.agentId,
});
if (hadExistingEntry) {
await emitSessionUnboundLifecycleEvent({


@@ -516,6 +516,8 @@ export type PluginHookSessionEndReason =
| "daily"
| "compaction"
| "deleted"
| "shutdown"
| "restart"
| "unknown";
export type PluginHookSessionEndEvent = {