From bbf932fd7d69c32f800779d98312fbdcc3901c1b Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 30 Apr 2026 04:20:35 +0100 Subject: [PATCH] fix(channels): preserve observe-only turn compatibility --- CHANGELOG.md | 1 + src/channels/turn/kernel.test.ts | 44 +++++++++++++++++++ src/channels/turn/kernel.ts | 39 +++++++++++++++- src/channels/turn/types.ts | 16 +++++++ .../test-helpers/plugin-runtime-mock.ts | 27 +++++++++++- src/plugins/runtime/runtime-channel.ts | 4 ++ src/plugins/runtime/types-channel.ts | 4 ++ 7 files changed, 132 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e6af61b7d2..85c02a8d2b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Channels/groups: preserve observe-only turn suppression for prepared dispatch paths and restore deprecated channel turn runtime aliases, so passive observer/group flows stay silent while older plugins keep compiling. Thanks @vincentkoc. - Feishu/Bitable: clean up newly created placeholder rows whose fields contain only default empty values while preserving meaningful link, attachment, user, number, boolean, and location values during create-app cleanup. (#73920) Carries forward #40602. Thanks @boat2moon. - macOS app: keep attach-only mode and the Debug Settings launchd toggle marker-only, so launching with `--attach-only`/`--no-launchd` no longer uninstalls the Gateway LaunchAgent or drops active sessions. (#72174) Thanks @DolencLuka. - Plugin SDK: restore the deprecated `plugin-sdk/zalouser` command-auth facade so published Lark/Zalo plugins that import it load on current hosts. Fixes #74702. Thanks @Goron01. diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 5ffe58c7598..87c3c518890 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -342,6 +342,50 @@ describe("channel turn kernel", () => { expect(result.dispatchResult.queuedFinal).toBe(true); }); + it("suppresses prepared dispatch for observe-only full turns", async () => { + const events: string[] = []; + const onFinalize = vi.fn(); + const runDispatch = vi.fn(async () => { + events.push("custom-dispatch"); + return { + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }; + }); + const result = await runChannelTurn({ + channel: "test", + raw: { id: "msg-1", text: "hello" }, + adapter: { + ingest: () => ({ id: "msg-1", rawText: "hello" }), + preflight: () => ({ kind: "observeOnly", reason: "broadcast-observer" }), + resolveTurn: () => ({ + channel: "test", + routeSessionKey: "agent:observer:test:peer", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }), + recordInboundSession: createRecordInboundSession(events), + runDispatch, + }), + onFinalize, + }, + }); + + expect(result.admission).toEqual({ kind: "observeOnly", reason: "broadcast-observer" }); + expect(result.dispatched).toBe(true); + expect(events).toEqual(["record"]); + expect(runDispatch).not.toHaveBeenCalled(); + if (!result.dispatched) { + throw new Error("expected dispatch"); + } + expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false); + expect(onFinalize).toHaveBeenCalledWith( + expect.objectContaining({ + admission: { kind: "observeOnly", reason: "broadcast-observer" }, + dispatched: true, + }), + ); + }); + it("finalizes failed dispatches before rethrowing", async () => { const onFinalize = vi.fn(); const dispatchError = new Error("dispatch failed"); diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index b9da060445c..6a319572a17 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -1,5 +1,6 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import { clearHistoryEntriesIfEnabled } from "../../auto-reply/reply/history.js"; +import { EMPTY_CHANNEL_TURN_DISPATCH_COUNTS } from "./dispatch-result.js"; export { buildChannelTurnContext, filterChannelTurnSupplementalContext } from "./context.js"; export type { BuildChannelTurnContextParams } from "./context.js"; import type { @@ -15,6 +16,7 @@ import type { PreparedChannelTurn, PreflightFacts, RunChannelTurnParams, + RunResolvedChannelTurnParams, } from "./types.js"; export { EMPTY_CHANNEL_TURN_DISPATCH_COUNTS, @@ -49,6 +51,7 @@ export type { ReplyPlanFacts, RouteFacts, RunChannelTurnParams, + RunResolvedChannelTurnParams, SenderFacts, SupplementalContextFacts, } from "./types.js"; @@ -110,6 +113,15 @@ function clearPendingHistoryAfterTurn(params?: ChannelTurnHistoryFinalizeOptions }); } +function resolveObserveOnlyDispatchResult( + params: PreparedChannelTurn, +): TDispatchResult { + return (params.observeOnlyDispatchResult ?? { + queuedFinal: false, + counts: EMPTY_CHANNEL_TURN_DISPATCH_COUNTS, + }) as TDispatchResult; +} + export async function dispatchAssembledChannelTurn( params: AssembledChannelTurn, ): Promise { @@ -158,7 +170,14 @@ async function dispatchResolvedChannelTurn( }, ): Promise> { if (isPreparedChannelTurn(params)) { - return await runPreparedChannelTurn(params); + return await runPreparedChannelTurn( + params.admission.kind === "observeOnly" + ? { + ...params, + runDispatch: async () => resolveObserveOnlyDispatchResult(params), + } + : params, + ); } return (await dispatchAssembledChannelTurn( params, @@ -431,3 +450,21 @@ export async function runChannelTurn< return result; } + +export async function runResolvedChannelTurn< + TRaw, + TDispatchResult = DispatchedChannelTurnResult["dispatchResult"], +>( + params: RunResolvedChannelTurnParams, +): Promise> { + return await runChannelTurn({ + channel: params.channel, + accountId: params.accountId, + raw: params.raw, + log: params.log, + adapter: { + ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input), + resolveTurn: params.resolveTurn, + }, + }); +} diff --git a/src/channels/turn/types.ts b/src/channels/turn/types.ts index af302f9a2ca..eff2f6e7a42 100644 --- a/src/channels/turn/types.ts +++ b/src/channels/turn/types.ts @@ -235,6 +235,7 @@ export type PreparedChannelTurn = { history?: ChannelTurnHistoryFinalizeOptions; onPreDispatchFailure?: (err: unknown) => void | Promise; runDispatch: () => Promise; + observeOnlyDispatchResult?: TDispatchResult; admission?: Extract; log?: (event: ChannelTurnLogEvent) => void; messageId?: string; @@ -315,3 +316,18 @@ export type RunChannelTurnParams; log?: (event: ChannelTurnLogEvent) => void; }; + +export type RunResolvedChannelTurnParams = { + channel: string; + accountId?: string; + raw: TRaw; + input: + | NormalizedTurnInput + | ((raw: TRaw) => Promise | NormalizedTurnInput | null); + resolveTurn: ( + input: NormalizedTurnInput, + eventClass: ChannelEventClass, + preflight: PreflightFacts, + ) => Promise> | ChannelTurnResolved; + log?: (event: ChannelTurnLogEvent) => void; +}; diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index 0920b8e95cf..95cfda24494 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -157,9 +157,16 @@ export function createPluginRuntimeMock(overrides: DeepPartial = } throw err; } - const dispatchResult = await params.runDispatch(); + const admission = params.admission ?? { kind: "dispatch" as const }; + const dispatchResult = + admission.kind === "observeOnly" + ? (params.observeOnlyDispatchResult ?? { + queuedFinal: false, + counts: { tool: 0, block: 0, final: 0 }, + }) + : await params.runDispatch(); return { - admission: params.admission ?? { kind: "dispatch" as const }, + admission, dispatched: true, ctxPayload: params.ctxPayload, routeSessionKey: params.routeSessionKey, @@ -617,8 +624,24 @@ export function createPluginRuntimeMock(overrides: DeepPartial = }, turn: { run: runChannelTurnMock, + runResolved: vi.fn( + async (params: Parameters[0]) => + await runChannelTurnMock({ + channel: params.channel, + accountId: params.accountId, + raw: params.raw, + log: params.log, + adapter: { + ingest: (raw) => + typeof params.input === "function" ? params.input(raw) : params.input, + resolveTurn: params.resolveTurn, + }, + }), + ) as unknown as PluginRuntime["channel"]["turn"]["runResolved"], buildContext: buildChannelTurnContextMock, runPrepared: runPreparedChannelTurnMock, + dispatchAssembled: + dispatchAssembledChannelTurnMock as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, threadBindings: { setIdleTimeoutBySessionKey: diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index eab284e445b..e77d9571078 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -54,6 +54,8 @@ import { buildChannelTurnContext, runChannelTurn, runPreparedChannelTurn, + runResolvedChannelTurn, + dispatchAssembledChannelTurn, } from "../../channels/turn/kernel.js"; import { resolveChannelGroupPolicy, @@ -172,8 +174,10 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { }, turn: { run: runChannelTurn, + runResolved: runResolvedChannelTurn, buildContext: buildChannelTurnContext, runPrepared: runPreparedChannelTurn, + dispatchAssembled: dispatchAssembledChannelTurn, }, threadBindings: { setIdleTimeoutBySessionKey: ({ channelId, targetSessionKey, accountId, idleTimeoutMs }) => diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index a55ab37f968..dd1a9b4f439 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -153,8 +153,12 @@ export type PluginRuntimeChannel = { }; turn: { run: typeof import("../../channels/turn/kernel.js").runChannelTurn; + /** @deprecated Prefer `run(...)`. */ + runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn; buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext; runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn; + /** @deprecated Prefer `run(...)` or `runPrepared(...)`. */ + dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn; }; threadBindings: { setIdleTimeoutBySessionKey: (params: {