mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 23:04:45 +00:00
refactor(auto-reply): simplify foreground freshness fence
This commit is contained in:
@@ -28,12 +28,10 @@ type Delivery = {
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
const promise = new Promise<T>((res) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
function queuedFinalResult() {
|
||||
@@ -88,7 +86,6 @@ describe("foreground reply freshness", () => {
|
||||
it("suppresses an older foreground final after a newer inbound turn starts for the same session target", async () => {
|
||||
const deliveries: Delivery[] = [];
|
||||
const olderStarted = createDeferred<void>();
|
||||
const newerStarted = createDeferred<void>();
|
||||
const releaseOlderFinal = createDeferred<void>();
|
||||
|
||||
hoisted.dispatchReplyFromConfigMock.mockImplementation(
|
||||
@@ -100,7 +97,6 @@ describe("foreground reply freshness", () => {
|
||||
return queuedFinalResult();
|
||||
}
|
||||
if (params.ctx.MessageSid === "new-message") {
|
||||
newerStarted.resolve();
|
||||
params.dispatcher.sendFinalReply({ text: "new final" });
|
||||
return queuedFinalResult();
|
||||
}
|
||||
@@ -114,12 +110,10 @@ describe("foreground reply freshness", () => {
|
||||
);
|
||||
await olderStarted.promise;
|
||||
|
||||
const newerDispatch = dispatchWithDeliveries(
|
||||
const newerResult = await dispatchWithDeliveries(
|
||||
buildForegroundCtx({ MessageSid: "new-message" }),
|
||||
deliveries,
|
||||
);
|
||||
await newerStarted.promise;
|
||||
const newerResult = await newerDispatch;
|
||||
|
||||
releaseOlderFinal.resolve();
|
||||
const olderResult = await olderDispatch;
|
||||
@@ -139,10 +133,9 @@ describe("foreground reply freshness", () => {
|
||||
const deliveries: Delivery[] = [];
|
||||
const beforeDeliverStarted = createDeferred<void>();
|
||||
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
|
||||
const newerStarted = createDeferred<void>();
|
||||
const beforeDeliver = vi.fn(async () => {
|
||||
const beforeDeliver = vi.fn(() => {
|
||||
beforeDeliverStarted.resolve();
|
||||
return await releaseBeforeDeliver.promise;
|
||||
return releaseBeforeDeliver.promise;
|
||||
});
|
||||
|
||||
hoisted.dispatchReplyFromConfigMock.mockImplementation(
|
||||
@@ -152,7 +145,6 @@ describe("foreground reply freshness", () => {
|
||||
return queuedFinalResult();
|
||||
}
|
||||
if (params.ctx.MessageSid === "new-message") {
|
||||
newerStarted.resolve();
|
||||
return {
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
@@ -173,7 +165,6 @@ describe("foreground reply freshness", () => {
|
||||
buildForegroundCtx({ MessageSid: "new-message" }),
|
||||
deliveries,
|
||||
);
|
||||
await newerStarted.promise;
|
||||
|
||||
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
|
||||
const olderResult = await olderDispatch;
|
||||
@@ -193,7 +184,6 @@ describe("foreground reply freshness", () => {
|
||||
it("keeps concurrent foreground finals isolated for different targets sharing a session", async () => {
|
||||
const deliveries: Delivery[] = [];
|
||||
const firstStarted = createDeferred<void>();
|
||||
const secondStarted = createDeferred<void>();
|
||||
const releaseFirstFinal = createDeferred<void>();
|
||||
|
||||
hoisted.dispatchReplyFromConfigMock.mockImplementation(
|
||||
@@ -205,7 +195,6 @@ describe("foreground reply freshness", () => {
|
||||
return queuedFinalResult();
|
||||
}
|
||||
if (params.ctx.MessageSid === "second-chat") {
|
||||
secondStarted.resolve();
|
||||
params.dispatcher.sendFinalReply({ text: "second chat final" });
|
||||
return queuedFinalResult();
|
||||
}
|
||||
@@ -234,7 +223,6 @@ describe("foreground reply freshness", () => {
|
||||
}),
|
||||
deliveries,
|
||||
);
|
||||
await secondStarted.promise;
|
||||
await expect(secondDispatch).resolves.toEqual({
|
||||
queuedFinal: true,
|
||||
counts: { tool: 0, block: 0, final: 1 },
|
||||
|
||||
@@ -92,8 +92,13 @@ function beginForegroundReplyFence(
|
||||
};
|
||||
}
|
||||
|
||||
function isForegroundReplyFenceSuperseded(snapshot: ForegroundReplyFenceSnapshot): boolean {
|
||||
return (foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation;
|
||||
function isForegroundReplyFenceSuperseded(
|
||||
snapshot: ForegroundReplyFenceSnapshot | undefined,
|
||||
): boolean {
|
||||
return Boolean(
|
||||
snapshot &&
|
||||
(foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation,
|
||||
);
|
||||
}
|
||||
|
||||
function endForegroundReplyFence(snapshot: ForegroundReplyFenceSnapshot): void {
|
||||
@@ -283,26 +288,24 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
|
||||
}): Promise<DispatchInboundResult> {
|
||||
const finalized = finalizeInboundContext(params.ctx);
|
||||
const foregroundReplyFence = beginForegroundReplyFence(finalized);
|
||||
let foregroundReplyFenceActive = true;
|
||||
const silentReplyContext = resolveDispatcherSilentReplyContext(finalized, params.cfg);
|
||||
const configuredBeforeDeliver =
|
||||
params.dispatcherOptions.beforeDeliver ?? buildMessageSendingBeforeDeliver(finalized);
|
||||
const beforeDeliver: ReplyDispatchBeforeDeliver | undefined = foregroundReplyFence
|
||||
? async (payload, info) => {
|
||||
const isSuperseded = () =>
|
||||
foregroundReplyFenceActive && isForegroundReplyFenceSuperseded(foregroundReplyFence);
|
||||
if (isSuperseded()) {
|
||||
return null;
|
||||
const beforeDeliver: ReplyDispatchBeforeDeliver | undefined =
|
||||
foregroundReplyFence || configuredBeforeDeliver
|
||||
? async (payload, info) => {
|
||||
if (isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
|
||||
return null;
|
||||
}
|
||||
const deliverPayload = configuredBeforeDeliver
|
||||
? await configuredBeforeDeliver(payload, info)
|
||||
: payload;
|
||||
if (!deliverPayload || isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
|
||||
return null;
|
||||
}
|
||||
return deliverPayload;
|
||||
}
|
||||
const deliverPayload = configuredBeforeDeliver
|
||||
? await configuredBeforeDeliver(payload, info)
|
||||
: payload;
|
||||
if (!deliverPayload || isSuperseded()) {
|
||||
return null;
|
||||
}
|
||||
return deliverPayload;
|
||||
}
|
||||
: configuredBeforeDeliver;
|
||||
: undefined;
|
||||
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
|
||||
createReplyDispatcherWithTyping({
|
||||
...params.dispatcherOptions,
|
||||
@@ -322,7 +325,6 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
|
||||
});
|
||||
} finally {
|
||||
if (foregroundReplyFence) {
|
||||
foregroundReplyFenceActive = false;
|
||||
endForegroundReplyFence(foregroundReplyFence);
|
||||
}
|
||||
markRunComplete();
|
||||
|
||||
Reference in New Issue
Block a user