From ccb43f95cb179a15dee61ebbdb93a50eb51599e9 Mon Sep 17 00:00:00 2001 From: "clawsweeper[bot]" <274271284+clawsweeper[bot]@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:05:28 -0700 Subject: [PATCH] fix(channels): suppress observe-only prepared dispatch Co-authored-by: openclaw-clawsweeper[bot] <280122609+openclaw-clawsweeper[bot]@users.noreply.github.com> --- src/channels/turn/kernel.test.ts | 34 ++++++++++++++ src/channels/turn/kernel.ts | 80 ++++++++++++++++++-------------- 2 files changed, 78 insertions(+), 36 deletions(-) diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 87c3c518890..be2ffc63e0b 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -127,6 +127,40 @@ describe("channel turn kernel", () => { ); }); + it("suppresses direct prepared dispatches for observe-only admission", async () => { + const events: string[] = []; + const recordInboundSession = createRecordInboundSession(events); + const runDispatch = vi.fn(async () => { + events.push("dispatch"); + return { + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }; + }); + const observeOnlyDispatchResult = { + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }; + + const result = await runPreparedChannelTurn({ + channel: "test", + routeSessionKey: "agent:observer:test:peer", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }), + recordInboundSession, + runDispatch, + observeOnlyDispatchResult, + admission: { kind: "observeOnly", reason: "broadcast-observer" }, + }); + + expect(events).toEqual(["record"]); + expect(runDispatch).not.toHaveBeenCalled(); + expect(result.admission).toEqual({ kind: "observeOnly", reason: "broadcast-observer" }); + expect(result.dispatched).toBe(true); + expect(result.dispatchResult).toBe(observeOnlyDispatchResult); + expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false); + }); + it("clears pending group history after a successful prepared turn", async () => { const historyMap = new Map([["room-1", [{ sender: "User", body: "queued before reply" }]]]); diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 6a319572a17..62f9009e1dd 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -125,33 +125,36 @@ function resolveObserveOnlyDispatchResult( export async function dispatchAssembledChannelTurn( params: AssembledChannelTurn, ): Promise { - return await runPreparedChannelTurn({ - channel: params.channel, - accountId: params.accountId, - routeSessionKey: params.routeSessionKey, - storePath: params.storePath, - ctxPayload: params.ctxPayload, - recordInboundSession: params.recordInboundSession, - record: params.record, - history: params.history, - admission: params.admission, - log: params.log, - messageId: params.messageId, - runDispatch: async () => - await params.dispatchReplyWithBufferedBlockDispatcher({ - ctx: params.ctxPayload, - cfg: params.cfg, - dispatcherOptions: { - ...params.dispatcherOptions, - deliver: async (payload: ReplyPayload, info) => { - await params.delivery.deliver(payload, info); + return await runPreparedChannelTurnCore( + { + channel: params.channel, + accountId: params.accountId, + routeSessionKey: params.routeSessionKey, + storePath: params.storePath, + ctxPayload: params.ctxPayload, + recordInboundSession: params.recordInboundSession, + record: params.record, + history: params.history, + admission: params.admission, + log: params.log, + messageId: params.messageId, + runDispatch: async () => + await params.dispatchReplyWithBufferedBlockDispatcher({ + ctx: params.ctxPayload, + cfg: params.cfg, + dispatcherOptions: { + ...params.dispatcherOptions, + deliver: async (payload: ReplyPayload, info) => { + await params.delivery.deliver(payload, info); + }, + onError: params.delivery.onError, }, - onError: params.delivery.onError, - }, - replyOptions: params.replyOptions, - replyResolver: params.replyResolver, - }), - }); + replyOptions: params.replyOptions, + replyResolver: params.replyResolver, + }), + }, + { suppressObserveOnlyDispatch: false }, + ); } function isPreparedChannelTurn( @@ -170,24 +173,18 @@ async function dispatchResolvedChannelTurn( }, ): Promise> { if (isPreparedChannelTurn(params)) { - return await runPreparedChannelTurn( - params.admission.kind === "observeOnly" - ? { - ...params, - runDispatch: async () => resolveObserveOnlyDispatchResult(params), - } - : params, - ); + return await runPreparedChannelTurn(params); } return (await dispatchAssembledChannelTurn( params, )) as DispatchedChannelTurnResult; } -export async function runPreparedChannelTurn< +async function runPreparedChannelTurnCore< TDispatchResult = DispatchedChannelTurnResult["dispatchResult"], >( params: PreparedChannelTurn, + options: { suppressObserveOnlyDispatch: boolean }, ): Promise> { const admission = params.admission ?? ({ kind: "dispatch" } as const); emit({ @@ -253,7 +250,10 @@ export async function runPreparedChannelTurn< }); let dispatchResult: TDispatchResult; try { - dispatchResult = await params.runDispatch(); + dispatchResult = + options.suppressObserveOnlyDispatch && admission.kind === "observeOnly" + ? resolveObserveOnlyDispatchResult(params) + : await params.runDispatch(); } catch (err) { emit({ ...params, @@ -289,6 +289,14 @@ export async function runPreparedChannelTurn< }; } +export async function runPreparedChannelTurn< + TDispatchResult = DispatchedChannelTurnResult["dispatchResult"], +>( + params: PreparedChannelTurn, +): Promise> { + return await runPreparedChannelTurnCore(params, { suppressObserveOnlyDispatch: true }); +} + export async function runChannelTurn< TRaw, TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],