fix(channels): preserve observe-only turn compatibility

This commit is contained in:
Peter Steinberger
2026-04-30 04:20:35 +01:00
parent 7a2bb2fcda
commit bbf932fd7d
7 changed files with 132 additions and 3 deletions

View File

@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Channels/groups: preserve observe-only turn suppression for prepared dispatch paths and restore deprecated channel turn runtime aliases, so passive observer/group flows stay silent while older plugins keep compiling. Thanks @vincentkoc.
- Feishu/Bitable: clean up newly created placeholder rows whose fields contain only default empty values while preserving meaningful link, attachment, user, number, boolean, and location values during create-app cleanup. (#73920) Carries forward #40602. Thanks @boat2moon.
- macOS app: keep attach-only mode and the Debug Settings launchd toggle marker-only, so launching with `--attach-only`/`--no-launchd` no longer uninstalls the Gateway LaunchAgent or drops active sessions. (#72174) Thanks @DolencLuka.
- Plugin SDK: restore the deprecated `plugin-sdk/zalouser` command-auth facade so published Lark/Zalo plugins that import it load on current hosts. Fixes #74702. Thanks @Goron01.

View File

@@ -342,6 +342,50 @@ describe("channel turn kernel", () => {
expect(result.dispatchResult.queuedFinal).toBe(true);
});
it("suppresses prepared dispatch for observe-only full turns", async () => {
const events: string[] = [];
const onFinalize = vi.fn();
const runDispatch = vi.fn(async () => {
events.push("custom-dispatch");
return {
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
};
});
const result = await runChannelTurn({
channel: "test",
raw: { id: "msg-1", text: "hello" },
adapter: {
ingest: () => ({ id: "msg-1", rawText: "hello" }),
preflight: () => ({ kind: "observeOnly", reason: "broadcast-observer" }),
resolveTurn: () => ({
channel: "test",
routeSessionKey: "agent:observer:test:peer",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }),
recordInboundSession: createRecordInboundSession(events),
runDispatch,
}),
onFinalize,
},
});
expect(result.admission).toEqual({ kind: "observeOnly", reason: "broadcast-observer" });
expect(result.dispatched).toBe(true);
expect(events).toEqual(["record"]);
expect(runDispatch).not.toHaveBeenCalled();
if (!result.dispatched) {
throw new Error("expected dispatch");
}
expect(hasFinalChannelTurnDispatch(result.dispatchResult)).toBe(false);
expect(onFinalize).toHaveBeenCalledWith(
expect.objectContaining({
admission: { kind: "observeOnly", reason: "broadcast-observer" },
dispatched: true,
}),
);
});
it("finalizes failed dispatches before rethrowing", async () => {
const onFinalize = vi.fn();
const dispatchError = new Error("dispatch failed");

View File

@@ -1,5 +1,6 @@
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { clearHistoryEntriesIfEnabled } from "../../auto-reply/reply/history.js";
import { EMPTY_CHANNEL_TURN_DISPATCH_COUNTS } from "./dispatch-result.js";
export { buildChannelTurnContext, filterChannelTurnSupplementalContext } from "./context.js";
export type { BuildChannelTurnContextParams } from "./context.js";
import type {
@@ -15,6 +16,7 @@ import type {
PreparedChannelTurn,
PreflightFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
} from "./types.js";
export {
EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
@@ -49,6 +51,7 @@ export type {
ReplyPlanFacts,
RouteFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
SenderFacts,
SupplementalContextFacts,
} from "./types.js";
@@ -110,6 +113,15 @@ function clearPendingHistoryAfterTurn(params?: ChannelTurnHistoryFinalizeOptions
});
}
function resolveObserveOnlyDispatchResult<TDispatchResult>(
params: PreparedChannelTurn<TDispatchResult>,
): TDispatchResult {
return (params.observeOnlyDispatchResult ?? {
queuedFinal: false,
counts: EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
}) as TDispatchResult;
}
export async function dispatchAssembledChannelTurn(
params: AssembledChannelTurn,
): Promise<DispatchedChannelTurnResult> {
@@ -158,7 +170,14 @@ async function dispatchResolvedChannelTurn<TDispatchResult>(
},
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
if (isPreparedChannelTurn(params)) {
return await runPreparedChannelTurn(params);
return await runPreparedChannelTurn(
params.admission.kind === "observeOnly"
? {
...params,
runDispatch: async () => resolveObserveOnlyDispatchResult(params),
}
: params,
);
}
return (await dispatchAssembledChannelTurn(
params,
@@ -431,3 +450,21 @@ export async function runChannelTurn<
return result;
}
export async function runResolvedChannelTurn<
TRaw,
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
>(
params: RunResolvedChannelTurnParams<TRaw, TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
return await runChannelTurn({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input),
resolveTurn: params.resolveTurn,
},
});
}

View File

@@ -235,6 +235,7 @@ export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
history?: ChannelTurnHistoryFinalizeOptions;
onPreDispatchFailure?: (err: unknown) => void | Promise<void>;
runDispatch: () => Promise<TDispatchResult>;
observeOnlyDispatchResult?: TDispatchResult;
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
log?: (event: ChannelTurnLogEvent) => void;
messageId?: string;
@@ -315,3 +316,18 @@ export type RunChannelTurnParams<TRaw, TDispatchResult = DispatchFromConfigResul
adapter: ChannelTurnAdapter<TRaw, TDispatchResult>;
log?: (event: ChannelTurnLogEvent) => void;
};
export type RunResolvedChannelTurnParams<TRaw, TDispatchResult = DispatchFromConfigResult> = {
channel: string;
accountId?: string;
raw: TRaw;
input:
| NormalizedTurnInput
| ((raw: TRaw) => Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null);
resolveTurn: (
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
preflight: PreflightFacts,
) => Promise<ChannelTurnResolved<TDispatchResult>> | ChannelTurnResolved<TDispatchResult>;
log?: (event: ChannelTurnLogEvent) => void;
};

View File

@@ -157,9 +157,16 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
}
throw err;
}
const dispatchResult = await params.runDispatch();
const admission = params.admission ?? { kind: "dispatch" as const };
const dispatchResult =
admission.kind === "observeOnly"
? (params.observeOnlyDispatchResult ?? {
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
})
: await params.runDispatch();
return {
admission: params.admission ?? { kind: "dispatch" as const },
admission,
dispatched: true,
ctxPayload: params.ctxPayload,
routeSessionKey: params.routeSessionKey,
@@ -617,8 +624,24 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
},
turn: {
run: runChannelTurnMock,
runResolved: vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) =>
await runChannelTurnMock({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: (raw) =>
typeof params.input === "function" ? params.input(raw) : params.input,
resolveTurn: params.resolveTurn,
},
}),
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
buildContext: buildChannelTurnContextMock,
runPrepared: runPreparedChannelTurnMock,
dispatchAssembled:
dispatchAssembledChannelTurnMock as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},
threadBindings: {
setIdleTimeoutBySessionKey:

View File

@@ -54,6 +54,8 @@ import {
buildChannelTurnContext,
runChannelTurn,
runPreparedChannelTurn,
runResolvedChannelTurn,
dispatchAssembledChannelTurn,
} from "../../channels/turn/kernel.js";
import {
resolveChannelGroupPolicy,
@@ -172,8 +174,10 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
},
turn: {
run: runChannelTurn,
runResolved: runResolvedChannelTurn,
buildContext: buildChannelTurnContext,
runPrepared: runPreparedChannelTurn,
dispatchAssembled: dispatchAssembledChannelTurn,
},
threadBindings: {
setIdleTimeoutBySessionKey: ({ channelId, targetSessionKey, accountId, idleTimeoutMs }) =>

View File

@@ -153,8 +153,12 @@ export type PluginRuntimeChannel = {
};
turn: {
run: typeof import("../../channels/turn/kernel.js").runChannelTurn;
/** @deprecated Prefer `run(...)`. */
runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn;
buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext;
runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn;
/** @deprecated Prefer `run(...)` or `runPrepared(...)`. */
dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;
};
threadBindings: {
setIdleTimeoutBySessionKey: (params: {