From 60ec7ca0f1837516ea828fcc54574d148566c64f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 20 Apr 2026 14:20:05 +0100 Subject: [PATCH] refactor: share gateway send inflight handling --- src/gateway/server-methods/send.ts | 145 +++++++++++++++++------------ 1 file changed, 86 insertions(+), 59 deletions(-) diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 4316dec9653..330ee489585 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -36,7 +36,7 @@ import { validateSendParams, } from "../protocol/index.js"; import { formatForLog } from "../ws-log.js"; -import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js"; +import type { GatewayRequestContext, GatewayRequestHandlers, RespondFn } from "./types.js"; type InflightResult = { ok: boolean; @@ -59,6 +59,44 @@ const getInflightMap = (context: GatewayRequestContext) => { return inflight; }; +async function resolveGatewayInflightMap(params: { + context: GatewayRequestContext; + dedupeKey: string; + respond: RespondFn; +}): Promise> | undefined> { + const cached = params.context.dedupe.get(params.dedupeKey); + if (cached) { + params.respond(cached.ok, cached.payload, cached.error, { + cached: true, + }); + return undefined; + } + const inflightMap = getInflightMap(params.context); + const inflight = inflightMap.get(params.dedupeKey); + if (inflight) { + const result = await inflight; + const meta = result.meta ? { ...result.meta, cached: true } : { cached: true }; + params.respond(result.ok, result.payload, result.error, meta); + return undefined; + } + return inflightMap; +} + +async function runGatewayInflightWork(params: { + inflightMap: Map>; + dedupeKey: string; + work: Promise; + respond: RespondFn; +}) { + params.inflightMap.set(params.dedupeKey, params.work); + try { + const result = await params.work; + params.respond(result.ok, result.payload, result.error, result.meta); + } finally { + params.inflightMap.delete(params.dedupeKey); + } +} + async function resolveRequestedChannel(params: { requestChannel: unknown; unsupportedMessage: (input: string) => string; @@ -185,6 +223,43 @@ function cacheGatewayDedupeFailure(params: { }); } +function createGatewayInflightSuccess(params: { + context: GatewayRequestContext; + dedupeKey: string; + payload: unknown; + channel: string; +}): InflightResult { + cacheGatewayDedupeSuccess({ + context: params.context, + dedupeKey: params.dedupeKey, + payload: params.payload, + }); + return { + ok: true, + payload: params.payload, + meta: { channel: params.channel }, + }; +} + +function createGatewayInflightUnavailableFailure(params: { + context: GatewayRequestContext; + dedupeKey: string; + channel: string; + err: unknown; +}): InflightResult { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(params.err)); + cacheGatewayDedupeFailure({ + context: params.context, + dedupeKey: params.dedupeKey, + error, + }); + return { + ok: false, + error, + meta: { channel: params.channel, error: formatForLog(params.err) }, + }; +} + export const sendHandlers: GatewayRequestHandlers = { "message.action": async ({ params, respond, context, client }) => { const p = params; @@ -235,19 +310,8 @@ export const sendHandlers: GatewayRequestHandlers = { const senderIsOwner = callerIsFullOperator && request.senderIsOwner === true; const idem = request.idempotencyKey; const dedupeKey = `message.action:${idem}`; - const cached = context.dedupe.get(dedupeKey); - if (cached) { - respond(cached.ok, cached.payload, cached.error, { - cached: true, - }); - return; - } - const inflightMap = getInflightMap(context); - const inflight = inflightMap.get(dedupeKey); - if (inflight) { - const result = await inflight; - const meta = result.meta ? { ...result.meta, cached: true } : { cached: true }; - respond(result.ok, result.payload, result.error, meta); + const inflightMap = await resolveGatewayInflightMap({ context, dedupeKey, respond }); + if (!inflightMap) { return; } const resolvedChannel = await resolveRequestedChannel({ @@ -298,26 +362,13 @@ export const sendHandlers: GatewayRequestHandlers = { return { ok: false, error, meta: { channel } }; } const payload = extractToolPayload(handled); - cacheGatewayDedupeSuccess({ context, dedupeKey, payload }); - return { - ok: true, - payload, - meta: { channel }, - }; + return createGatewayInflightSuccess({ context, dedupeKey, payload, channel }); } catch (err) { - const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - cacheGatewayDedupeFailure({ context, dedupeKey, error }); - return { ok: false, error, meta: { channel, error: formatForLog(err) } }; + return createGatewayInflightUnavailableFailure({ context, dedupeKey, channel, err }); } })(); - inflightMap.set(dedupeKey, work); - try { - const result = await work; - respond(result.ok, result.payload, result.error, result.meta); - } finally { - inflightMap.delete(dedupeKey); - } + await runGatewayInflightWork({ inflightMap, dedupeKey, work, respond }); }, send: async ({ params, respond, context, client }) => { const p = params; @@ -347,19 +398,8 @@ export const sendHandlers: GatewayRequestHandlers = { }; const idem = request.idempotencyKey; const dedupeKey = `send:${idem}`; - const cached = context.dedupe.get(dedupeKey); - if (cached) { - respond(cached.ok, cached.payload, cached.error, { - cached: true, - }); - return; - } - const inflightMap = getInflightMap(context); - const inflight = inflightMap.get(dedupeKey); - if (inflight) { - const result = await inflight; - const meta = result.meta ? { ...result.meta, cached: true } : { cached: true }; - respond(result.ok, result.payload, result.error, meta); + const inflightMap = await resolveGatewayInflightMap({ context, dedupeKey, respond }); + if (!inflightMap) { return; } const to = normalizeOptionalString(request.to) ?? ""; @@ -496,26 +536,13 @@ export const sendHandlers: GatewayRequestHandlers = { throw new Error("No delivery result"); } const payload = buildGatewayDeliveryPayload({ runId: idem, channel, result }); - cacheGatewayDedupeSuccess({ context, dedupeKey, payload }); - return { - ok: true, - payload, - meta: { channel }, - }; + return createGatewayInflightSuccess({ context, dedupeKey, payload, channel }); } catch (err) { - const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); - cacheGatewayDedupeFailure({ context, dedupeKey, error }); - return { ok: false, error, meta: { channel, error: formatForLog(err) } }; + return createGatewayInflightUnavailableFailure({ context, dedupeKey, channel, err }); } })(); - inflightMap.set(dedupeKey, work); - try { - const result = await work; - respond(result.ok, result.payload, result.error, result.meta); - } finally { - inflightMap.delete(dedupeKey); - } + await runGatewayInflightWork({ inflightMap, dedupeKey, work, respond }); }, poll: async ({ params, respond, context, client }) => { const p = params;