diff --git a/extensions/discord/src/monitor/message-forwarded.ts b/extensions/discord/src/monitor/message-forwarded.ts index f6045757eed..10ce8c8b178 100644 --- a/extensions/discord/src/monitor/message-forwarded.ts +++ b/extensions/discord/src/monitor/message-forwarded.ts @@ -83,6 +83,13 @@ export function resolveDiscordReferencedForwardMessage(message: Message): Messag : null; } +export function resolveDiscordReferencedReplyMessage(message: Message): Message | null { + const referenceType = message.messageReference?.type; + return Number(referenceType) === FORWARD_MESSAGE_REFERENCE_TYPE + ? null + : (message.referencedMessage ?? null); +} + export function formatDiscordSnapshotAuthor( author: DiscordSnapshotAuthor | null | undefined, ): string | undefined { diff --git a/extensions/discord/src/monitor/message-handler.context.ts b/extensions/discord/src/monitor/message-handler.context.ts index 9a7af896094..5ccd6df8c96 100644 --- a/extensions/discord/src/monitor/message-handler.context.ts +++ b/extensions/discord/src/monitor/message-handler.context.ts @@ -26,11 +26,16 @@ import { import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; import { buildDiscordMediaPayload, + resolveReferencedReplyMediaList, resolveDiscordMessageText, type DiscordMediaInfo, } from "./message-utils.js"; import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js"; import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; +import { + DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, +} from "./timeouts.js"; function normalizeDiscordDmOwnerEntry(entry: string): string | undefined { const normalized = normalizeDiscordAllowList([entry], ["discord:", "user:", "pk:"]); @@ -38,6 +43,10 @@ function normalizeDiscordDmOwnerEntry(entry: string): string | undefined { return typeof candidate === "string" && /^\d+$/.test(candidate) ? candidate : undefined; } +function isContextAborted(abortSignal?: AbortSignal): boolean { + return Boolean(abortSignal?.aborted); +} + export async function buildDiscordMessageProcessContext(params: { ctx: DiscordMessagePreflightContext; text: string; @@ -49,6 +58,9 @@ export async function buildDiscordMessageProcessContext(params: { discordConfig, accountId, runtime, + mediaMaxBytes, + discordRestFetch, + abortSignal, guildHistories, historyLimit, replyToMode, @@ -189,6 +201,19 @@ export async function buildDiscordMessageProcessContext(params: { if (replyContext && !filteredReplyContext && isGuildMessage) { logVerbose(`discord: drop reply context (mode=${contextVisibilityMode})`); } + const mediaListForContext = [...mediaList]; + if (filteredReplyContext) { + const referencedReplyMediaList = await resolveReferencedReplyMediaList(message, mediaMaxBytes, { + fetchImpl: discordRestFetch, + ssrfPolicy: cfg.browser?.ssrfPolicy, + readIdleTimeoutMs: DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + totalTimeoutMs: DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, + abortSignal, + }); + if (!isContextAborted(abortSignal)) { + mediaListForContext.push(...referencedReplyMediaList); + } + } if (forumContextLine) { combinedBody = `${combinedBody}\n${forumContextLine}`; } @@ -241,11 +266,11 @@ export async function buildDiscordMessageProcessContext(params: { parentSessionKey = undefined; } } - const mediaPayload = buildDiscordMediaPayload(mediaList); + const mediaPayload = buildDiscordMediaPayload(mediaListForContext); const preflightAudioIndex = preflightAudioTranscript === undefined ? -1 - : mediaList.findIndex((media) => media.contentType?.startsWith("audio/")); + : mediaListForContext.findIndex((media) => media.contentType?.startsWith("audio/")); const threadKeys = resolveThreadSessionKeys({ baseSessionKey, threadId: threadChannel ? messageChannelId : undefined, diff --git a/extensions/discord/src/monitor/message-handler.preflight.test.ts b/extensions/discord/src/monitor/message-handler.preflight.test.ts index 3b0b77b04f0..499352d6d3b 100644 --- a/extensions/discord/src/monitor/message-handler.preflight.test.ts +++ b/extensions/discord/src/monitor/message-handler.preflight.test.ts @@ -6,6 +6,7 @@ const transcribeFirstAudioMock = vi.hoisted(() => vi.fn()); const fetchPluralKitMessageInfoMock = vi.hoisted(() => vi.fn()); const resolveDiscordDmCommandAccessMock = vi.hoisted(() => vi.fn()); const handleDiscordDmCommandDecisionMock = vi.hoisted(() => vi.fn(async () => {})); +const saveRemoteMediaMock = vi.hoisted(() => vi.fn()); vi.mock("../pluralkit.js", () => ({ fetchPluralKitMessageInfo: (...args: unknown[]) => fetchPluralKitMessageInfoMock(...args), @@ -20,6 +21,15 @@ vi.mock("./dm-command-auth.js", async (importOriginal) => ({ vi.mock("./dm-command-decision.js", () => ({ handleDiscordDmCommandDecision: handleDiscordDmCommandDecisionMock, })); +vi.mock("openclaw/plugin-sdk/media-runtime", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/media-runtime", + ); + return { + ...actual, + saveRemoteMedia: (...args: unknown[]) => saveRemoteMediaMock(...args), + }; +}); import { __testing as sessionBindingTesting, registerSessionBindingAdapter, @@ -52,6 +62,15 @@ beforeAll(async () => { beforeEach(() => { fetchPluralKitMessageInfoMock.mockReset(); + saveRemoteMediaMock.mockReset(); + saveRemoteMediaMock.mockImplementation( + async (options: { fallbackContentType?: string; filePathHint?: string }) => ({ + id: "test-media", + path: `/tmp/openclaw-discord-test/${options.filePathHint ?? "media"}`, + size: 5, + contentType: options.fallbackContentType, + }), + ); }); function createThreadBinding( @@ -1487,6 +1506,294 @@ describe("preflightDiscordMessage", () => { expect(result).toBeNull(); }); + it("records local image media for skipped mention-gated guild history", async () => { + const channelId = "channel-history-image"; + const guildId = "guild-history-image"; + const guildHistories = new Map(); + saveRemoteMediaMock.mockResolvedValueOnce({ + id: "test-media", + path: "C:\\openclaw\\media\\history.png", + size: 5, + contentType: "image/png", + }); + const fetchImpl = vi.fn( + async () => + new Response(Buffer.from("image"), { + headers: { + "content-type": "image/png", + }, + }), + ) as unknown as typeof fetch; + const message = createDiscordMessage({ + id: "m-history-image", + channelId, + content: "", + attachments: [ + { + id: "att-history-image", + url: "https://cdn.discordapp.com/attachments/1/history.png", + filename: "history.png", + content_type: "image/png", + }, + ], + author: { + id: "user-1", + bot: false, + username: "Alice", + }, + }); + + const result = await preflightDiscordMessage({ + ...createPreflightArgs({ + cfg: DEFAULT_PREFLIGHT_CFG, + discordConfig: {} as DiscordConfig, + data: createGuildEvent({ + channelId, + guildId, + author: message.author, + message, + }), + client: createGuildTextClient(channelId), + }), + guildHistories, + historyLimit: 4, + discordRestFetch: fetchImpl, + guildEntries: { + [guildId]: { + channels: { + [channelId]: { + enabled: true, + requireMention: true, + }, + }, + }, + }, + }); + + expect(result).toBeNull(); + const entries = guildHistories.get(channelId); + expect(entries).toHaveLength(1); + expect(entries?.[0]).toMatchObject({ + sender: "Alice", + body: " (1 image)", + messageId: "m-history-image", + media: [ + { + contentType: "image/png", + kind: "image", + messageId: "m-history-image", + }, + ], + }); + expect(entries?.[0]?.media?.[0]?.path).toContain("history"); + expect(entries?.[0]?.media?.[0]?.path).not.toMatch(/^https?:/); + expect(entries?.[0]?.media?.[0]?.path).toBe("C:\\openclaw\\media\\history.png"); + expect(saveRemoteMediaMock).toHaveBeenCalledTimes(1); + }); + + it("does not download non-image media for skipped mention-gated guild history", async () => { + const channelId = "channel-history-doc"; + const guildId = "guild-history-doc"; + const guildHistories = new Map(); + const message = createDiscordMessage({ + id: "m-history-doc", + channelId, + content: "", + attachments: [ + { + id: "att-history-doc", + url: "https://cdn.discordapp.com/attachments/1/history.pdf", + filename: "history.pdf", + content_type: "application/pdf", + }, + ], + author: { + id: "user-1", + bot: false, + username: "Alice", + }, + }); + + const result = await preflightDiscordMessage({ + ...createPreflightArgs({ + cfg: DEFAULT_PREFLIGHT_CFG, + discordConfig: {} as DiscordConfig, + data: createGuildEvent({ + channelId, + guildId, + author: message.author, + message, + }), + client: createGuildTextClient(channelId), + }), + guildHistories, + historyLimit: 4, + guildEntries: { + [guildId]: { + channels: { + [channelId]: { + enabled: true, + requireMention: true, + }, + }, + }, + }, + }); + + expect(result).toBeNull(); + expect(saveRemoteMediaMock).not.toHaveBeenCalled(); + expect(guildHistories.get(channelId)).toEqual([ + expect.objectContaining({ + sender: "Alice", + body: " (1 file)", + messageId: "m-history-doc", + }), + ]); + expect(guildHistories.get(channelId)?.[0]?.media).toBeUndefined(); + }); + + it("records sticker image media for skipped mention-gated guild history", async () => { + const channelId = "channel-history-sticker"; + const guildId = "guild-history-sticker"; + const guildHistories = new Map(); + saveRemoteMediaMock.mockResolvedValueOnce({ + id: "test-sticker", + path: "/tmp/openclaw-discord-test/sticker.png", + size: 5, + contentType: "image/png", + }); + const message = Object.assign( + createDiscordMessage({ + id: "m-history-sticker", + channelId, + content: "", + author: { + id: "user-1", + bot: false, + username: "Alice", + }, + }), + { + stickers: [ + { + id: "sticker-history", + name: "history-sticker", + format_type: 1, + }, + ], + }, + ); + + const result = await preflightDiscordMessage({ + ...createPreflightArgs({ + cfg: DEFAULT_PREFLIGHT_CFG, + discordConfig: {} as DiscordConfig, + data: createGuildEvent({ + channelId, + guildId, + author: message.author, + message, + }), + client: createGuildTextClient(channelId), + }), + guildHistories, + historyLimit: 4, + guildEntries: { + [guildId]: { + channels: { + [channelId]: { + enabled: true, + requireMention: true, + }, + }, + }, + }, + }); + + expect(result).toBeNull(); + expect(guildHistories.get(channelId)).toEqual([ + expect.objectContaining({ + sender: "Alice", + body: " (1 sticker)", + messageId: "m-history-sticker", + media: [ + { + path: "/tmp/openclaw-discord-test/sticker.png", + contentType: "image/png", + kind: "image", + messageId: "m-history-sticker", + }, + ], + }), + ]); + expect(saveRemoteMediaMock).toHaveBeenCalledTimes(1); + }); + + it("caps skipped history media before falling back to raw Discord stickers", async () => { + const channelId = "channel-history-cap"; + const guildId = "guild-history-cap"; + const guildHistories = new Map(); + const sticker = { + id: "sticker-history-cap", + name: "history-cap-sticker", + format_type: 1, + }; + const message = Object.assign( + createDiscordMessage({ + id: "m-history-cap", + channelId, + content: "", + attachments: Array.from({ length: 4 }, (_, index) => ({ + id: `att-history-cap-${index}`, + url: `https://cdn.discordapp.com/attachments/1/history-${index}.png`, + filename: `history-${index}.png`, + content_type: "image/png", + })), + author: { + id: "user-1", + bot: false, + username: "Alice", + }, + }), + { + rawData: { + sticker_items: [sticker], + }, + stickers: [sticker], + }, + ); + + const result = await preflightDiscordMessage({ + ...createPreflightArgs({ + cfg: DEFAULT_PREFLIGHT_CFG, + discordConfig: {} as DiscordConfig, + data: createGuildEvent({ + channelId, + guildId, + author: message.author, + message, + }), + client: createGuildTextClient(channelId), + }), + guildHistories, + historyLimit: 4, + guildEntries: { + [guildId]: { + channels: { + [channelId]: { + enabled: true, + requireMention: true, + }, + }, + }, + }, + }); + + expect(result).toBeNull(); + expect(guildHistories.get(channelId)?.[0]?.media).toHaveLength(4); + expect(saveRemoteMediaMock).toHaveBeenCalledTimes(4); + }); + it("does not drop @everyone messages when ignoreOtherMentions=true", async () => { const channelId = "channel-other-mention-everyone"; const guildId = "guild-other-mention-everyone"; diff --git a/extensions/discord/src/monitor/message-handler.preflight.ts b/extensions/discord/src/monitor/message-handler.preflight.ts index 49ef101c893..b3e6184b5ab 100644 --- a/extensions/discord/src/monitor/message-handler.preflight.ts +++ b/extensions/discord/src/monitor/message-handler.preflight.ts @@ -9,7 +9,12 @@ import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; import { shouldHandleTextCommands } from "openclaw/plugin-sdk/command-surface"; import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/dangerous-name-runtime"; import { logDebug } from "openclaw/plugin-sdk/logging-core"; -import { recordPendingHistoryEntryIfEnabled } from "openclaw/plugin-sdk/reply-history"; +import { mimeTypeFromFilePath } from "openclaw/plugin-sdk/media-mime"; +import { + type HistoryEntry, + type HistoryMediaEntry, + recordPendingHistoryEntryWithMedia, +} from "openclaw/plugin-sdk/reply-history"; import { getChildLogger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { enqueueSystemEvent } from "openclaw/plugin-sdk/system-event-runtime"; import { resolveDefaultDiscordAccountId } from "../accounts.js"; @@ -22,6 +27,7 @@ import { import { resolveDiscordChannelInfoSafe, resolveDiscordChannelNameSafe } from "./channel-access.js"; import { resolveDiscordTextCommandAccess } from "./dm-command-auth.js"; import { resolveDiscordSystemLocation, resolveTimestampMs } from "./format.js"; +import { resolveDiscordMessageStickers } from "./message-forwarded.js"; import { resolveDiscordDmPreflightAccess } from "./message-handler.dm-preflight.js"; import { hydrateDiscordMessageIfNeeded } from "./message-handler.hydration.js"; import { resolveDiscordPreflightChannelAccess } from "./message-handler.preflight-channel-access.js"; @@ -56,6 +62,7 @@ import { resolveDiscordChannelInfo, resolveDiscordMessageChannelId, resolveDiscordMessageText, + resolveMediaList, } from "./message-utils.js"; import { resolveDiscordSenderIdentity, resolveDiscordWebhookId } from "./sender-identity.js"; @@ -69,6 +76,11 @@ export { shouldIgnoreBoundThreadWebhookMessage, } from "./message-handler.preflight-helpers.js"; +const DISCORD_HISTORY_MEDIA_MAX_ATTACHMENTS = 4; +const DISCORD_HISTORY_MEDIA_MAX_BYTES = 10 * 1024 * 1024; +const DISCORD_HISTORY_MEDIA_IDLE_TIMEOUT_MS = 1_000; +const DISCORD_HISTORY_MEDIA_TOTAL_TIMEOUT_MS = 3_000; + function resolveDiscordPreflightConversationKind(params: { isGuildMessage: boolean; channelType?: ChannelType; @@ -80,6 +92,102 @@ function resolveDiscordPreflightConversationKind(params: { return { isDirectMessage, isGroupDm }; } +function isDiscordImageAttachmentCandidate(attachment: { + content_type?: string | null; + filename?: string | null; + url?: string | null; +}) { + const contentType = attachment.content_type?.split(";")[0]?.trim().toLowerCase(); + if (contentType?.startsWith("image/")) { + return true; + } + return Boolean( + mimeTypeFromFilePath(attachment.filename)?.startsWith("image/") || + mimeTypeFromFilePath(attachment.url)?.startsWith("image/"), + ); +} + +async function resolveDiscordHistoryMediaForPendingRecord(params: { + preflight: DiscordMessagePreflightParams; + message: DiscordMessagePreflightContext["message"]; +}): Promise { + const imageAttachments = (params.message.attachments ?? []) + .filter(isDiscordImageAttachmentCandidate) + .slice(0, DISCORD_HISTORY_MEDIA_MAX_ATTACHMENTS); + const stickers = resolveDiscordMessageStickers(params.message).slice( + 0, + Math.max(0, DISCORD_HISTORY_MEDIA_MAX_ATTACHMENTS - imageAttachments.length), + ); + if (imageAttachments.length === 0 && stickers.length === 0) { + return []; + } + const rawData = (() => { + try { + return params.message.rawData; + } catch { + return {}; + } + })(); + const mediaMessage = Object.assign( + Object.create(Object.getPrototypeOf(params.message)), + params.message, + ) as typeof params.message; + Object.defineProperties(mediaMessage, { + attachments: { value: imageAttachments }, + rawData: { + value: { + ...rawData, + attachments: imageAttachments, + sticker_items: stickers, + stickers, + }, + }, + stickers: { value: stickers }, + }); + const mediaList = await resolveMediaList( + mediaMessage, + Math.min(params.preflight.mediaMaxBytes, DISCORD_HISTORY_MEDIA_MAX_BYTES), + { + fetchImpl: params.preflight.discordRestFetch, + ssrfPolicy: params.preflight.cfg.browser?.ssrfPolicy, + readIdleTimeoutMs: DISCORD_HISTORY_MEDIA_IDLE_TIMEOUT_MS, + totalTimeoutMs: DISCORD_HISTORY_MEDIA_TOTAL_TIMEOUT_MS, + abortSignal: params.preflight.abortSignal, + }, + ); + return mediaList.map((media) => ({ + path: media.path, + contentType: media.contentType, + kind: "image" as const, + messageId: params.message.id, + })); +} + +async function recordDiscordPendingHistoryEntry(params: { + preflight: DiscordMessagePreflightParams; + historyKey: string; + message: DiscordMessagePreflightContext["message"]; + entry?: HistoryEntry; +}) { + if (params.preflight.historyLimit <= 0) { + return; + } + await recordPendingHistoryEntryWithMedia({ + historyMap: params.preflight.guildHistories, + historyKey: params.historyKey, + limit: params.preflight.historyLimit, + entry: params.entry ?? null, + mediaLimit: DISCORD_HISTORY_MEDIA_MAX_ATTACHMENTS, + messageId: params.message.id, + shouldRecord: () => !isPreflightAborted(params.preflight.abortSignal), + media: () => + resolveDiscordHistoryMediaForPendingRecord({ + preflight: params.preflight, + message: params.message, + }), + }); +} + export async function preflightDiscordMessage( params: DiscordMessagePreflightParams, ): Promise { @@ -536,11 +644,11 @@ export async function preflightDiscordMessage( }, "discord: skipping guild message", ); - recordPendingHistoryEntryIfEnabled({ - historyMap: params.guildHistories, + await recordDiscordPendingHistoryEntry({ + preflight: params, historyKey: messageChannelId, - limit: params.historyLimit, - entry: historyEntry ?? null, + message, + entry: historyEntry, }); return null; } @@ -567,11 +675,11 @@ export async function preflightDiscordMessage( logVerbose( `discord: drop guild message (another user/role mentioned, ignoreOtherMentions=true, botId=${botId})`, ); - recordPendingHistoryEntryIfEnabled({ - historyMap: params.guildHistories, + await recordDiscordPendingHistoryEntry({ + preflight: params, historyKey: messageChannelId, - limit: params.historyLimit, - entry: historyEntry ?? null, + message, + entry: historyEntry, }); return null; } diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 77f24217013..630e85a2b02 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -1052,6 +1052,72 @@ describe("processDiscordMessage session routing", () => { }); }); + it("does not attach referenced reply media when reply context is hidden", async () => { + const fetchImpl = vi.fn(async () => { + throw new Error("hidden reply media should not be fetched"); + }); + const ctx = await createBaseContext({ + cfg: { + channels: { discord: { contextVisibility: "allowlist" } }, + messages: { ackReaction: "👀" }, + session: { store: "/tmp/openclaw-discord-process-test-sessions.json" }, + }, + author: { + id: "U1", + username: "alice", + discriminator: "0", + globalName: "Alice", + }, + channelConfig: { + allowed: true, + users: ["U1"], + }, + discordRestFetch: fetchImpl, + message: { + id: "m-reply-hidden-media", + channelId: "c1", + content: "<@bot> what is this?", + timestamp: new Date().toISOString(), + attachments: [], + messageReference: { + type: 0, + message_id: "m-hidden", + channel_id: "c1", + }, + referencedMessage: { + id: "m-hidden", + channelId: "c1", + content: "hidden image", + timestamp: new Date().toISOString(), + attachments: [ + { + id: "att-hidden", + url: "https://cdn.discordapp.com/attachments/hidden.png", + content_type: "image/png", + filename: "hidden.png", + }, + ], + author: { + id: "U2", + username: "mallory", + discriminator: "0", + globalName: "Mallory", + }, + }, + }, + baseText: "<@bot> what is this?", + messageText: "<@bot> what is this?", + }); + + await runProcessDiscordMessage(ctx); + + const dispatchCtx = requireRecord(getLastDispatchCtx(), "dispatch context"); + expect(fetchImpl).not.toHaveBeenCalled(); + expect(dispatchCtx.ReplyToBody).toBeUndefined(); + expect(dispatchCtx.MediaPath).toBeUndefined(); + expect(dispatchCtx.MediaPaths).toBeUndefined(); + }); + it("stores DM lastRoute with user target for direct-session continuity", async () => { const ctx = await createBaseContext({ ...createDirectMessageContextOverrides(), diff --git a/extensions/discord/src/monitor/message-media.ts b/extensions/discord/src/monitor/message-media.ts index 776c26b0f7b..7cc47843f16 100644 --- a/extensions/discord/src/monitor/message-media.ts +++ b/extensions/discord/src/monitor/message-media.ts @@ -13,6 +13,7 @@ import { resolveDiscordMessageSnapshots, resolveDiscordMessageStickers, resolveDiscordReferencedForwardMessage, + resolveDiscordReferencedReplyMessage, resolveDiscordSnapshotStickers, } from "./message-forwarded.js"; import { mergeAbortSignals } from "./timeouts.js"; @@ -202,6 +203,42 @@ export async function resolveForwardedMediaList( return out; } +export async function resolveReferencedReplyMediaList( + message: Message, + maxBytes: number, + options?: DiscordMediaResolveOptions, +): Promise { + const referencedReply = resolveDiscordReferencedReplyMessage(message); + const out: DiscordMediaInfo[] = []; + if (!referencedReply) { + return out; + } + const resolvedSsrFPolicy = resolveDiscordMediaSsrFPolicy(options?.ssrfPolicy); + await appendResolvedMediaFromAttachments({ + attachments: referencedReply.attachments, + maxBytes, + out, + errorPrefix: "discord: failed to download referenced reply attachment", + fetchImpl: options?.fetchImpl, + ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, + }); + await appendResolvedMediaFromStickers({ + stickers: resolveDiscordMessageStickers(referencedReply), + maxBytes, + out, + errorPrefix: "discord: failed to download referenced reply sticker", + fetchImpl: options?.fetchImpl, + ssrfPolicy: resolvedSsrFPolicy, + readIdleTimeoutMs: options?.readIdleTimeoutMs, + totalTimeoutMs: options?.totalTimeoutMs, + abortSignal: options?.abortSignal, + }); + return out; +} + async function fetchDiscordMedia(params: { url: string; filePathHint: string; diff --git a/extensions/discord/src/monitor/message-utils.test.ts b/extensions/discord/src/monitor/message-utils.test.ts index 57982726f51..f5644671e8f 100644 --- a/extensions/discord/src/monitor/message-utils.test.ts +++ b/extensions/discord/src/monitor/message-utils.test.ts @@ -51,6 +51,7 @@ let resolveDiscordMessageChannelId: typeof import("./message-utils.js").resolveD let resolveDiscordMessageText: typeof import("./message-utils.js").resolveDiscordMessageText; let resolveForwardedMediaList: typeof import("./message-utils.js").resolveForwardedMediaList; let resolveMediaList: typeof import("./message-utils.js").resolveMediaList; +let resolveReferencedReplyMediaList: typeof import("./message-utils.js").resolveReferencedReplyMediaList; beforeAll(async () => { ({ @@ -60,6 +61,7 @@ beforeAll(async () => { resolveDiscordMessageText, resolveForwardedMediaList, resolveMediaList, + resolveReferencedReplyMediaList, } = await import("./message-utils.js")); }); @@ -470,6 +472,65 @@ describe("resolveForwardedMediaList", () => { }); }); +describe("resolveReferencedReplyMediaList", () => { + beforeEach(() => { + readRemoteMediaBuffer.mockClear(); + saveMediaBuffer.mockClear(); + }); + + it("downloads referenced reply attachments", async () => { + const attachment = { + id: "att-reply-1", + url: "https://cdn.discordapp.com/attachments/1/reply-image.png", + filename: "reply-image.png", + content_type: "image/png", + }; + readRemoteMediaBuffer.mockResolvedValueOnce({ + buffer: Buffer.from("image"), + contentType: "image/png", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/reply-image.png", + contentType: "image/png", + }); + + const result = await resolveReferencedReplyMediaList( + asReferencedForwardMessage({ + messageReferenceType: MessageReferenceType.Default, + attachments: [attachment], + }), + 512, + ); + + expectSinglePngDownload({ + result, + expectedUrl: attachment.url, + filePathHint: attachment.filename, + expectedPath: "/tmp/reply-image.png", + placeholder: "", + }); + }); + + it("ignores forwarded references", async () => { + const result = await resolveReferencedReplyMediaList( + asReferencedForwardMessage({ + attachments: [ + { + id: "att-forward-1", + url: "https://cdn.discordapp.com/attachments/1/forward.png", + filename: "forward.png", + content_type: "image/png", + }, + ], + }), + 512, + ); + + expect(result).toEqual([]); + expect(readRemoteMediaBuffer).not.toHaveBeenCalled(); + }); +}); + describe("resolveMediaList", () => { beforeEach(() => { readRemoteMediaBuffer.mockClear(); diff --git a/extensions/discord/src/monitor/message-utils.ts b/extensions/discord/src/monitor/message-utils.ts index 3e40d577750..08a9dd50cfd 100644 --- a/extensions/discord/src/monitor/message-utils.ts +++ b/extensions/discord/src/monitor/message-utils.ts @@ -12,6 +12,7 @@ export { resolveDiscordMessageSnapshots, resolveDiscordMessageStickers, resolveDiscordReferencedForwardMessage, + resolveDiscordReferencedReplyMessage, resolveDiscordSnapshotStickers, type DiscordMessageSnapshot, type DiscordSnapshotAuthor, @@ -22,6 +23,7 @@ export { buildDiscordMediaPlaceholder, resolveForwardedMediaList, resolveMediaList, + resolveReferencedReplyMediaList, type DiscordMediaInfo, type DiscordMediaResolveOptions, } from "./message-media.js"; diff --git a/extensions/slack/src/monitor/media.ts b/extensions/slack/src/monitor/media.ts index 195b90ef3e5..2253b353332 100644 --- a/extensions/slack/src/monitor/media.ts +++ b/extensions/slack/src/monitor/media.ts @@ -77,7 +77,11 @@ function isMockedFetch(fetchImpl: typeof fetch | undefined): boolean { if (typeof fetchImpl !== "function") { return false; } - return typeof (fetchImpl as typeof fetch & { mock?: unknown }).mock === "object"; + const candidate = fetchImpl as typeof fetch & { + mock?: unknown; + _isMockFunction?: unknown; + }; + return candidate.mock !== undefined || candidate._isMockFunction === true; } function createSlackMediaFetch(): FetchLike { diff --git a/extensions/slack/src/monitor/message-handler/prepare-content.ts b/extensions/slack/src/monitor/message-handler/prepare-content.ts index 4b2fe122aca..62a26c087eb 100644 --- a/extensions/slack/src/monitor/message-handler/prepare-content.ts +++ b/extensions/slack/src/monitor/message-handler/prepare-content.ts @@ -293,6 +293,9 @@ export async function resolveSlackMessageContent(params: { client?: SlackWebClient; mediaMaxBytes: number; resolveUserName?: (userId: string) => Promise<{ name?: string }>; + mediaReadIdleTimeoutMs?: number; + mediaTotalTimeoutMs?: number; + abortSignal?: AbortSignal; }): Promise { const ownFiles = filterInheritedParentFiles({ files: params.message.files, @@ -308,6 +311,9 @@ export async function resolveSlackMessageContent(params: { client: params.client, token: params.botToken, maxBytes: params.mediaMaxBytes, + readIdleTimeoutMs: params.mediaReadIdleTimeoutMs, + totalTimeoutMs: params.mediaTotalTimeoutMs, + abortSignal: params.abortSignal, }), ) : Promise.resolve(null); @@ -320,6 +326,9 @@ export async function resolveSlackMessageContent(params: { client: params.client, token: params.botToken, maxBytes: params.mediaMaxBytes, + readIdleTimeoutMs: params.mediaReadIdleTimeoutMs, + totalTimeoutMs: params.mediaTotalTimeoutMs, + abortSignal: params.abortSignal, }), ) : Promise.resolve(null); diff --git a/extensions/slack/src/monitor/message-handler/prepare.test.ts b/extensions/slack/src/monitor/message-handler/prepare.test.ts index 539497ab45f..4e9e9d01551 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.test.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.test.ts @@ -667,6 +667,178 @@ Second paragraph should still reach the agent after Slack's preview cutoff.`; expect(members).not.toHaveBeenCalled(); }); + it("records skipped no-mention room images as pending history media", async () => { + const originalFetch = globalThis.fetch; + const mockFetch = vi.fn(async () => { + return new Response(Buffer.from("image data"), { + status: 200, + headers: { "content-type": "image/png" }, + }); + }); + globalThis.fetch = mockFetch as typeof fetch; + + try { + const slackCtx = createInboundSlackCtx({ + cfg: { channels: { slack: { enabled: true } } } as OpenClawConfig, + defaultRequireMention: true, + }); + slackCtx.historyLimit = 5; + slackCtx.resolveUserName = async () => ({ name: "Alice" }); + + const prepared = await prepareMessageWith( + slackCtx, + createSlackAccount(), + createSlackMessage({ + channel: "C123", + channel_type: "channel", + text: "", + ts: "500.000", + files: [ + { + id: "F1", + name: "diagram.png", + mimetype: "image/png", + url_private: "https://files.slack.com/diagram.png", + }, + ], + }), + ); + + expect(prepared).toBeNull(); + const entries = Array.from(slackCtx.channelHistories.values()).flat(); + expect(entries).toHaveLength(1); + expect(entries[0]?.body).toBe("[Slack file: diagram.png (fileId: F1)]"); + expect(entries[0]?.media).toHaveLength(1); + expect(entries[0]?.media?.[0]).toMatchObject({ + contentType: "image/png", + kind: "image", + messageId: "500.000", + }); + expect(entries[0]?.media?.[0]?.path).toEqual(expect.any(String)); + } finally { + globalThis.fetch = originalFetch; + } + }); + + it("records skipped no-mention shared images as pending history media", async () => { + const originalFetch = globalThis.fetch; + const mockFetch = vi.fn(async () => { + return new Response(Buffer.from("shared image data"), { + status: 200, + headers: { "content-type": "image/png" }, + }); + }); + globalThis.fetch = mockFetch as typeof fetch; + + try { + const slackCtx = createInboundSlackCtx({ + cfg: { channels: { slack: { enabled: true } } } as OpenClawConfig, + defaultRequireMention: true, + }); + slackCtx.historyLimit = 5; + slackCtx.resolveUserName = async () => ({ name: "Alice" }); + + const prepared = await prepareMessageWith( + slackCtx, + createSlackAccount(), + createSlackMessage({ + channel: "C123", + channel_type: "channel", + text: "", + ts: "501.000", + attachments: [ + { + is_share: true, + image_url: "https://files.slack.com/shared.png", + }, + ], + }), + ); + + expect(prepared).toBeNull(); + const entries = Array.from(slackCtx.channelHistories.values()).flat(); + expect(entries).toHaveLength(1); + expect(entries[0]?.body).toBe("[Slack media attachment]"); + expect(entries[0]?.media).toHaveLength(1); + expect(entries[0]?.media?.[0]).toMatchObject({ + contentType: "image/png", + kind: "image", + messageId: "501.000", + }); + } finally { + globalThis.fetch = originalFetch; + } + }); + + it("does not record inherited thread-starter files as skipped reply history media", async () => { + const originalFetch = globalThis.fetch; + const mockFetch = vi.fn(async () => { + throw new Error("inherited parent file should not be downloaded"); + }); + globalThis.fetch = mockFetch as typeof fetch; + + try { + const replies = vi.fn().mockResolvedValue({ + messages: [ + { + text: "starter", + user: "U2", + ts: "600.000", + files: [ + { + id: "F-parent", + name: "parent.png", + mimetype: "image/png", + }, + ], + }, + ], + }); + const slackCtx = createInboundSlackCtx({ + cfg: { channels: { slack: { enabled: true } } } as OpenClawConfig, + appClient: { conversations: { replies } } as unknown as App["client"], + defaultRequireMention: true, + }); + slackCtx.historyLimit = 5; + slackCtx.resolveUserName = async () => ({ name: "Alice" }); + + const prepared = await prepareMessageWith( + slackCtx, + createSlackAccount(), + createSlackMessage({ + channel: "C123", + channel_type: "channel", + text: "", + ts: "601.000", + thread_ts: "600.000", + files: [ + { + id: "F-parent", + name: "parent.png", + mimetype: "image/png", + url_private: "https://files.slack.com/parent.png", + }, + ], + }), + ); + + expect(prepared).toBeNull(); + expect(replies).toHaveBeenCalledWith({ + channel: "C123", + ts: "600.000", + limit: 1, + inclusive: true, + }); + const entries = Array.from(slackCtx.channelHistories.values()).flat(); + expect(entries).toHaveLength(1); + expect(entries[0]?.body).toBe("[Slack file: parent.png (fileId: F-parent)]"); + expect(entries[0]?.media).toBeUndefined(); + expect(mockFetch).not.toHaveBeenCalled(); + } finally { + globalThis.fetch = originalFetch; + } + }); + it("allows bot-authored room messages with explicit mention when allowBots is mentions", async () => { const members = vi.fn(); const slackCtx = createInboundSlackCtx({ diff --git a/extensions/slack/src/monitor/message-handler/prepare.ts b/extensions/slack/src/monitor/message-handler/prepare.ts index a0420877f02..fe9d6bd47b1 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.ts @@ -16,10 +16,13 @@ import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; import { shouldHandleTextCommands } from "openclaw/plugin-sdk/command-surface"; import { ensureConfiguredBindingRouteReady } from "openclaw/plugin-sdk/conversation-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { mimeTypeFromFilePath } from "openclaw/plugin-sdk/media-mime"; import { finalizeInboundContext } from "openclaw/plugin-sdk/reply-dispatch-runtime"; import { buildInboundHistoryFromMap, buildPendingHistoryContextFromMap, + type HistoryMediaEntry, + recordPendingHistoryEntryWithMedia, recordPendingHistoryEntryIfEnabled, } from "openclaw/plugin-sdk/reply-history"; import type { FinalizedMsgContext } from "openclaw/plugin-sdk/reply-runtime"; @@ -37,7 +40,7 @@ import { reactSlackMessage } from "../../actions.js"; import { formatSlackError } from "../../errors.js"; import { formatSlackFileReference } from "../../file-reference.js"; import { hasSlackThreadParticipationWithPersistence } from "../../sent-thread-cache.js"; -import type { SlackMessageEvent } from "../../types.js"; +import type { SlackAttachment, SlackFile, SlackMessageEvent } from "../../types.js"; import { normalizeAllowListLower, normalizeSlackAllowOwnerEntry } from "../allow-list.js"; import { authorizeSlackBotRoomMessage, @@ -60,7 +63,7 @@ import { resolveConversationLabel } from "../conversation.runtime.js"; import { authorizeSlackDirectMessage } from "../dm-auth.js"; import { resolveSlackRoomContextHints } from "../room-context.js"; import { sendMessageSlack } from "../send.runtime.js"; -import { resolveSlackThreadStarter } from "../thread.js"; +import { resolveSlackThreadStarter, type SlackThreadStarter } from "../thread.js"; import { resolveSlackMessageContent } from "./prepare-content.js"; import { resolveSlackDmHistoryContext, resolveSlackDmHistoryLimit } from "./prepare-dm-history.js"; import { resolveSlackRoutingContext } from "./prepare-routing.js"; @@ -73,6 +76,10 @@ const SLACK_ANY_MENTION_RE = /<@[^>]+>|]+>/; const SLACK_USER_MENTION_RE = /<@([^>|]+)(?:\|[^>]+)?>/g; const SLACK_SUBTEAM_MENTION_RE = /|]+)(?:\|[^>]+)?>/g; const SLACK_SUBTEAM_MENTION_MARKER = " { + const mediaMessage = buildSlackHistoryMediaCandidateMessage(params.message); + if (!mediaMessage) { + return []; + } + const content = await resolveSlackMessageContent({ + message: mediaMessage, + isThreadReply: params.isThreadReply, + threadStarter: params.threadStarter, + isBotMessage: params.isBotMessage, + client: params.ctx.app.client, + botToken: params.ctx.botToken, + mediaMaxBytes: Math.min(params.ctx.mediaMaxBytes, SLACK_HISTORY_MEDIA_MAX_BYTES), + mediaReadIdleTimeoutMs: SLACK_HISTORY_MEDIA_IDLE_TIMEOUT_MS, + mediaTotalTimeoutMs: SLACK_HISTORY_MEDIA_TOTAL_TIMEOUT_MS, + }); + return (content?.effectiveDirectMedia ?? []).map((media) => ({ + path: media.path, + contentType: media.contentType, + kind: "image" as const, + messageId: params.message.ts, + })); +} + type SlackConversationContext = { channelInfo: { name?: string; @@ -698,11 +798,22 @@ export async function prepareSlackMessage(params: { if (isRoom && shouldRequireMention && messageIngress.activationAccess.shouldSkip) { ctx.logger.info({ channel: message.channel, reason: "no-mention" }, "skipping channel message"); const pendingText = (message.text ?? "").trim(); + const historyMediaCandidate = buildSlackHistoryMediaCandidateMessage(message); const fallbackFile = message.files?.length ? `[Slack file: ${formatSlackFileReference(message.files[0])}]` : ""; - const pendingBody = pendingText || fallbackFile; - recordPendingHistoryEntryIfEnabled({ + const fallbackSharedMedia = + !fallbackFile && historyMediaCandidate ? "[Slack media attachment]" : ""; + const pendingBody = pendingText || fallbackFile || fallbackSharedMedia; + const skippedThreadStarter = + historyMediaCandidate && isThreadReply && threadTs + ? await resolveSlackThreadStarter({ + channelId: message.channel, + threadTs, + client: ctx.app.client, + }) + : null; + await recordPendingHistoryEntryWithMedia({ historyMap: ctx.channelHistories, historyKey, limit: ctx.historyLimit, @@ -714,6 +825,16 @@ export async function prepareSlackMessage(params: { messageId: message.ts, } : null, + mediaLimit: SLACK_HISTORY_MEDIA_MAX_ATTACHMENTS, + messageId: message.ts, + media: () => + resolveSlackHistoryMediaForPendingRecord({ + ctx, + message, + isThreadReply, + threadStarter: skippedThreadStarter, + isBotMessage, + }), }); return null; } diff --git a/src/auto-reply/reply/agent-turn-attachments.ts b/src/auto-reply/reply/agent-turn-attachments.ts new file mode 100644 index 00000000000..31bffe81777 --- /dev/null +++ b/src/auto-reply/reply/agent-turn-attachments.ts @@ -0,0 +1,163 @@ +import type { AcpTurnAttachment as AgentTurnAttachment } from "../../acp/control-plane/manager.types.js"; +import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { logVerbose } from "../../globals.js"; +import type { MediaAttachment } from "../../media-understanding/types.js"; +import { createLazyImportLoader } from "../../shared/lazy-promise.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import type { FinalizedMsgContext } from "../templating.js"; +import { + type RecentInboundHistoryImage, + resolveRecentInboundHistoryImages, +} from "./history-media.js"; +import { hasInboundMedia } from "./inbound-media.js"; + +const agentTurnMediaRuntimeLoader = createLazyImportLoader( + () => import("./dispatch-acp-media.runtime.js"), +); + +export function loadAgentTurnMediaRuntime() { + return agentTurnMediaRuntimeLoader.load(); +} + +export type AgentTurnAttachmentRuntime = Pick< + Awaited>, + | "MediaAttachmentCache" + | "isMediaUnderstandingSkipError" + | "normalizeAttachments" + | "resolveMediaAttachmentLocalRoots" +>; + +const AGENT_TURN_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024; +const AGENT_TURN_ATTACHMENT_TIMEOUT_MS = 1_000; + +function isImageAgentTurnAttachment(attachment: MediaAttachment): boolean { + return attachment.mime?.startsWith("image/") === true; +} + +function hasInboundHistoryMedia(ctx: FinalizedMsgContext): boolean { + return ( + Array.isArray(ctx.InboundHistory) && + ctx.InboundHistory.some((entry) => Array.isArray(entry.media) && entry.media.length > 0) + ); +} + +export function hasPotentialAgentTurnAttachments(ctx: FinalizedMsgContext): boolean { + return hasInboundMedia(ctx) || hasInboundHistoryMedia(ctx); +} + +export async function resolveAgentTurnAttachments(params: { + ctx: FinalizedMsgContext; + cfg: OpenClawConfig; + runtime?: AgentTurnAttachmentRuntime; +}): Promise<{ + attachments: AgentTurnAttachment[]; + recentHistoryImages: RecentInboundHistoryImage[]; +}> { + if (!hasPotentialAgentTurnAttachments(params.ctx)) { + return { attachments: [], recentHistoryImages: [] }; + } + const runtime = params.runtime ?? (await loadAgentTurnMediaRuntime()); + const currentAttachments = runtime + .normalizeAttachments(params.ctx) + .map((attachment) => + normalizeOptionalString(attachment.path) + ? Object.assign({}, attachment, { url: undefined }) + : attachment, + ); + const recentHistoryImages = resolveRecentInboundHistoryImages({ ctx: params.ctx }); + const firstHistoryAttachmentIndex = + currentAttachments.reduce( + (maxIndex, attachment) => + Number.isFinite(attachment.index) ? Math.max(maxIndex, attachment.index) : maxIndex, + -1, + ) + 1; + const historyAttachments: MediaAttachment[] = recentHistoryImages.map((image, index) => ({ + path: image.path, + mime: image.contentType, + index: firstHistoryAttachmentIndex + index, + })); + const historyAttachmentByIndex = new Map( + historyAttachments.map((attachment, index) => [attachment.index, recentHistoryImages[index]]), + ); + const mediaAttachments = [...currentAttachments, ...historyAttachments]; + const cache = new runtime.MediaAttachmentCache(mediaAttachments, { + localPathRoots: runtime.resolveMediaAttachmentLocalRoots({ + cfg: params.cfg, + ctx: params.ctx, + }), + }); + const results: AgentTurnAttachment[] = []; + const resolvedHistoryImages: RecentInboundHistoryImage[] = []; + const resolveImageAttachment = async (attachment: MediaAttachment): Promise => { + const mediaType = attachment.mime ?? "application/octet-stream"; + if (!isImageAgentTurnAttachment(attachment)) { + return false; + } + if (!normalizeOptionalString(attachment.path)) { + return false; + } + try { + const { buffer } = await cache.getBuffer({ + attachmentIndex: attachment.index, + maxBytes: AGENT_TURN_ATTACHMENT_MAX_BYTES, + timeoutMs: AGENT_TURN_ATTACHMENT_TIMEOUT_MS, + }); + results.push({ + mediaType, + data: buffer.toString("base64"), + }); + const historyImage = historyAttachmentByIndex.get(attachment.index); + if (historyImage) { + resolvedHistoryImages.push(historyImage); + } + return true; + } catch (error) { + if (runtime.isMediaUnderstandingSkipError(error)) { + logVerbose( + `agent-turn-attachments: skipping attachment #${attachment.index + 1} (${error.reason})`, + ); + } else { + const errorName = error instanceof Error ? error.name : typeof error; + logVerbose( + `agent-turn-attachments: failed to read attachment #${attachment.index + 1} (${errorName})`, + ); + } + return false; + } + }; + + let currentImageResolved = false; + const hasCurrentMedia = currentAttachments.length > 0; + const hasCurrentImageCandidate = currentAttachments.some(isImageAgentTurnAttachment); + for (const attachment of currentAttachments) { + currentImageResolved = (await resolveImageAttachment(attachment)) || currentImageResolved; + } + if (!currentImageResolved && (!hasCurrentMedia || hasCurrentImageCandidate)) { + for (const attachment of historyAttachments) { + await resolveImageAttachment(attachment); + } + } + return { attachments: results, recentHistoryImages: resolvedHistoryImages }; +} + +export async function resolveAgentAttachments(params: { + ctx: FinalizedMsgContext; + cfg: OpenClawConfig; + runtime?: AgentTurnAttachmentRuntime; +}): Promise { + return (await resolveAgentTurnAttachments(params)).attachments; +} + +export function resolveInlineAgentImageAttachments( + images: Array<{ data: string; mimeType: string }> | undefined, +): AgentTurnAttachment[] { + if (!Array.isArray(images)) { + return []; + } + return images + .map((image) => ({ + mediaType: image.mimeType, + data: image.data, + })) + .filter((image) => image.mediaType.startsWith("image/") && image.data.trim().length > 0); +} diff --git a/src/auto-reply/reply/dispatch-acp-attachments.ts b/src/auto-reply/reply/dispatch-acp-attachments.ts deleted file mode 100644 index 06258321587..00000000000 --- a/src/auto-reply/reply/dispatch-acp-attachments.ts +++ /dev/null @@ -1,91 +0,0 @@ -import type { AcpTurnAttachment } from "../../acp/control-plane/manager.types.js"; -import type { OpenClawConfig } from "../../config/types.openclaw.js"; -import { logVerbose } from "../../globals.js"; -import { createLazyImportLoader } from "../../shared/lazy-promise.js"; -import { normalizeOptionalString } from "../../shared/string-coerce.js"; -import type { FinalizedMsgContext } from "../templating.js"; - -const dispatchAcpMediaRuntimeLoader = createLazyImportLoader( - () => import("./dispatch-acp-media.runtime.js"), -); - -export function loadDispatchAcpMediaRuntime() { - return dispatchAcpMediaRuntimeLoader.load(); -} - -export type DispatchAcpAttachmentRuntime = Pick< - Awaited>, - | "MediaAttachmentCache" - | "isMediaUnderstandingSkipError" - | "normalizeAttachments" - | "resolveMediaAttachmentLocalRoots" ->; - -const ACP_ATTACHMENT_MAX_BYTES = 10 * 1024 * 1024; -const ACP_ATTACHMENT_TIMEOUT_MS = 1_000; - -export async function resolveAcpAttachments(params: { - ctx: FinalizedMsgContext; - cfg: OpenClawConfig; - runtime?: DispatchAcpAttachmentRuntime; -}): Promise { - const runtime = params.runtime ?? (await loadDispatchAcpMediaRuntime()); - const mediaAttachments = runtime - .normalizeAttachments(params.ctx) - .map((attachment) => - normalizeOptionalString(attachment.path) - ? Object.assign({}, attachment, { url: undefined }) - : attachment, - ); - const cache = new runtime.MediaAttachmentCache(mediaAttachments, { - localPathRoots: runtime.resolveMediaAttachmentLocalRoots({ - cfg: params.cfg, - ctx: params.ctx, - }), - }); - const results: AcpTurnAttachment[] = []; - for (const attachment of mediaAttachments) { - const mediaType = attachment.mime ?? "application/octet-stream"; - if (!mediaType.startsWith("image/")) { - continue; - } - if (!normalizeOptionalString(attachment.path)) { - continue; - } - try { - const { buffer } = await cache.getBuffer({ - attachmentIndex: attachment.index, - maxBytes: ACP_ATTACHMENT_MAX_BYTES, - timeoutMs: ACP_ATTACHMENT_TIMEOUT_MS, - }); - results.push({ - mediaType, - data: buffer.toString("base64"), - }); - } catch (error) { - if (runtime.isMediaUnderstandingSkipError(error)) { - logVerbose(`dispatch-acp: skipping attachment #${attachment.index + 1} (${error.reason})`); - } else { - const errorName = error instanceof Error ? error.name : typeof error; - logVerbose( - `dispatch-acp: failed to read attachment #${attachment.index + 1} (${errorName})`, - ); - } - } - } - return results; -} - -export function resolveAcpInlineImageAttachments( - images: Array<{ data: string; mimeType: string }> | undefined, -): AcpTurnAttachment[] { - if (!Array.isArray(images)) { - return []; - } - return images - .map((image) => ({ - mediaType: image.mimeType, - data: image.data, - })) - .filter((image) => image.mediaType.startsWith("image/") && image.data.trim().length > 0); -} diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 1c6561f48a9..5d32f0562ee 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -9,10 +9,15 @@ import type { SessionBindingRecord } from "../../infra/outbound/session-binding- import type { MediaUnderstandingSkipError } from "../../media-understanding/errors.js"; import { withFetchPreconnect } from "../../test-utils/fetch-mock.js"; import { - resolveAcpAttachments, - resolveAcpInlineImageAttachments, -} from "./dispatch-acp-attachments.js"; + resolveAgentAttachments, + resolveAgentTurnAttachments, + resolveInlineAgentImageAttachments, +} from "./agent-turn-attachments.js"; import { tryDispatchAcpReply } from "./dispatch-acp.js"; +import { + appendRecentHistoryImageContext, + resolveRecentInboundHistoryImages, +} from "./history-media.js"; import type { ReplyDispatcher } from "./reply-dispatcher.js"; import { buildTestCtx } from "./test-ctx.js"; import { createAcpSessionMeta, createAcpTestConfig } from "./test-fixtures/acp-runtime.js"; @@ -74,6 +79,8 @@ const mediaUnderstandingMocks = vi.hoisted(() => ({ applyMediaUnderstanding: vi.fn(async (_params: unknown) => undefined), })); +const acpAttachmentBuffers = vi.hoisted(() => new Map()); + const diagnosticMocks = vi.hoisted(() => ({ markDiagnosticSessionProgress: vi.fn(), })); @@ -159,7 +166,19 @@ vi.mock("./dispatch-acp-media.runtime.js", () => ({ return params.cfg.channels?.[channel]?.attachmentRoots ?? []; }, MediaAttachmentCache: class { - async getBuffer(): Promise { + constructor(private readonly attachments: Array<{ path?: string; index: number }>) {} + async getBuffer({ attachmentIndex }: { attachmentIndex: number }) { + const attachment = this.attachments.find((item) => item.index === attachmentIndex); + const path = attachment?.path; + const buffer = path ? acpAttachmentBuffers.get(path) : undefined; + if (buffer) { + return { + buffer, + mime: "image/png", + fileName: path, + size: buffer.length, + }; + } const error = new Error("outside allowed roots"); error.name = "MediaUnderstandingSkipError"; throw error; @@ -431,6 +450,7 @@ describe("tryDispatchAcpReply", () => { ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); mediaUnderstandingMocks.applyMediaUnderstanding.mockReset(); mediaUnderstandingMocks.applyMediaUnderstanding.mockResolvedValue(undefined); + acpAttachmentBuffers.clear(); diagnosticMocks.markDiagnosticSessionProgress.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReset(); sessionMetaMocks.readAcpSessionEntry.mockReturnValue(null); @@ -702,12 +722,12 @@ describe("tryDispatchAcpReply", () => { } }); - it("forwards normalized image attachments into ACP turns", async () => { + it("forwards normalized image attachments into agent runtime turns", async () => { const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const imagePath = path.join(tempDir, "inbound.png"); try { await fs.writeFile(imagePath, "image-bytes"); - const attachments = await resolveAcpAttachments({ + const attachments = await resolveAgentAttachments({ cfg: createAcpTestConfig({ channels: { imessage: { @@ -756,14 +776,417 @@ describe("tryDispatchAcpReply", () => { } }); - it("forwards chat.send inline image attachments into ACP turns", async () => { + it("selects bounded recent local history images", () => { + const now = 1_700_000_000_000; + const ctx = buildTestCtx({ + Timestamp: now, + InboundHistory: [ + { + sender: "Old", + body: "", + timestamp: now - 31 * 60_000, + messageId: "old", + media: [{ path: "/tmp/old.png", contentType: "image/png", kind: "image" }], + }, + { + sender: "Doc", + body: "", + timestamp: now - 1_000, + messageId: "doc", + media: [{ path: "/tmp/doc.pdf", contentType: "application/pdf", kind: "document" }], + }, + { + sender: "Remote", + body: "", + timestamp: now - 1_000, + messageId: "remote", + media: [ + { path: "https://example.com/image.png", contentType: "image/png", kind: "image" }, + ], + }, + ...Array.from({ length: 5 }, (_, index) => ({ + sender: `Recent ${index}`, + body: "", + timestamp: now - (5 - index) * 1_000, + messageId: `recent-${index}`, + media: [ + { path: `/tmp/recent-${index}.png`, contentType: "image/png", kind: "image" as const }, + ], + })), + { + sender: "Windows", + body: "", + timestamp: now - 500, + messageId: "windows", + media: [ + { + path: "C:\\Users\\Alice\\Pictures\\recent.png", + contentType: "image/png", + kind: "image", + }, + ], + }, + ], + }); + + expect(resolveRecentInboundHistoryImages({ ctx })).toEqual([ + { + path: "/tmp/recent-2.png", + contentType: "image/png", + sender: "Recent 2", + messageId: "recent-2", + }, + { + path: "/tmp/recent-3.png", + contentType: "image/png", + sender: "Recent 3", + messageId: "recent-3", + }, + { + path: "/tmp/recent-4.png", + contentType: "image/png", + sender: "Recent 4", + messageId: "recent-4", + }, + { + path: "C:\\Users\\Alice\\Pictures\\recent.png", + contentType: "image/png", + sender: "Windows", + messageId: "windows", + }, + ]); + }); + + it("adds recent history image context without exposing paths", () => { + const text = appendRecentHistoryImageContext({ + promptText: "what is this?", + images: [ + { + path: "/tmp/secret.png", + contentType: "image/png", + sender: "@alice", + messageId: "msg-1", + }, + ], + }); + + expect(text).toContain("what is this?"); + expect(text).toContain("Recent image 1 from @alice, message msg-1"); + expect(text).not.toContain("/tmp/secret.png"); + }); + + it("forwards recent history image attachments into agent runtime turns", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-history-")); + const imagePath = path.join(tempDir, "recent.png"); + try { + await fs.writeFile(imagePath, "recent-image"); + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + messageId: "msg-1", + media: [{ path: imagePath, contentType: "image/png", kind: "image" }], + }, + ], + }), + runtime: { + MediaAttachmentCache: class { + constructor(private readonly attachments: Array<{ path?: string; index: number }>) {} + async getBuffer({ attachmentIndex }: { attachmentIndex: number }) { + const attachment = this.attachments.find((item) => item.index === attachmentIndex); + return { + buffer: Buffer.from(attachment?.path ?? ""), + mime: "image/png", + fileName: "recent.png", + size: attachment?.path?.length ?? 0, + }; + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: () => [], + resolveMediaAttachmentLocalRoots: () => [tempDir], + }, + }); + + expect(result.attachments).toEqual([ + { + mediaType: "image/png", + data: Buffer.from(imagePath).toString("base64"), + }, + ]); + expect(result.recentHistoryImages).toEqual([ + { + path: imagePath, + contentType: "image/png", + sender: "@alice", + messageId: "msg-1", + }, + ]); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("keeps text-only turns off the agent media runtime", async () => { + const normalizeAttachments = vi.fn(() => { + throw new Error("media runtime should not be touched"); + }); + + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + BodyForAgent: "hello", + }), + runtime: { + MediaAttachmentCache: class { + readonly __mock = true; + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments, + resolveMediaAttachmentLocalRoots: () => [], + }, + }); + + expect(result).toEqual({ attachments: [], recentHistoryImages: [] }); + expect(normalizeAttachments).not.toHaveBeenCalled(); + }); + + it("does not inject recent history images when the current turn already has an image", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-current-")); + const currentPath = path.join(tempDir, "current.png"); + const historyPath = path.join(tempDir, "history.png"); + try { + await fs.writeFile(currentPath, "current-image"); + await fs.writeFile(historyPath, "history-image"); + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + MediaPath: currentPath, + MediaType: "image/png", + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + media: [{ path: historyPath, contentType: "image/png", kind: "image" }], + }, + ], + }), + runtime: { + MediaAttachmentCache: class { + async getBuffer() { + return { + buffer: Buffer.from("current-image"), + mime: "image/png", + fileName: "current.png", + size: "current-image".length, + }; + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: (ctx) => [{ path: ctx.MediaPath, mime: ctx.MediaType, index: 0 }], + resolveMediaAttachmentLocalRoots: () => [tempDir], + }, + }); + + expect(result.attachments).toHaveLength(1); + expect(result.recentHistoryImages).toEqual([]); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("keeps history attachment indexes distinct from sparse current media indexes", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-sparse-history-")); + const currentPath = path.join(tempDir, "current.png"); + const historyPath = path.join(tempDir, "history.png"); + const seenAttachmentIndexes: number[] = []; + try { + await fs.writeFile(currentPath, "current-image"); + await fs.writeFile(historyPath, "history-image"); + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + MediaPath: currentPath, + MediaType: "image/png", + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + messageId: "msg-history", + media: [{ path: historyPath, contentType: "image/png", kind: "image" }], + }, + ], + }), + runtime: { + MediaAttachmentCache: class { + constructor(private readonly attachments: Array<{ path?: string; index: number }>) {} + async getBuffer({ attachmentIndex }: { attachmentIndex: number }) { + seenAttachmentIndexes.push(attachmentIndex); + const attachment = this.attachments.find((item) => item.index === attachmentIndex); + return { + buffer: Buffer.from(attachment?.path ?? ""), + mime: "image/png", + fileName: "current.png", + size: attachment?.path?.length ?? 0, + }; + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: (ctx) => [{ path: ctx.MediaPath, mime: ctx.MediaType, index: 1 }], + resolveMediaAttachmentLocalRoots: () => [tempDir], + }, + }); + + expect(result.attachments).toEqual([ + { + mediaType: "image/png", + data: Buffer.from(currentPath).toString("base64"), + }, + ]); + expect(result.recentHistoryImages).toEqual([]); + expect(seenAttachmentIndexes).toEqual([1]); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("does not fall back to recent history images when the current turn has non-image media", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-current-pdf-")); + const documentPath = path.join(tempDir, "current.pdf"); + const historyPath = path.join(tempDir, "history.png"); + const getBuffer = vi.fn(); + try { + await fs.writeFile(documentPath, "current-pdf"); + await fs.writeFile(historyPath, "history-image"); + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + MediaPath: documentPath, + MediaType: "application/pdf", + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + messageId: "msg-history", + media: [{ path: historyPath, contentType: "image/png", kind: "image" }], + }, + ], + }), + runtime: { + MediaAttachmentCache: class { + async getBuffer(params: { attachmentIndex: number }) { + return getBuffer(params); + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: (ctx) => [{ path: ctx.MediaPath, mime: ctx.MediaType, index: 0 }], + resolveMediaAttachmentLocalRoots: () => [tempDir], + }, + }); + + expect(result).toEqual({ attachments: [], recentHistoryImages: [] }); + expect(getBuffer).not.toHaveBeenCalled(); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("falls back to recent history images when current image attachments are unusable", async () => { + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-history-fallback-")); + const historyPath = path.join(tempDir, "history.png"); + try { + await fs.writeFile(historyPath, "history-image"); + const result = await resolveAgentTurnAttachments({ + cfg: createAcpTestConfig(), + ctx: buildTestCtx({ + Provider: "discord", + Surface: "discord", + MediaUrl: "https://example.com/current.png", + MediaType: "image/png", + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + messageId: "msg-history", + media: [{ path: historyPath, contentType: "image/png", kind: "image" }], + }, + ], + }), + runtime: { + MediaAttachmentCache: class { + constructor(private readonly attachments: Array<{ path?: string; index: number }>) {} + async getBuffer({ attachmentIndex }: { attachmentIndex: number }) { + const attachment = this.attachments.find((item) => item.index === attachmentIndex); + return { + buffer: Buffer.from(attachment?.path ?? ""), + mime: "image/png", + fileName: "history.png", + size: attachment?.path?.length ?? 0, + }; + } + } as unknown as typeof import("./dispatch-acp-media.runtime.js").MediaAttachmentCache, + isMediaUnderstandingSkipError: (_error: unknown): _error is MediaUnderstandingSkipError => + false, + normalizeAttachments: (ctx) => [{ url: ctx.MediaUrl, mime: ctx.MediaType, index: 0 }], + resolveMediaAttachmentLocalRoots: () => [tempDir], + }, + }); + + expect(result.attachments).toEqual([ + { + mediaType: "image/png", + data: Buffer.from(historyPath).toString("base64"), + }, + ]); + expect(result.recentHistoryImages).toEqual([ + { + path: historyPath, + contentType: "image/png", + sender: "@alice", + messageId: "msg-history", + }, + ]); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + + it("forwards chat.send inline image attachments into agent runtime turns", async () => { setReadyAcpResolution(); const image = { mimeType: "image/png", data: Buffer.from("image-bytes").toString("base64"), }; - expect(resolveAcpInlineImageAttachments([image])).toEqual([ + expect(resolveInlineAgentImageAttachments([image])).toEqual([ { mediaType: "image/png", data: image.data, @@ -784,7 +1207,42 @@ describe("tryDispatchAcpReply", () => { ]); }); - it("skips ACP attachments outside allowed inbound roots", async () => { + it("preserves chat.send inline image attachments over recent history images", async () => { + setReadyAcpResolution(); + const image = { + mimeType: "image/png", + data: Buffer.from("inline-image").toString("base64"), + }; + const historyPath = "/tmp/openclaw-history-inline.png"; + acpAttachmentBuffers.set(historyPath, Buffer.from("history-image")); + + await runDispatch({ + bodyForAgent: "describe image", + images: [image], + ctxOverrides: { + Timestamp: 1_700_000_000_000, + InboundHistory: [ + { + sender: "@alice", + body: "", + timestamp: 1_700_000_000_000, + messageId: "msg-history", + media: [{ path: historyPath, contentType: "image/png", kind: "image" }], + }, + ], + }, + }); + + expect(runTurnCall().text).toBe("describe image"); + expect(runTurnCall().attachments).toEqual([ + { + mediaType: "image/png", + data: image.data, + }, + ]); + }); + + it("skips agent runtime attachments outside allowed inbound roots", async () => { setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const imagePath = path.join(tempDir, "outside-root.png"); @@ -806,7 +1264,7 @@ describe("tryDispatchAcpReply", () => { } }); - it("skips file URL ACP attachments outside allowed inbound roots", async () => { + it("skips file URL agent runtime attachments outside allowed inbound roots", async () => { setReadyAcpResolution(); const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "dispatch-acp-")); const imagePath = path.join(tempDir, "outside-root.png"); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 1b1f22b435a..d9c34e56a70 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -29,14 +29,15 @@ import type { SourceReplyDeliveryMode } from "../get-reply-options.types.js"; import type { FinalizedMsgContext } from "../templating.js"; import { createAcpReplyProjector } from "./acp-projector.js"; import { - loadDispatchAcpMediaRuntime, - resolveAcpAttachments, - resolveAcpInlineImageAttachments, -} from "./dispatch-acp-attachments.js"; + loadAgentTurnMediaRuntime, + resolveAgentTurnAttachments, + resolveInlineAgentImageAttachments, +} from "./agent-turn-attachments.js"; import { createAcpDispatchDeliveryCoordinator, type AcpDispatchDeliveryCoordinator, } from "./dispatch-acp-delivery.js"; +import { appendRecentHistoryImageContext } from "./history-media.js"; import { hasInboundMedia } from "./inbound-media.js"; import type { ReplyDispatchKind, ReplyDispatcher } from "./reply-dispatcher.types.js"; @@ -456,7 +457,7 @@ export async function tryDispatchAcpReply(params: { } if (hasInboundMedia(params.ctx) && !params.ctx.MediaUnderstanding?.length) { try { - const { applyMediaUnderstanding } = await loadDispatchAcpMediaRuntime(); + const { applyMediaUnderstanding } = await loadAgentTurnMediaRuntime(); await applyMediaUnderstanding({ ctx: params.ctx, cfg: params.cfg, @@ -470,14 +471,28 @@ export async function tryDispatchAcpReply(params: { } const promptText = resolveAcpPromptText(params.ctx); - const mediaAttachments = hasInboundMedia(params.ctx) - ? await resolveAcpAttachments({ ctx: params.ctx, cfg: params.cfg }) - : []; + const resolvedTurnAttachments = await resolveAgentTurnAttachments({ + ctx: params.ctx, + cfg: params.cfg, + }); + const mediaAttachments = resolvedTurnAttachments.attachments; + const inlineAttachments = resolveInlineAgentImageAttachments(params.images); + const mediaAttachmentsAreOnlyRecentHistory = + mediaAttachments.length > 0 && + mediaAttachments.length === resolvedTurnAttachments.recentHistoryImages.length; const attachments = - mediaAttachments.length > 0 + mediaAttachments.length > 0 && + !(mediaAttachmentsAreOnlyRecentHistory && inlineAttachments.length > 0) ? mediaAttachments - : resolveAcpInlineImageAttachments(params.images); - if (!promptText && attachments.length === 0) { + : inlineAttachments; + const turnPromptText = + attachments === mediaAttachments + ? appendRecentHistoryImageContext({ + promptText, + images: resolvedTurnAttachments.recentHistoryImages, + }) + : promptText; + if (!turnPromptText && attachments.length === 0) { const counts = params.dispatcher.getQueuedCounts(); delivery.applyRoutedCounts(counts); params.recordProcessed("completed", { reason: "acp_empty_prompt" }); @@ -495,7 +510,7 @@ export async function tryDispatchAcpReply(params: { cfg: params.cfg, sessionKey: canonicalSessionKey, text: resolveAcpTurnText({ - promptText, + promptText: turnPromptText, sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, }), attachments: attachments.length > 0 ? attachments : undefined, @@ -518,7 +533,7 @@ export async function tryDispatchAcpReply(params: { await persistAcpDispatchTranscript({ cfg: params.cfg, sessionKey: canonicalSessionKey, - promptText, + promptText: turnPromptText, finalText: delivery.getAccumulatedFinalText() || delivery.getAccumulatedBlockText(), meta: acpResolution.meta, threadId: params.ctx.MessageThreadId, diff --git a/src/auto-reply/reply/history-media.ts b/src/auto-reply/reply/history-media.ts new file mode 100644 index 00000000000..c8e660212b1 --- /dev/null +++ b/src/auto-reply/reply/history-media.ts @@ -0,0 +1,122 @@ +import { mimeTypeFromFilePath } from "../../media/mime.js"; +import { normalizeOptionalString } from "../../shared/string-coerce.js"; +import type { FinalizedMsgContext } from "../templating.js"; +import type { HistoryEntry, HistoryMediaEntry } from "./history.types.js"; + +export const RECENT_HISTORY_IMAGE_TTL_MS = 30 * 60_000; +export const RECENT_HISTORY_IMAGE_LIMIT = 4; + +export type RecentInboundHistoryImage = { + path: string; + contentType: string; + sender: string; + messageId?: string; +}; + +function isRemotePath(value: string): boolean { + if (/^[a-z]:[\\/]/i.test(value)) { + return false; + } + try { + return new URL(value).protocol !== "file:"; + } catch { + return false; + } +} + +function resolveHistoryImageContentType(media: HistoryMediaEntry): string | undefined { + const contentType = normalizeOptionalString(media.contentType); + if (contentType?.startsWith("image/")) { + return contentType; + } + const path = normalizeOptionalString(media.path); + return mimeTypeFromFilePath(path); +} + +function isHistoryImageMedia(media: HistoryMediaEntry): boolean { + if (media.kind === "image") { + return true; + } + return Boolean(resolveHistoryImageContentType(media)?.startsWith("image/")); +} + +function resolveTimestamp(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function resolveHistoryEntries(ctx: FinalizedMsgContext): HistoryEntry[] { + return Array.isArray(ctx.InboundHistory) ? ctx.InboundHistory : []; +} + +export function resolveRecentInboundHistoryImages(params: { + ctx: FinalizedMsgContext; + nowMs?: number; + ttlMs?: number; + limit?: number; +}): RecentInboundHistoryImage[] { + const nowMs = params.nowMs ?? resolveTimestamp(params.ctx.Timestamp) ?? Date.now(); + const ttlMs = params.ttlMs ?? RECENT_HISTORY_IMAGE_TTL_MS; + const limit = Math.max(0, params.limit ?? RECENT_HISTORY_IMAGE_LIMIT); + if (limit === 0) { + return []; + } + + const out: RecentInboundHistoryImage[] = []; + const seen = new Set(); + const entries = resolveHistoryEntries(params.ctx); + for (let index = entries.length - 1; index >= 0 && out.length < limit; index -= 1) { + const entry = entries[index]; + const timestamp = resolveTimestamp(entry?.timestamp); + if (timestamp === undefined || Math.abs(nowMs - timestamp) > ttlMs) { + continue; + } + const mediaEntries = Array.isArray(entry.media) ? entry.media : []; + for ( + let mediaIndex = mediaEntries.length - 1; + mediaIndex >= 0 && out.length < limit; + mediaIndex -= 1 + ) { + const media = mediaEntries[mediaIndex]; + if (!media || !isHistoryImageMedia(media)) { + continue; + } + const mediaPath = normalizeOptionalString(media.path); + if (!mediaPath || isRemotePath(mediaPath)) { + continue; + } + const contentType = resolveHistoryImageContentType(media); + if (!contentType?.startsWith("image/")) { + continue; + } + const messageId = normalizeOptionalString(media.messageId) ?? entry.messageId; + const key = [messageId ?? "", mediaPath].join("\0"); + if (seen.has(key)) { + continue; + } + seen.add(key); + out.push({ + path: mediaPath, + contentType, + sender: entry.sender, + ...(messageId ? { messageId } : {}), + }); + } + } + return out.toReversed(); +} + +export function appendRecentHistoryImageContext(params: { + promptText: string; + images: RecentInboundHistoryImage[]; +}): string { + if (params.images.length === 0) { + return params.promptText; + } + const notes = params.images.map((image, index) => { + const message = image.messageId ? `, message ${image.messageId}` : ""; + return `[Recent image ${index + 1} from ${image.sender}${message}, attached as media.]`; + }); + return [params.promptText, notes.join("\n")] + .filter((part) => part.trim().length > 0) + .join("\n\n"); +} diff --git a/src/auto-reply/reply/history.test.ts b/src/auto-reply/reply/history.test.ts new file mode 100644 index 00000000000..193a5083bc8 --- /dev/null +++ b/src/auto-reply/reply/history.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, it } from "vitest"; +import { normalizeHistoryMediaEntries, recordPendingHistoryEntryWithMedia } from "./history.js"; +import type { HistoryEntry } from "./history.types.js"; + +describe("history media recording", () => { + it("keeps only bounded local image media", () => { + expect( + normalizeHistoryMediaEntries({ + limit: 2, + messageId: "msg-1", + media: [ + { path: "/tmp/a.png", contentType: "image/png" }, + { path: "https://example.com/b.png", contentType: "image/png" }, + { path: "/tmp/c.pdf", contentType: "application/pdf", kind: "document" }, + { path: "C:\\tmp\\d.jpg", kind: "image" }, + { path: "/tmp/e.jpg", kind: "image" }, + ], + }), + ).toEqual([ + { path: "/tmp/a.png", contentType: "image/png", kind: "image", messageId: "msg-1" }, + { path: "C:\\tmp\\d.jpg", kind: "image", messageId: "msg-1" }, + ]); + }); + + it("records text history unchanged when media resolver has no usable media", async () => { + const historyMap = new Map(); + + await recordPendingHistoryEntryWithMedia({ + historyMap, + historyKey: "channel-1", + limit: 5, + entry: { sender: "Alice", body: "hello", messageId: "msg-1" }, + media: async () => [{ path: "https://example.com/a.png", contentType: "image/png" }], + }); + + expect(historyMap.get("channel-1")).toEqual([ + { sender: "Alice", body: "hello", messageId: "msg-1" }, + ]); + }); + + it("records text history before async media resolution finishes", async () => { + const historyMap = new Map(); + let resolveMedia!: (media: HistoryEntry["media"]) => void; + const mediaPromise = new Promise((resolve) => { + resolveMedia = resolve; + }); + + const pending = recordPendingHistoryEntryWithMedia({ + historyMap, + historyKey: "channel-1", + limit: 5, + entry: { sender: "Alice", body: "", messageId: "msg-1" }, + media: async () => await mediaPromise, + }); + + expect(historyMap.get("channel-1")).toEqual([ + { sender: "Alice", body: "", messageId: "msg-1" }, + ]); + + resolveMedia([{ path: "/tmp/a.png", contentType: "image/png" }]); + await pending; + + expect(historyMap.get("channel-1")).toEqual([ + { + sender: "Alice", + body: "", + messageId: "msg-1", + media: [ + { path: "/tmp/a.png", contentType: "image/png", kind: "image", messageId: "msg-1" }, + ], + }, + ]); + }); +}); diff --git a/src/auto-reply/reply/history.ts b/src/auto-reply/reply/history.ts index 6029f7e15f1..e48eeca9103 100644 --- a/src/auto-reply/reply/history.ts +++ b/src/auto-reply/reply/history.ts @@ -1,4 +1,4 @@ -import type { HistoryEntry } from "./history.types.js"; +import type { HistoryEntry, HistoryMediaEntry } from "./history.types.js"; import { CURRENT_MESSAGE_MARKER } from "./mentions.js"; export const HISTORY_CONTEXT_MARKER = "[Chat messages since your last reply - for context]"; @@ -100,6 +100,123 @@ export function recordPendingHistoryEntryIfEnabled(param }); } +type MaybePromise = T | Promise; + +const DEFAULT_HISTORY_MEDIA_LIMIT = 4; + +function isLocalHistoryMediaPath(path: string): boolean { + if (/^[a-z]:[\\/]/i.test(path)) { + return true; + } + return !/^[a-z][a-z0-9+.-]*:/i.test(path); +} + +function isImageHistoryMediaEntry(entry: HistoryMediaEntry): boolean { + const contentType = entry.contentType?.split(";")[0]?.trim().toLowerCase(); + return entry.kind === "image" || contentType?.startsWith("image/") === true; +} + +export function normalizeHistoryMediaEntries(params: { + media?: readonly HistoryMediaEntry[] | null; + limit?: number; + messageId?: string; +}): HistoryMediaEntry[] { + const limit = Math.max(0, params.limit ?? DEFAULT_HISTORY_MEDIA_LIMIT); + if (limit <= 0 || !params.media?.length) { + return []; + } + const out: HistoryMediaEntry[] = []; + const seen = new Set(); + for (const entry of params.media) { + if (!isImageHistoryMediaEntry(entry)) { + continue; + } + const path = entry.path?.trim(); + if (!path || !isLocalHistoryMediaPath(path)) { + continue; + } + const dedupeKey = `${entry.messageId ?? params.messageId ?? ""}\0${path}`; + if (seen.has(dedupeKey)) { + continue; + } + seen.add(dedupeKey); + out.push({ + path, + contentType: entry.contentType, + kind: "image", + messageId: entry.messageId ?? params.messageId, + }); + if (out.length >= limit) { + break; + } + } + return out; +} + +export async function recordPendingHistoryEntryWithMedia(params: { + historyMap: Map; + historyKey: string; + entry?: T | null; + limit: number; + media?: + | readonly HistoryMediaEntry[] + | null + | (() => MaybePromise); + mediaLimit?: number; + messageId?: string; + shouldRecord?: () => boolean; +}): Promise { + if (!params.entry || params.limit <= 0) { + return []; + } + if (params.shouldRecord && !params.shouldRecord()) { + return []; + } + if (typeof params.media === "function") { + const recordedEntry = params.entry; + const history = recordPendingHistoryEntry({ + historyMap: params.historyMap, + historyKey: params.historyKey, + entry: recordedEntry, + limit: params.limit, + }); + const resolvedMedia = await params.media(); + if (params.shouldRecord && !params.shouldRecord()) { + return history; + } + const media = normalizeHistoryMediaEntries({ + media: resolvedMedia, + limit: params.mediaLimit, + messageId: params.messageId ?? params.entry.messageId, + }); + if (media.length === 0) { + return history; + } + const currentHistory = params.historyMap.get(params.historyKey); + const entryIndex = currentHistory?.indexOf(recordedEntry) ?? -1; + if (currentHistory && entryIndex >= 0) { + currentHistory[entryIndex] = { ...recordedEntry, media } as T; + } + return history; + } + const resolvedMedia = params.media ?? undefined; + if (params.shouldRecord && !params.shouldRecord()) { + return []; + } + const media = normalizeHistoryMediaEntries({ + media: resolvedMedia, + limit: params.mediaLimit, + messageId: params.messageId ?? params.entry.messageId, + }); + const entry = media.length > 0 ? ({ ...params.entry, media } as T) : params.entry; + return recordPendingHistoryEntry({ + historyMap: params.historyMap, + historyKey: params.historyKey, + entry, + limit: params.limit, + }); +} + export function buildPendingHistoryContextFromMap(params: { historyMap: Map; historyKey: string; diff --git a/src/plugin-sdk/reply-history.ts b/src/plugin-sdk/reply-history.ts index 63ed7700903..25a81a13ec3 100644 --- a/src/plugin-sdk/reply-history.ts +++ b/src/plugin-sdk/reply-history.ts @@ -12,6 +12,8 @@ export { clearHistoryEntries, clearHistoryEntriesIfEnabled, evictOldHistoryKeys, + normalizeHistoryMediaEntries, recordPendingHistoryEntry, + recordPendingHistoryEntryWithMedia, recordPendingHistoryEntryIfEnabled, } from "../auto-reply/reply/history.js";