mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:50:43 +00:00
fix(tts): keep media-only no-reply payloads
This commit is contained in:
@@ -90,6 +90,7 @@ export function createSubscriptionMock(): SubscriptionMock {
|
||||
getMessagingToolSentTexts: () => [] as string[],
|
||||
getMessagingToolSentMediaUrls: () => [] as string[],
|
||||
getMessagingToolSentTargets: () => [] as MessagingToolSend[],
|
||||
getPendingToolMediaReply: () => null,
|
||||
getSuccessfulCronAdds: () => 0,
|
||||
getReplayState: () => ({
|
||||
replayInvalid: false,
|
||||
|
||||
@@ -2044,6 +2044,7 @@ export async function runEmbeddedAttempt(
|
||||
getMessagingToolSentTexts,
|
||||
getMessagingToolSentMediaUrls,
|
||||
getMessagingToolSentTargets,
|
||||
getPendingToolMediaReply,
|
||||
getSuccessfulCronAdds,
|
||||
getReplayState,
|
||||
didSendViaMessagingTool,
|
||||
@@ -2994,6 +2995,7 @@ export async function runEmbeddedAttempt(
|
||||
messagingToolSentMediaUrls: getMessagingToolSentMediaUrls(),
|
||||
successfulCronAdds: getSuccessfulCronAdds(),
|
||||
});
|
||||
const pendingToolMediaReply = getPendingToolMediaReply();
|
||||
const replayMetadata = replayMetadataFromState(
|
||||
observeReplayMetadata(getReplayState(), observedReplayMetadata),
|
||||
);
|
||||
@@ -3077,6 +3079,8 @@ export async function runEmbeddedAttempt(
|
||||
messagingToolSentTexts: getMessagingToolSentTexts(),
|
||||
messagingToolSentMediaUrls: getMessagingToolSentMediaUrls(),
|
||||
messagingToolSentTargets: getMessagingToolSentTargets(),
|
||||
toolMediaUrls: pendingToolMediaReply?.mediaUrls,
|
||||
toolAudioAsVoice: pendingToolMediaReply?.audioAsVoice,
|
||||
successfulCronAdds: getSuccessfulCronAdds(),
|
||||
cloudCodeAssistFormatError: Boolean(
|
||||
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
|
||||
|
||||
@@ -12,10 +12,13 @@ function createContext(
|
||||
overrides?: {
|
||||
onAgentEvent?: (event: unknown) => void;
|
||||
onBeforeLifecycleTerminal?: () => void | Promise<void>;
|
||||
onBlockReply?: ((payload: unknown) => void) | undefined;
|
||||
onBlockReplyFlush?: () => void | Promise<void>;
|
||||
},
|
||||
): EmbeddedPiSubscribeContext {
|
||||
const onBlockReply = vi.fn();
|
||||
const hasOnBlockReplyOverride = Boolean(overrides && "onBlockReply" in overrides);
|
||||
const onBlockReply = hasOnBlockReplyOverride ? overrides?.onBlockReply : vi.fn();
|
||||
const emitBlockReply = vi.fn();
|
||||
return {
|
||||
params: {
|
||||
runId: "run-1",
|
||||
@@ -23,7 +26,7 @@ function createContext(
|
||||
sessionKey: "agent:main:main",
|
||||
onAgentEvent: overrides?.onAgentEvent,
|
||||
onBeforeLifecycleTerminal: overrides?.onBeforeLifecycleTerminal,
|
||||
onBlockReply,
|
||||
...(onBlockReply ? { onBlockReply } : {}),
|
||||
onBlockReplyFlush: overrides?.onBlockReplyFlush,
|
||||
},
|
||||
state: {
|
||||
@@ -43,7 +46,7 @@ function createContext(
|
||||
warn: vi.fn(),
|
||||
},
|
||||
flushBlockReplyBuffer: vi.fn(),
|
||||
emitBlockReply: onBlockReply,
|
||||
emitBlockReply,
|
||||
resolveCompactionRetry: vi.fn(),
|
||||
maybeResolveCompactionWait: vi.fn(),
|
||||
} as unknown as EmbeddedPiSubscribeContext;
|
||||
@@ -321,6 +324,18 @@ describe("handleAgentEnd", () => {
|
||||
expect(ctx.state.pendingToolAudioAsVoice).toBe(false);
|
||||
});
|
||||
|
||||
it("preserves orphaned tool media when no block reply callback is configured", async () => {
|
||||
const ctx = createContext(undefined, { onBlockReply: undefined });
|
||||
ctx.state.pendingToolMediaUrls = ["/tmp/reply.opus"];
|
||||
ctx.state.pendingToolAudioAsVoice = true;
|
||||
|
||||
await handleAgentEnd(ctx);
|
||||
|
||||
expect(ctx.emitBlockReply).not.toHaveBeenCalled();
|
||||
expect(ctx.state.pendingToolMediaUrls).toEqual(["/tmp/reply.opus"]);
|
||||
expect(ctx.state.pendingToolAudioAsVoice).toBe(true);
|
||||
});
|
||||
|
||||
it("emits orphaned tool media before the lifecycle end event", async () => {
|
||||
const onAgentEvent = vi.fn();
|
||||
const ctx = createContext(undefined, { onAgentEvent });
|
||||
|
||||
@@ -167,9 +167,11 @@ export function handleAgentEnd(ctx: EmbeddedPiSubscribeContext): void | Promise<
|
||||
};
|
||||
|
||||
const flushPendingMediaAndChannel = () => {
|
||||
const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state);
|
||||
if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) {
|
||||
ctx.emitBlockReply(pendingToolMediaReply);
|
||||
if (ctx.params.onBlockReply) {
|
||||
const pendingToolMediaReply = consumePendingToolMediaReply(ctx.state);
|
||||
if (pendingToolMediaReply && hasAssistantVisibleReply(pendingToolMediaReply)) {
|
||||
ctx.emitBlockReply(pendingToolMediaReply);
|
||||
}
|
||||
}
|
||||
|
||||
const postMediaFlushResult = ctx.flushBlockReplyBuffer();
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
handleMessageEnd,
|
||||
handleMessageUpdate,
|
||||
hasAssistantVisibleReply,
|
||||
readPendingToolMediaReply,
|
||||
recordPendingAssistantReplyDirectives,
|
||||
resolveSilentReplyFallbackText,
|
||||
} from "./pi-embedded-subscribe.handlers.messages.js";
|
||||
@@ -394,6 +395,21 @@ describe("consumePendingToolMediaIntoReply", () => {
|
||||
});
|
||||
|
||||
describe("consumePendingToolMediaReply", () => {
|
||||
it("reads a media-only reply without consuming queued tool media", () => {
|
||||
const state = {
|
||||
pendingToolMediaUrls: ["/tmp/reply.opus"],
|
||||
pendingToolAudioAsVoice: true,
|
||||
pendingToolTrustedLocalMedia: false,
|
||||
};
|
||||
|
||||
expect(readPendingToolMediaReply(state)).toEqual({
|
||||
mediaUrls: ["/tmp/reply.opus"],
|
||||
audioAsVoice: true,
|
||||
});
|
||||
expect(state.pendingToolMediaUrls).toEqual(["/tmp/reply.opus"]);
|
||||
expect(state.pendingToolAudioAsVoice).toBe(true);
|
||||
});
|
||||
|
||||
it("builds a media-only reply for orphaned tool media", () => {
|
||||
const state = {
|
||||
pendingToolMediaUrls: ["/tmp/reply.opus"],
|
||||
|
||||
@@ -212,6 +212,20 @@ export function consumePendingToolMediaReply(
|
||||
EmbeddedPiSubscribeState,
|
||||
"pendingToolMediaUrls" | "pendingToolAudioAsVoice" | "pendingToolTrustedLocalMedia"
|
||||
>,
|
||||
): BlockReplyPayload | null {
|
||||
const payload = readPendingToolMediaReply(state);
|
||||
if (!payload) {
|
||||
return null;
|
||||
}
|
||||
clearPendingToolMedia(state);
|
||||
return payload;
|
||||
}
|
||||
|
||||
export function readPendingToolMediaReply(
|
||||
state: Pick<
|
||||
EmbeddedPiSubscribeState,
|
||||
"pendingToolMediaUrls" | "pendingToolAudioAsVoice" | "pendingToolTrustedLocalMedia"
|
||||
>,
|
||||
): BlockReplyPayload | null {
|
||||
if (
|
||||
state.pendingToolMediaUrls.length === 0 &&
|
||||
@@ -220,15 +234,13 @@ export function consumePendingToolMediaReply(
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
const payload: BlockReplyPayload = {
|
||||
return {
|
||||
mediaUrls: state.pendingToolMediaUrls.length
|
||||
? Array.from(new Set(state.pendingToolMediaUrls))
|
||||
: undefined,
|
||||
audioAsVoice: state.pendingToolAudioAsVoice || undefined,
|
||||
trustedLocalMedia: state.pendingToolTrustedLocalMedia || undefined,
|
||||
};
|
||||
clearPendingToolMedia(state);
|
||||
return payload;
|
||||
}
|
||||
|
||||
function hasReplyDirectiveMetadata(parsed: ReplyDirectiveParseResult | null | undefined): boolean {
|
||||
|
||||
@@ -414,6 +414,34 @@ describe("subscribeEmbeddedPiSession", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps orphaned tool media available for non-block final payload assembly", () => {
|
||||
const { emit, subscription } = createSubscribedSessionHarness({
|
||||
runId: "run",
|
||||
builtinToolNames: new Set(["tts"]),
|
||||
});
|
||||
|
||||
emit({
|
||||
type: "tool_execution_end",
|
||||
toolName: "tts",
|
||||
toolCallId: "tc-1",
|
||||
isError: false,
|
||||
result: {
|
||||
details: {
|
||||
media: {
|
||||
mediaUrl: "/tmp/reply.opus",
|
||||
audioAsVoice: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
emit({ type: "agent_end" });
|
||||
|
||||
expect(subscription.getPendingToolMediaReply()).toEqual({
|
||||
mediaUrls: ["/tmp/reply.opus"],
|
||||
audioAsVoice: true,
|
||||
});
|
||||
});
|
||||
|
||||
it.each(THINKING_TAG_CASES)(
|
||||
"suppresses <%s> blocks across chunk boundaries",
|
||||
async ({ open, close }) => {
|
||||
|
||||
@@ -24,6 +24,7 @@ import { createEmbeddedPiSessionEventHandler } from "./pi-embedded-subscribe.han
|
||||
import {
|
||||
consumePendingAssistantReplyDirectivesIntoReply,
|
||||
consumePendingToolMediaIntoReply,
|
||||
readPendingToolMediaReply,
|
||||
} from "./pi-embedded-subscribe.handlers.messages.js";
|
||||
import type {
|
||||
EmbeddedPiSubscribeContext,
|
||||
@@ -866,6 +867,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
|
||||
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
|
||||
getMessagingToolSentMediaUrls: () => messagingToolSentMediaUrls.slice(),
|
||||
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
|
||||
getPendingToolMediaReply: () => readPendingToolMediaReply(state),
|
||||
getSuccessfulCronAdds: () => state.successfulCronAdds,
|
||||
getReplayState: () => ({ ...state.replayState }),
|
||||
// Returns true if any messaging tool successfully sent a message.
|
||||
|
||||
Reference in New Issue
Block a user