mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:50:42 +00:00
fix: share reply media context (#68111) (thanks @ayeshakhalid192007-dev)
This commit is contained in:
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Hooks/Slack: standardize shared message hook routing fields (`threadId` / `replyToId`) and stop Slack outbound delivery from re-running `message_sending` inside the channel adapter, so plugins like thread-ownership make one outbound routing decision per reply. Thanks @vincentkoc.
|
||||
- Auto-reply/media: share one run-scoped reply media context between streamed block delivery and final payload filtering, so a local `MEDIA:` attachment is staged once and duplicate media sends are suppressed reliably. (#68111) Thanks @ayeshakhalid192007-dev.
|
||||
- Gateway/restart: preserve group and channel chat context when resuming an agent turn after a Gateway restart, so continuation replies keep the same prompt, routing, and tool-status behavior as the original conversation.
|
||||
- Gateway/pairing: shared-secret loopback CLI clients now silently auto-approve `metadata-upgrade` pairing (platform / device family refresh) instead of being disconnected with `1008 pairing required`. This matches the scope-upgrade and role-upgrade behavior added in #69431 and unblocks non-interactive CLI automation when a paired-device record has a stale platform string (e.g. device key replicated across hosts, install migrated between OSes, or platform-string format changed between OpenClaw versions). Browser / Control-UI clients keep the existing approval-required flow for metadata changes.
|
||||
- Gateway/pairing: treat any forwarded-header evidence (`Forwarded`, `X-Forwarded-*`, or `X-Real-IP`) as proxied WebSocket traffic before pairing locality checks, so reverse-proxy topologies cannot use the loopback shared-secret helper auto-pairing path.
|
||||
|
||||
@@ -20,6 +20,7 @@ const sentinelError = new Error("stop-after-preflight");
|
||||
const resolveQueuedReplyExecutionConfigMock = vi.fn();
|
||||
const resolveReplyToModeMock = vi.fn();
|
||||
const createReplyToModeFilterForChannelMock = vi.fn();
|
||||
const createReplyMediaContextMock = vi.fn();
|
||||
const createReplyMediaPathNormalizerMock = vi.fn();
|
||||
const runPreflightCompactionIfNeededMock = vi.fn();
|
||||
const runMemoryFlushIfNeededMock = vi.fn();
|
||||
@@ -37,6 +38,12 @@ vi.mock("./reply-threading.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./reply-media-paths.js", () => ({
|
||||
createReplyMediaContext: (...args: unknown[]) => {
|
||||
createReplyMediaContextMock(...args);
|
||||
return {
|
||||
normalizePayload: createReplyMediaPathNormalizerMock(...args),
|
||||
};
|
||||
},
|
||||
createReplyMediaPathNormalizer: (...args: unknown[]) =>
|
||||
createReplyMediaPathNormalizerMock(...args),
|
||||
}));
|
||||
@@ -112,6 +119,7 @@ describe("runReplyAgent runtime config", () => {
|
||||
resolveQueuedReplyExecutionConfigMock.mockReset();
|
||||
resolveReplyToModeMock.mockReset();
|
||||
createReplyToModeFilterForChannelMock.mockReset();
|
||||
createReplyMediaContextMock.mockReset();
|
||||
createReplyMediaPathNormalizerMock.mockReset();
|
||||
runPreflightCompactionIfNeededMock.mockReset();
|
||||
runMemoryFlushIfNeededMock.mockReset();
|
||||
@@ -142,7 +150,7 @@ describe("runReplyAgent runtime config", () => {
|
||||
}),
|
||||
);
|
||||
expect(resolveReplyToModeMock).toHaveBeenCalledWith(freshCfg, "telegram", "default", "dm");
|
||||
expect(createReplyMediaPathNormalizerMock).toHaveBeenCalledWith({
|
||||
expect(createReplyMediaContextMock).toHaveBeenCalledWith({
|
||||
cfg: freshCfg,
|
||||
sessionKey: undefined,
|
||||
workspaceDir: "/tmp",
|
||||
|
||||
@@ -118,6 +118,9 @@ vi.mock("./reply-delivery.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./reply-media-paths.runtime.js", () => ({
|
||||
createReplyMediaContext: () => ({
|
||||
normalizePayload: (payload: unknown) => payload,
|
||||
}),
|
||||
createReplyMediaPathNormalizer: () => (payload: unknown) => payload,
|
||||
}));
|
||||
|
||||
|
||||
@@ -71,7 +71,8 @@ import {
|
||||
import { type BlockReplyPipeline } from "./block-reply-pipeline.js";
|
||||
import type { FollowupRun } from "./queue.js";
|
||||
import { createBlockReplyDeliveryHandler } from "./reply-delivery.js";
|
||||
import { createReplyMediaPathNormalizer } from "./reply-media-paths.runtime.js";
|
||||
import type { ReplyMediaContext } from "./reply-media-paths.js";
|
||||
import { createReplyMediaContext } from "./reply-media-paths.runtime.js";
|
||||
import type { ReplyOperation } from "./reply-run-registry.js";
|
||||
import type { TypingSignaler } from "./typing-mode.js";
|
||||
|
||||
@@ -585,7 +586,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
activeSessionStore?: Record<string, SessionEntry>;
|
||||
storePath?: string;
|
||||
resolvedVerboseLevel: VerboseLevel;
|
||||
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
|
||||
replyMediaContext?: ReplyMediaContext;
|
||||
}): Promise<AgentRunLoopResult> {
|
||||
const TRANSIENT_HTTP_RETRY_DELAY_MS = 2_500;
|
||||
let didLogHeartbeatStrip = false;
|
||||
@@ -602,9 +603,9 @@ export async function runAgentTurnWithFallback(params: {
|
||||
};
|
||||
|
||||
const runId = params.opts?.runId ?? crypto.randomUUID();
|
||||
const normalizeReplyMediaPaths =
|
||||
params.normalizeMediaPaths ??
|
||||
createReplyMediaPathNormalizer({
|
||||
const replyMediaContext =
|
||||
params.replyMediaContext ??
|
||||
createReplyMediaContext({
|
||||
cfg: runtimeConfig,
|
||||
sessionKey: params.sessionKey,
|
||||
workspaceDir: params.followupRun.run.workspaceDir,
|
||||
@@ -838,7 +839,7 @@ export async function runAgentTurnWithFallback(params: {
|
||||
currentMessageId: params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
|
||||
normalizeStreamingText,
|
||||
applyReplyToMode: params.applyReplyToMode,
|
||||
normalizeMediaPaths: normalizeReplyMediaPaths,
|
||||
normalizeMediaPaths: replyMediaContext.normalizePayload,
|
||||
typingSignals: params.typingSignals,
|
||||
blockStreamingEnabled: params.blockStreamingEnabled,
|
||||
blockReplyPipeline,
|
||||
|
||||
@@ -17,7 +17,7 @@ const enqueueFollowupRunMock = vi.fn();
|
||||
const scheduleFollowupDrainMock = vi.fn();
|
||||
const refreshQueuedFollowupSessionMock = vi.fn();
|
||||
const resolveOutboundAttachmentFromUrlMock = vi.fn();
|
||||
const createReplyMediaPathNormalizerRuntimeMock = vi.fn();
|
||||
const createReplyMediaContextRuntimeMock = vi.fn();
|
||||
|
||||
vi.mock("../../agents/model-fallback.js", () => ({
|
||||
runWithModelFallback: (params: {
|
||||
@@ -54,14 +54,15 @@ vi.mock("../../media/outbound-attachment.js", () => ({
|
||||
}));
|
||||
|
||||
// Spy on the .runtime import path used by agent-runner-execution.ts so we can assert
|
||||
// that the fix prevents a second normalizer from being created inside runAgentTurnWithFallback.
|
||||
// that the fix prevents a second media context from being created inside runAgentTurnWithFallback.
|
||||
vi.mock("./reply-media-paths.runtime.js", async (importOriginal) => {
|
||||
const mod = await importOriginal<typeof import("./reply-media-paths.runtime.js")>();
|
||||
return {
|
||||
createReplyMediaPathNormalizer: (...args: Parameters<typeof mod.createReplyMediaPathNormalizer>) => {
|
||||
createReplyMediaPathNormalizerRuntimeMock(...args);
|
||||
return mod.createReplyMediaPathNormalizer(...args);
|
||||
createReplyMediaContext: (...args: Parameters<typeof mod.createReplyMediaContext>) => {
|
||||
createReplyMediaContextRuntimeMock(...args);
|
||||
return mod.createReplyMediaContext(...args);
|
||||
},
|
||||
createReplyMediaPathNormalizer: mod.createReplyMediaPathNormalizer,
|
||||
};
|
||||
});
|
||||
|
||||
@@ -86,7 +87,7 @@ describe("runReplyAgent media path normalization", () => {
|
||||
scheduleFollowupDrainMock.mockReset();
|
||||
refreshQueuedFollowupSessionMock.mockReset();
|
||||
resolveOutboundAttachmentFromUrlMock.mockReset();
|
||||
createReplyMediaPathNormalizerRuntimeMock.mockReset();
|
||||
createReplyMediaContextRuntimeMock.mockReset();
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
resolveOutboundAttachmentFromUrlMock.mockImplementation(async (mediaUrl: string) => ({
|
||||
path: path.join("/tmp/outbound-media", path.basename(mediaUrl)),
|
||||
@@ -175,15 +176,94 @@ describe("runReplyAgent media path normalization", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("does not create a second normalizer inside runAgentTurnWithFallback when onBlockReply is provided", async () => {
|
||||
it("shares one media cache between direct block media and final payload filtering", async () => {
|
||||
let stagedIndex = 0;
|
||||
resolveOutboundAttachmentFromUrlMock.mockImplementation(async (mediaUrl: string) => {
|
||||
stagedIndex += 1;
|
||||
return {
|
||||
path: path.join("/tmp/outbound-media", `${stagedIndex}-${path.basename(mediaUrl)}`),
|
||||
};
|
||||
});
|
||||
const onBlockReply = vi.fn();
|
||||
runEmbeddedPiAgentMock.mockImplementation(
|
||||
async (params: {
|
||||
onBlockReply?: (payload: { text?: string; mediaUrls?: string[] }) => Promise<void>;
|
||||
}) => {
|
||||
await params.onBlockReply?.({
|
||||
text: "here is the chart\nMEDIA:./out/chart.png",
|
||||
});
|
||||
return {
|
||||
payloads: [{ text: "here is the chart\nMEDIA:./out/chart.png" }],
|
||||
meta: {
|
||||
agentMeta: {
|
||||
sessionId: "session",
|
||||
provider: "anthropic",
|
||||
model: "claude",
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
const result = await runReplyAgent({
|
||||
commandBody: "generate chart",
|
||||
followupRun: createMockFollowupRun({
|
||||
prompt: "generate chart",
|
||||
run: {
|
||||
agentId: "main",
|
||||
agentDir: "/tmp/agent",
|
||||
messageProvider: "whatsapp",
|
||||
workspaceDir: "/tmp/workspace",
|
||||
},
|
||||
}) as unknown as FollowupRun,
|
||||
queueKey: "main",
|
||||
resolvedQueue: { mode: "interrupt" } as QueueSettings,
|
||||
shouldSteer: false,
|
||||
shouldFollowup: false,
|
||||
isActive: false,
|
||||
isStreaming: false,
|
||||
typing: createMockTypingController(),
|
||||
sessionCtx: {
|
||||
Provider: "whatsapp",
|
||||
Surface: "whatsapp",
|
||||
To: "chat-1",
|
||||
OriginatingTo: "chat-1",
|
||||
AccountId: "default",
|
||||
MessageSid: "msg-1",
|
||||
} as unknown as TemplateContext,
|
||||
defaultModel: "anthropic/claude",
|
||||
resolvedVerboseLevel: "off",
|
||||
isNewSession: false,
|
||||
blockStreamingEnabled: false,
|
||||
resolvedBlockStreamingBreak: "message_end",
|
||||
shouldInjectGroupIntro: false,
|
||||
typingMode: "instant",
|
||||
opts: {
|
||||
onBlockReply,
|
||||
},
|
||||
});
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
expect(resolveOutboundAttachmentFromUrlMock).toHaveBeenCalledTimes(1);
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||
expect(onBlockReply).toHaveBeenCalledWith({
|
||||
text: undefined,
|
||||
mediaUrl: "/tmp/outbound-media/1-chart.png",
|
||||
mediaUrls: ["/tmp/outbound-media/1-chart.png"],
|
||||
replyToCurrent: false,
|
||||
replyToId: "msg-1",
|
||||
replyToTag: false,
|
||||
audioAsVoice: false,
|
||||
});
|
||||
});
|
||||
|
||||
it("does not create a second media context inside runAgentTurnWithFallback when onBlockReply is provided", async () => {
|
||||
// Regression test for openclaw/openclaw#68056.
|
||||
// Before the fix, runAgentTurnWithFallback always called createReplyMediaPathNormalizer
|
||||
// from reply-media-paths.runtime.js to build its own normalizer instance — separate from
|
||||
// the one agent-runner.ts created and passed to buildReplyPayloads. Two separate
|
||||
// persistedMediaBySource caches meant the same source could be persisted twice (two UUID
|
||||
// outbound files, two WhatsApp sends).
|
||||
// Before the fix, runAgentTurnWithFallback created its own media context, separate from
|
||||
// the one agent-runner.ts created and passed to buildReplyPayloads. Two separate caches
|
||||
// meant the same source could be persisted twice (two UUID outbound files, two sends).
|
||||
//
|
||||
// After the fix, agent-runner.ts passes its normalizer into runAgentTurnWithFallback, so
|
||||
// After the fix, agent-runner.ts passes its media context into runAgentTurnWithFallback, so
|
||||
// the .runtime import path is never called from inside that function.
|
||||
runEmbeddedPiAgentMock.mockResolvedValue({
|
||||
payloads: [],
|
||||
@@ -234,9 +314,9 @@ describe("runReplyAgent media path normalization", () => {
|
||||
},
|
||||
});
|
||||
|
||||
// The .runtime import is only used by agent-runner-execution.ts. After the fix,
|
||||
// runAgentTurnWithFallback receives the normalizer from the caller and never
|
||||
// calls createReplyMediaPathNormalizer itself.
|
||||
expect(createReplyMediaPathNormalizerRuntimeMock).not.toHaveBeenCalled();
|
||||
// The .runtime import is only used by agent-runner-execution.ts. After the fix,
|
||||
// runAgentTurnWithFallback receives the context from the caller and never
|
||||
// creates its own.
|
||||
expect(createReplyMediaContextRuntimeMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -64,7 +64,7 @@ import {
|
||||
type FollowupRun,
|
||||
type QueueSettings,
|
||||
} from "./queue.js";
|
||||
import { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
|
||||
import { createReplyMediaContext } from "./reply-media-paths.js";
|
||||
import {
|
||||
createReplyOperation,
|
||||
ReplyRunAlreadyActiveError,
|
||||
@@ -1036,7 +1036,7 @@ export async function runReplyAgent(params: {
|
||||
);
|
||||
const applyReplyToMode = createReplyToModeFilterForChannel(replyToMode, replyToChannel);
|
||||
const cfg = followupRun.run.config;
|
||||
const normalizeReplyMediaPaths = createReplyMediaPathNormalizer({
|
||||
const replyMediaContext = createReplyMediaContext({
|
||||
cfg,
|
||||
sessionKey,
|
||||
workspaceDir: followupRun.run.workspaceDir,
|
||||
@@ -1212,7 +1212,7 @@ export async function runReplyAgent(params: {
|
||||
activeSessionStore,
|
||||
storePath,
|
||||
resolvedVerboseLevel,
|
||||
normalizeMediaPaths: normalizeReplyMediaPaths,
|
||||
replyMediaContext,
|
||||
});
|
||||
|
||||
if (runOutcome.kind === "final") {
|
||||
@@ -1367,7 +1367,7 @@ export async function runReplyAgent(params: {
|
||||
to: sessionCtx.To,
|
||||
}),
|
||||
accountId: sessionCtx.AccountId,
|
||||
normalizeMediaPaths: normalizeReplyMediaPaths,
|
||||
normalizeMediaPaths: replyMediaContext.normalizePayload,
|
||||
});
|
||||
const { replyPayloads } = payloadResult;
|
||||
didLogHeartbeatStrip = payloadResult.didLogHeartbeatStrip;
|
||||
|
||||
@@ -1 +1 @@
|
||||
export { createReplyMediaPathNormalizer } from "./reply-media-paths.js";
|
||||
export { createReplyMediaContext, createReplyMediaPathNormalizer } from "./reply-media-paths.js";
|
||||
|
||||
@@ -236,3 +236,15 @@ export function createReplyMediaPathNormalizer(params: {
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
export type ReplyMediaContext = {
|
||||
normalizePayload: (payload: ReplyPayload) => Promise<ReplyPayload>;
|
||||
};
|
||||
|
||||
export function createReplyMediaContext(
|
||||
params: Parameters<typeof createReplyMediaPathNormalizer>[0],
|
||||
): ReplyMediaContext {
|
||||
return {
|
||||
normalizePayload: createReplyMediaPathNormalizer(params),
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user