mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:00:42 +00:00
refactor: share gateway send inflight handling
This commit is contained in:
@@ -36,7 +36,7 @@ import {
|
||||
validateSendParams,
|
||||
} from "../protocol/index.js";
|
||||
import { formatForLog } from "../ws-log.js";
|
||||
import type { GatewayRequestContext, GatewayRequestHandlers } from "./types.js";
|
||||
import type { GatewayRequestContext, GatewayRequestHandlers, RespondFn } from "./types.js";
|
||||
|
||||
type InflightResult = {
|
||||
ok: boolean;
|
||||
@@ -59,6 +59,44 @@ const getInflightMap = (context: GatewayRequestContext) => {
|
||||
return inflight;
|
||||
};
|
||||
|
||||
async function resolveGatewayInflightMap(params: {
|
||||
context: GatewayRequestContext;
|
||||
dedupeKey: string;
|
||||
respond: RespondFn;
|
||||
}): Promise<Map<string, Promise<InflightResult>> | undefined> {
|
||||
const cached = params.context.dedupe.get(params.dedupeKey);
|
||||
if (cached) {
|
||||
params.respond(cached.ok, cached.payload, cached.error, {
|
||||
cached: true,
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
const inflightMap = getInflightMap(params.context);
|
||||
const inflight = inflightMap.get(params.dedupeKey);
|
||||
if (inflight) {
|
||||
const result = await inflight;
|
||||
const meta = result.meta ? { ...result.meta, cached: true } : { cached: true };
|
||||
params.respond(result.ok, result.payload, result.error, meta);
|
||||
return undefined;
|
||||
}
|
||||
return inflightMap;
|
||||
}
|
||||
|
||||
async function runGatewayInflightWork(params: {
|
||||
inflightMap: Map<string, Promise<InflightResult>>;
|
||||
dedupeKey: string;
|
||||
work: Promise<InflightResult>;
|
||||
respond: RespondFn;
|
||||
}) {
|
||||
params.inflightMap.set(params.dedupeKey, params.work);
|
||||
try {
|
||||
const result = await params.work;
|
||||
params.respond(result.ok, result.payload, result.error, result.meta);
|
||||
} finally {
|
||||
params.inflightMap.delete(params.dedupeKey);
|
||||
}
|
||||
}
|
||||
|
||||
async function resolveRequestedChannel(params: {
|
||||
requestChannel: unknown;
|
||||
unsupportedMessage: (input: string) => string;
|
||||
@@ -185,6 +223,43 @@ function cacheGatewayDedupeFailure(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function createGatewayInflightSuccess(params: {
|
||||
context: GatewayRequestContext;
|
||||
dedupeKey: string;
|
||||
payload: unknown;
|
||||
channel: string;
|
||||
}): InflightResult {
|
||||
cacheGatewayDedupeSuccess({
|
||||
context: params.context,
|
||||
dedupeKey: params.dedupeKey,
|
||||
payload: params.payload,
|
||||
});
|
||||
return {
|
||||
ok: true,
|
||||
payload: params.payload,
|
||||
meta: { channel: params.channel },
|
||||
};
|
||||
}
|
||||
|
||||
function createGatewayInflightUnavailableFailure(params: {
|
||||
context: GatewayRequestContext;
|
||||
dedupeKey: string;
|
||||
channel: string;
|
||||
err: unknown;
|
||||
}): InflightResult {
|
||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(params.err));
|
||||
cacheGatewayDedupeFailure({
|
||||
context: params.context,
|
||||
dedupeKey: params.dedupeKey,
|
||||
error,
|
||||
});
|
||||
return {
|
||||
ok: false,
|
||||
error,
|
||||
meta: { channel: params.channel, error: formatForLog(params.err) },
|
||||
};
|
||||
}
|
||||
|
||||
export const sendHandlers: GatewayRequestHandlers = {
|
||||
"message.action": async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
@@ -235,19 +310,8 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
const senderIsOwner = callerIsFullOperator && request.senderIsOwner === true;
|
||||
const idem = request.idempotencyKey;
|
||||
const dedupeKey = `message.action:${idem}`;
|
||||
const cached = context.dedupe.get(dedupeKey);
|
||||
if (cached) {
|
||||
respond(cached.ok, cached.payload, cached.error, {
|
||||
cached: true,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const inflightMap = getInflightMap(context);
|
||||
const inflight = inflightMap.get(dedupeKey);
|
||||
if (inflight) {
|
||||
const result = await inflight;
|
||||
const meta = result.meta ? { ...result.meta, cached: true } : { cached: true };
|
||||
respond(result.ok, result.payload, result.error, meta);
|
||||
const inflightMap = await resolveGatewayInflightMap({ context, dedupeKey, respond });
|
||||
if (!inflightMap) {
|
||||
return;
|
||||
}
|
||||
const resolvedChannel = await resolveRequestedChannel({
|
||||
@@ -298,26 +362,13 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
return { ok: false, error, meta: { channel } };
|
||||
}
|
||||
const payload = extractToolPayload(handled);
|
||||
cacheGatewayDedupeSuccess({ context, dedupeKey, payload });
|
||||
return {
|
||||
ok: true,
|
||||
payload,
|
||||
meta: { channel },
|
||||
};
|
||||
return createGatewayInflightSuccess({ context, dedupeKey, payload, channel });
|
||||
} catch (err) {
|
||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
||||
cacheGatewayDedupeFailure({ context, dedupeKey, error });
|
||||
return { ok: false, error, meta: { channel, error: formatForLog(err) } };
|
||||
return createGatewayInflightUnavailableFailure({ context, dedupeKey, channel, err });
|
||||
}
|
||||
})();
|
||||
|
||||
inflightMap.set(dedupeKey, work);
|
||||
try {
|
||||
const result = await work;
|
||||
respond(result.ok, result.payload, result.error, result.meta);
|
||||
} finally {
|
||||
inflightMap.delete(dedupeKey);
|
||||
}
|
||||
await runGatewayInflightWork({ inflightMap, dedupeKey, work, respond });
|
||||
},
|
||||
send: async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
@@ -347,19 +398,8 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
};
|
||||
const idem = request.idempotencyKey;
|
||||
const dedupeKey = `send:${idem}`;
|
||||
const cached = context.dedupe.get(dedupeKey);
|
||||
if (cached) {
|
||||
respond(cached.ok, cached.payload, cached.error, {
|
||||
cached: true,
|
||||
});
|
||||
return;
|
||||
}
|
||||
const inflightMap = getInflightMap(context);
|
||||
const inflight = inflightMap.get(dedupeKey);
|
||||
if (inflight) {
|
||||
const result = await inflight;
|
||||
const meta = result.meta ? { ...result.meta, cached: true } : { cached: true };
|
||||
respond(result.ok, result.payload, result.error, meta);
|
||||
const inflightMap = await resolveGatewayInflightMap({ context, dedupeKey, respond });
|
||||
if (!inflightMap) {
|
||||
return;
|
||||
}
|
||||
const to = normalizeOptionalString(request.to) ?? "";
|
||||
@@ -496,26 +536,13 @@ export const sendHandlers: GatewayRequestHandlers = {
|
||||
throw new Error("No delivery result");
|
||||
}
|
||||
const payload = buildGatewayDeliveryPayload({ runId: idem, channel, result });
|
||||
cacheGatewayDedupeSuccess({ context, dedupeKey, payload });
|
||||
return {
|
||||
ok: true,
|
||||
payload,
|
||||
meta: { channel },
|
||||
};
|
||||
return createGatewayInflightSuccess({ context, dedupeKey, payload, channel });
|
||||
} catch (err) {
|
||||
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
|
||||
cacheGatewayDedupeFailure({ context, dedupeKey, error });
|
||||
return { ok: false, error, meta: { channel, error: formatForLog(err) } };
|
||||
return createGatewayInflightUnavailableFailure({ context, dedupeKey, channel, err });
|
||||
}
|
||||
})();
|
||||
|
||||
inflightMap.set(dedupeKey, work);
|
||||
try {
|
||||
const result = await work;
|
||||
respond(result.ok, result.payload, result.error, result.meta);
|
||||
} finally {
|
||||
inflightMap.delete(dedupeKey);
|
||||
}
|
||||
await runGatewayInflightWork({ inflightMap, dedupeKey, work, respond });
|
||||
},
|
||||
poll: async ({ params, respond, context, client }) => {
|
||||
const p = params;
|
||||
|
||||
Reference in New Issue
Block a user