import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js"; import { createRunStateMachine, type RunStateStatusSink } from "../channels/run-state-machine.js"; import { KeyedAsyncQueue } from "./keyed-async-queue.js"; type CloseAwareServer = { once: (event: "close", listener: () => void) => unknown; }; type PassiveAccountLifecycleParams = { abortSignal?: AbortSignal; start: () => Promise; stop?: (handle: Handle) => void | Promise; onStop?: () => void | Promise; }; export type ChannelRunQueueTaskContext = { lifecycleSignal?: AbortSignal; }; export type ChannelRunQueue = { enqueue: (key: string, task: (context: ChannelRunQueueTaskContext) => Promise) => void; deactivate: () => void; }; export type ChannelRunQueueParams = { setStatus?: RunStateStatusSink; abortSignal?: AbortSignal; onError?: (error: unknown) => void; }; /** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */ export function createAccountStatusSink(params: { accountId: string; setStatus: (next: ChannelAccountSnapshot) => void; }): (patch: Omit) => void { return (patch) => { params.setStatus({ accountId: params.accountId, ...patch }); }; } /** * Serialize channel work per key while keeping lifecycle/busy accounting out of * channel-specific message handlers. The queue does not impose run timeouts; * callers should rely on session/tool/runtime lifecycle for long-running work. */ export function createChannelRunQueue(params: ChannelRunQueueParams): ChannelRunQueue { const queue = new KeyedAsyncQueue(); const runState = createRunStateMachine({ setStatus: params.setStatus, abortSignal: params.abortSignal, }); const reportError = (error: unknown) => { try { params.onError?.(error); } catch { // Keep queue error handling best-effort; callers should not create a // secondary unhandled rejection from their reporting hook. } }; return { enqueue(key, task) { void queue .enqueue(key, async () => { if (!runState.isActive()) { return; } runState.onRunStart(); try { if (!runState.isActive()) { return; } await task({ lifecycleSignal: params.abortSignal }); } finally { runState.onRunEnd(); } }) .catch(reportError); }, deactivate: runState.deactivate, }; } /** * Return a promise that resolves when the signal is aborted. * * If no signal is provided, the promise stays pending forever. When provided, * `onAbort` runs once before the promise resolves. */ export function waitUntilAbort( signal?: AbortSignal, onAbort?: () => void | Promise, ): Promise { return new Promise((resolve, reject) => { const complete = () => { Promise.resolve(onAbort?.()).then(() => resolve(), reject); }; if (!signal) { return; } if (signal.aborted) { complete(); return; } signal.addEventListener("abort", complete, { once: true }); }); } /** * Keep a passive account task alive until abort, then run optional cleanup. */ export async function runPassiveAccountLifecycle( params: PassiveAccountLifecycleParams, ): Promise { const handle = await params.start(); try { await waitUntilAbort(params.abortSignal); } finally { await params.stop?.(handle); await params.onStop?.(); } } /** * Keep a channel/provider task pending until the HTTP server closes. * * When an abort signal is provided, `onAbort` is invoked once and should * trigger server shutdown. The returned promise resolves only after `close`. */ export async function keepHttpServerTaskAlive(params: { server: CloseAwareServer; abortSignal?: AbortSignal; onAbort?: () => void | Promise; }): Promise { const { server, abortSignal, onAbort } = params; let abortTask: Promise = Promise.resolve(); let abortTriggered = false; const triggerAbort = () => { if (abortTriggered) { return; } abortTriggered = true; abortTask = Promise.resolve(onAbort?.()).then(() => undefined); }; const onAbortSignal = () => { triggerAbort(); }; if (abortSignal) { if (abortSignal.aborted) { triggerAbort(); } else { abortSignal.addEventListener("abort", onAbortSignal, { once: true }); } } await new Promise((resolve) => { server.once("close", () => resolve()); }); if (abortSignal) { abortSignal.removeEventListener("abort", onAbortSignal); } await abortTask; }