fix(channels): suppress observe-only prepared dispatch

Co-authored-by: openclaw-clawsweeper[bot] <280122609+openclaw-clawsweeper[bot]@users.noreply.github.com>
This commit is contained in:
clawsweeper[bot]
2026-04-29 22:05:28 -07:00
committed by GitHub
parent 87a211d309
commit ccb43f95cb
2 changed files with 78 additions and 36 deletions

View File

@@ -127,6 +127,40 @@ describe("channel turn kernel", () => {
);
});
it("suppresses direct prepared dispatches for observe-only admission", async () => {
const events: string[] = [];
const recordInboundSession = createRecordInboundSession(events);
const runDispatch = vi.fn(async () => {
events.push("dispatch");
return {
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
};
});
const observeOnlyDispatchResult = {
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
};
const result = await runPreparedChannelTurn({
channel: "test",
routeSessionKey: "agent:observer:test:peer",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }),
recordInboundSession,
runDispatch,
observeOnlyDispatchResult,
admission: { kind: "observeOnly", reason: "broadcast-observer" },
});
expect(events).toEqual(["record"]);
expect(runDispatch).not.toHaveBeenCalled();
expect(result.admission).toEqual({ kind: "observeOnly", reason: "broadcast-observer" });
expect(result.dispatched).toBe(true);
expect(result.dispatchResult).toBe(observeOnlyDispatchResult);
expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false);
});
it("clears pending group history after a successful prepared turn", async () => {
const historyMap = new Map([["room-1", [{ sender: "User", body: "queued before reply" }]]]);

View File

@@ -125,33 +125,36 @@ function resolveObserveOnlyDispatchResult<TDispatchResult>(
export async function dispatchAssembledChannelTurn(
params: AssembledChannelTurn,
): Promise<DispatchedChannelTurnResult> {
return await runPreparedChannelTurn({
channel: params.channel,
accountId: params.accountId,
routeSessionKey: params.routeSessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
record: params.record,
history: params.history,
admission: params.admission,
log: params.log,
messageId: params.messageId,
runDispatch: async () =>
await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
cfg: params.cfg,
dispatcherOptions: {
...params.dispatcherOptions,
deliver: async (payload: ReplyPayload, info) => {
await params.delivery.deliver(payload, info);
return await runPreparedChannelTurnCore(
{
channel: params.channel,
accountId: params.accountId,
routeSessionKey: params.routeSessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
record: params.record,
history: params.history,
admission: params.admission,
log: params.log,
messageId: params.messageId,
runDispatch: async () =>
await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
cfg: params.cfg,
dispatcherOptions: {
...params.dispatcherOptions,
deliver: async (payload: ReplyPayload, info) => {
await params.delivery.deliver(payload, info);
},
onError: params.delivery.onError,
},
onError: params.delivery.onError,
},
replyOptions: params.replyOptions,
replyResolver: params.replyResolver,
}),
});
replyOptions: params.replyOptions,
replyResolver: params.replyResolver,
}),
},
{ suppressObserveOnlyDispatch: false },
);
}
function isPreparedChannelTurn<TDispatchResult>(
@@ -170,24 +173,18 @@ async function dispatchResolvedChannelTurn<TDispatchResult>(
},
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
if (isPreparedChannelTurn(params)) {
return await runPreparedChannelTurn(
params.admission.kind === "observeOnly"
? {
...params,
runDispatch: async () => resolveObserveOnlyDispatchResult(params),
}
: params,
);
return await runPreparedChannelTurn(params);
}
return (await dispatchAssembledChannelTurn(
params,
)) as DispatchedChannelTurnResult<TDispatchResult>;
}
export async function runPreparedChannelTurn<
async function runPreparedChannelTurnCore<
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
>(
params: PreparedChannelTurn<TDispatchResult>,
options: { suppressObserveOnlyDispatch: boolean },
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
const admission = params.admission ?? ({ kind: "dispatch" } as const);
emit({
@@ -253,7 +250,10 @@ export async function runPreparedChannelTurn<
});
let dispatchResult: TDispatchResult;
try {
dispatchResult = await params.runDispatch();
dispatchResult =
options.suppressObserveOnlyDispatch && admission.kind === "observeOnly"
? resolveObserveOnlyDispatchResult(params)
: await params.runDispatch();
} catch (err) {
emit({
...params,
@@ -289,6 +289,14 @@ export async function runPreparedChannelTurn<
};
}
export async function runPreparedChannelTurn<
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
return await runPreparedChannelTurnCore(params, { suppressObserveOnlyDispatch: true });
}
export async function runChannelTurn<
TRaw,
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],