Files
openclaw/src/media-understanding/attachments.cache.ts
Peter Steinberger 0b8aabe864 docs: document auth profile failure policy contract (#89613)
* docs: document markdown marker renderer

* docs: document rendered markdown chunking

* docs: document markdown text chunking

* docs: document shared text chunking

* docs: document plugin text chunking exports

* docs: document avatar policy constants

* docs: document node match candidates

* docs: document scoped expiring id cache

* docs: document runtime import normalization

* docs: document string sample summaries

* docs: document session usage timeseries types

* docs: document session usage response types

* docs: document manifest frontmatter shapes

* docs: document channel route input metadata

* docs: document pair loop guard settings

* docs: document migration config patch helpers

* docs: document api provider registry

* docs: document tool call repair payloads

* docs: document plugin tool payload helpers

* docs: document lazy promise loader

* docs: document store writer queue state

* docs: document thread binding lifecycle

* docs: document concurrency helper contract

* docs: document gateway client info contract

* docs: document delivery context contracts

* docs: document secret ref defaults contract

* docs: document command gating contract

* docs: document avatar policy contract

* docs: document node match policy

* docs: document message channel normalization

* docs: document boolean parsing contract

* docs: document zod parse helpers

* docs: document direct dm guard policy

* docs: document fixed window limiter contract

* docs: document node presence event contract

* docs: document secret normalization contract

* docs: document progress draft line removal

* docs: document usage formatting contracts

* docs: document agent run status contract

* docs: document runtime import helpers

* docs: document provider utility ownership

* docs: document invalid config helpers

* docs: document json compat parser

* docs: document channel config metadata ownership

* docs: document channel logging helpers

* docs: document sender identity validation ownership

* docs: document string sampling helper

* docs: document global singleton helpers

* docs: document transcript tool helpers

* docs: document exec safe-bin normalization

* docs: document reaction level resolver

* docs: document account snapshot redaction boundary

* docs: document messaging target helpers

* docs: document thread binding messages

* docs: document conversation binding context

* docs: document conversation resolution helper

* docs: document owner display secret retention

* docs: document provider request config types

* docs: document skills config types

* docs: document memory config types

* docs: document imessage config types

* docs: document crestodian config types

* docs: document tools config policies

* docs: document shared config base types

* docs: document channel config contracts

* docs: document openclaw config state types

* docs: document model config contracts

* docs: document shared agent config types

* docs: document agent defaults config types

* docs: document secret input contracts

* docs: document auth config contracts

* docs: document gateway config contracts

* docs: document tool call stream repair contracts

* docs: document memory host facades

* docs: document llm core contracts

* docs: document markdown core contracts

* docs: document gateway connect error contracts

* docs: document gateway protocol primitives

* docs: document gateway frame schemas

* docs: document gateway device schemas

* docs: document gateway environment schemas

* docs: document gateway push schemas

* docs: document gateway plugin schemas

* docs: document gateway artifact schemas

* docs: document gateway command schemas

* docs: document gateway task schemas

* docs: document gateway exec approval schemas

* docs: document gateway secret schemas

* docs: document gateway config schemas

* docs: document gateway snapshot schemas

* docs: document gateway chat schemas

* docs: document gateway wizard schemas

* docs: document gateway node schemas

* docs: document gateway plugin approval schemas

* docs: document gateway talk schemas

* docs: document gateway agent schemas

* docs: document gateway session schemas

* docs: document gateway cron schemas

* docs: document gateway agent model skill schemas

* docs: document gateway skill proposal tool schemas

* docs: document gateway protocol registry

* docs: document gateway channel status schemas

* docs: document gateway schema regression tests

* docs: document gateway schema barrel

* docs: document gateway validator tests

* docs: document gateway primitive push tests

* docs: document gateway contract tests

* docs: document native protocol guard

* docs: document channel schema tests

* docs: document gateway protocol smoke tests

* docs: document gateway protocol entrypoint

* docs: document gateway protocol type exports

* docs: document gateway error codes

* docs: document protocol schema registry

* docs: document talk audio codec

* docs: document talk activation names

* docs: document talk consult questions

* docs: document talk consult tool

* docs: document talk run control contracts

* docs: document talk run control adapter

* docs: document talkback consult queue

* docs: document talk consult transcript guard

* docs: document talk fast context runtime

* docs: document forced talk consult coordinator

* docs: document talk output activity tracker

* docs: document talk event metrics

* docs: document talk diagnostics

* docs: document talk observability hook

* docs: document talk provider resolver

* docs: document talk provider registry

* docs: document talk runtime primitives

* docs: document talk consult controller logs

* docs: document channel identity helpers

* docs: document channel account allowlist helpers

* docs: document channel metadata draft controls

* docs: document channel ingress policy

* docs: document channel sender access gates

* docs: document channel catalog message contracts

* docs: document channel account plugin helpers

* docs: document configured binding helpers

* docs: document channel acp approval config helpers

* docs: document channel bundled config write helpers

* docs: document channel plugin utility contracts

* docs: document channel config access helpers

* docs: document channel message action helpers

* docs: document channel outbound runtime helpers

* docs: document channel pairing promotion helpers

* docs: document channel registry helpers

* docs: document channel setup wizard helpers

* docs: document channel lifecycle status helpers

* docs: document channel target thread helpers

* docs: document channel session binding helpers

* docs: document channel package module probes

* docs: document channel setup wizard contracts

* docs: document channel plugin API barrels

* docs: document channel contract test helpers

* docs: document channel core helpers

* docs: document small core facades

* docs: document provider runtime helpers

* docs: document persistence and realtime helpers

* docs: document mcp and state helpers

* docs: document tool planner contracts

* docs: document music generation runtime

* docs: document crestodian command flow

* docs: document utility helpers

* docs: document node host helpers

* docs: document transcript contracts

* docs: document trajectory export contracts

* docs: document image generation contracts

* docs: document routing helper contracts

* docs: document session helper contracts

* docs: document video generation contracts

* docs: document model catalog contracts

* docs: document proxy capture contracts

* docs: document status rendering contracts

* docs: document test helper contracts

* docs: document wizard setup contracts

* docs: document process contracts

* docs: document memory host sdk contracts

* docs: document tts contracts

* docs: document secrets runtime contracts

* docs: document shared helper contracts

* docs: document hook runtime contracts

* docs: document security audit contracts

* docs: document flow contracts

* docs: document media understanding contracts

* docs: document tui contracts

* docs: document logging contracts

* docs: document llm contracts

* docs: document cron contracts

* docs: document daemon contracts

* docs: document task contracts

* docs: document acp contracts

* docs: document test utility contracts

* docs: document skill contracts

* docs: document config contracts

* docs: document outbound infra contracts

* docs: document command analysis contracts

* docs: document provider usage infra contracts

* docs: document file safety infra contracts

* docs: document exec approval infra contracts

* docs: document gateway runtime infra contracts

* docs: document infra utility contracts

* docs: document infra queue storage contracts

* docs: document heartbeat infra contracts

* docs: document remaining infra contracts

* docs: document gateway auth contracts

* docs: document gateway display helpers

* docs: document gateway http helpers

* docs: document gateway node helpers

* docs: document gateway mcp helpers

* docs: document gateway support helpers

* docs: document gateway server runtime helpers

* docs: document gateway runtime bootstrap helpers

* docs: document gateway session events

* docs: document gateway utility helpers

* docs: document gateway talk helpers

* docs: document gateway helper contracts

* docs: document gateway server method helpers

* docs: document gateway server auth helpers

* docs: document gateway server tests

* docs: document gateway test helpers

* docs: document gateway node tests

* docs: document gateway channel tests

* docs: document gateway session tests

* docs: document gateway server startup tests

* docs: document gateway tool test helpers

* docs: document gateway server test helpers

* docs: document gateway server method tests

* docs: document remaining gateway tests

* docs: document plugin sdk public subpaths

* docs: document plugin sdk runtime helpers

* docs: document plugin sdk memory provider helpers

* docs: document plugin sdk runtime facades

* docs: document plugin sdk command approval helpers

* docs: document plugin sdk runtime types

* docs: document plugin sdk browser account helpers

* docs: document plugin sdk media memory helpers

* docs: document plugin sdk core tests

* docs: document plugin sdk contract helpers

* docs: document plugin sdk test helpers

* docs: document remaining plugin sdk tests

* docs: document cli utility helpers

* docs: document cli runtime helpers

* docs: document cli command registration helpers

* docs: document node cli helpers

* docs: document cli program registration

* docs: document message cli registration

* docs: document daemon cli helpers

* docs: document cli route parsers
2026-06-03 15:20:39 -07:00

470 lines
15 KiB
TypeScript

import fs from "node:fs/promises";
import path from "node:path";
import {
isInboundPathAllowed,
mergeInboundPathRoots,
} from "@openclaw/media-core/inbound-path-policy";
import { detectMime } from "@openclaw/media-core/mime";
import { logVerbose, shouldLogVerbose } from "../globals.js";
import { FsSafeError, openLocalFileSafely } from "../infra/fs-safe.js";
import type { SsrFPolicy } from "../infra/net/ssrf.js";
import { isAbortError } from "../infra/unhandled-rejections.js";
import {
readRemoteMediaBuffer,
type MediaFetchRetryOptions,
MediaFetchError,
} from "../media/fetch.js";
import { getDefaultMediaLocalRoots } from "../media/local-roots.js";
import { buildRandomTempFilePath } from "../plugin-sdk/temp-path.js";
import { normalizeAttachmentPath } from "./attachments.normalize.js";
import { MediaUnderstandingSkipError } from "./errors.js";
import type { MediaAttachment } from "./types.js";
type MediaBufferResult = {
buffer: Buffer;
mime?: string;
fileName: string;
size: number;
};
type MediaPathResult = {
path: string;
cleanup?: () => Promise<void> | void;
};
type LocalReadResult = {
buffer: Buffer;
filePath: string;
};
const REMOTE_MEDIA_FETCH_RETRY: MediaFetchRetryOptions = {
attempts: 3,
minDelayMs: 500,
maxDelayMs: 3_000,
jitter: 0.2,
};
type AttachmentCacheEntry = {
attachment: MediaAttachment;
resolvedPath?: string;
statSize?: number;
buffer?: Buffer;
bufferMime?: string;
bufferFileName?: string;
tempPath?: string;
tempCleanup?: () => Promise<void>;
};
let defaultLocalPathRoots: readonly string[] | undefined;
function concreteMime(mime: string | undefined): string | undefined {
const normalized = mime?.trim();
if (!normalized || normalized.endsWith("/*")) {
return undefined;
}
return normalized;
}
function getDefaultLocalPathRoots(): readonly string[] {
defaultLocalPathRoots ??= mergeInboundPathRoots(getDefaultMediaLocalRoots());
return defaultLocalPathRoots;
}
/** Local/remote access policy used by the lazy media-understanding attachment cache. */
export type MediaAttachmentCacheOptions = {
localPathRoots?: readonly string[];
includeDefaultLocalPathRoots?: boolean;
ssrfPolicy?: SsrFPolicy;
workspaceDir?: string;
};
/**
* Lazy resolver for media-understanding attachments.
*
* The cache prefers allowed local paths, falls back to remote URLs when a local path is blocked
* or missing, and owns any temporary files created for providers that require a filesystem path.
*/
export class MediaAttachmentCache {
private readonly entries = new Map<number, AttachmentCacheEntry>();
private readonly attachments: MediaAttachment[];
private readonly localPathRoots: readonly string[];
private readonly ssrfPolicy: SsrFPolicy | undefined;
private readonly workspaceDir?: string;
private canonicalLocalPathRoots?: Promise<readonly string[]>;
constructor(attachments: MediaAttachment[], options?: MediaAttachmentCacheOptions) {
this.attachments = attachments;
this.ssrfPolicy = options?.ssrfPolicy;
this.localPathRoots =
options?.includeDefaultLocalPathRoots === false
? mergeInboundPathRoots(options.localPathRoots)
: mergeInboundPathRoots(options?.localPathRoots, getDefaultLocalPathRoots());
this.workspaceDir = options?.workspaceDir ? path.resolve(options.workspaceDir) : undefined;
for (const attachment of attachments) {
this.entries.set(attachment.index, { attachment });
}
}
/** Returns attachment bytes, MIME hint, filename, and size within the requested byte limit. */
async getBuffer(params: {
attachmentIndex: number;
maxBytes: number;
timeoutMs: number;
}): Promise<MediaBufferResult> {
const entry = await this.ensureEntry(params.attachmentIndex);
const url = entry.attachment.url?.trim();
if (entry.buffer) {
if (entry.buffer.length > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
return {
buffer: entry.buffer,
mime: entry.bufferMime,
fileName: entry.bufferFileName ?? `media-${params.attachmentIndex + 1}`,
size: entry.buffer.length,
};
}
if (entry.resolvedPath) {
try {
const size = await this.ensureLocalStat(entry);
if (entry.resolvedPath) {
if (size !== undefined && size > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
const { buffer, filePath } = await this.readLocalBuffer({
attachmentIndex: params.attachmentIndex,
filePath: entry.resolvedPath,
maxBytes: params.maxBytes,
});
entry.resolvedPath = filePath;
entry.buffer = buffer;
entry.bufferMime =
entry.bufferMime ??
concreteMime(entry.attachment.mime) ??
(await detectMime({
buffer,
filePath,
}));
entry.bufferFileName = path.basename(filePath) || `media-${params.attachmentIndex + 1}`;
return {
buffer,
mime: entry.bufferMime,
fileName: entry.bufferFileName,
size: buffer.length,
};
}
} catch (err) {
if (
!(err instanceof MediaUnderstandingSkipError) ||
!url ||
(err.reason !== "blocked" && err.reason !== "empty")
) {
throw err;
}
}
}
if (!url) {
throw new MediaUnderstandingSkipError(
"empty",
`Attachment ${params.attachmentIndex + 1} has no path or URL.`,
);
}
try {
const fetched = await readRemoteMediaBuffer({
url,
timeoutMs: params.timeoutMs,
maxBytes: params.maxBytes,
ssrfPolicy: this.ssrfPolicy,
retry: REMOTE_MEDIA_FETCH_RETRY,
});
entry.buffer = fetched.buffer;
entry.bufferMime =
concreteMime(entry.attachment.mime) ??
fetched.contentType ??
(await detectMime({
buffer: fetched.buffer,
filePath: fetched.fileName ?? url,
}));
entry.bufferFileName = fetched.fileName ?? `media-${params.attachmentIndex + 1}`;
return {
buffer: fetched.buffer,
mime: entry.bufferMime,
fileName: entry.bufferFileName,
size: fetched.buffer.length,
};
} catch (err) {
if (err instanceof MediaFetchError && err.code === "max_bytes") {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
if (isAbortError(err)) {
throw new MediaUnderstandingSkipError(
"timeout",
`Attachment ${params.attachmentIndex + 1} timed out while fetching.`,
);
}
throw err;
}
}
/** Returns a local path for providers that cannot accept buffers, creating a temp file if needed. */
async getPath(params: {
attachmentIndex: number;
maxBytes?: number;
timeoutMs: number;
}): Promise<MediaPathResult> {
const entry = await this.ensureEntry(params.attachmentIndex);
if (entry.resolvedPath) {
if (params.maxBytes) {
try {
const size = await this.ensureLocalStat(entry);
if (entry.resolvedPath) {
if (size !== undefined && size > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
}
} catch (err) {
if (
!(err instanceof MediaUnderstandingSkipError) ||
(err.reason !== "blocked" && err.reason !== "empty")
) {
throw err;
}
}
}
if (entry.resolvedPath) {
return { path: entry.resolvedPath };
}
}
if (entry.tempPath) {
if (params.maxBytes && entry.buffer && entry.buffer.length > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
return { path: entry.tempPath, cleanup: entry.tempCleanup };
}
const maxBytes = params.maxBytes ?? Number.POSITIVE_INFINITY;
const bufferResult = await this.getBuffer({
attachmentIndex: params.attachmentIndex,
maxBytes,
timeoutMs: params.timeoutMs,
});
const extension = path.extname(bufferResult.fileName || "") || "";
const tmpPath = buildRandomTempFilePath({
prefix: "openclaw-media",
extension,
});
await fs.writeFile(tmpPath, bufferResult.buffer);
entry.tempPath = tmpPath;
entry.tempCleanup = async () => {
await fs.unlink(tmpPath).catch(() => {});
};
return { path: tmpPath, cleanup: entry.tempCleanup };
}
/** Removes temporary files created by `getPath`; callers should run this after provider use. */
async cleanup(): Promise<void> {
const cleanups: Promise<void>[] = [];
for (const entry of this.entries.values()) {
if (entry.tempCleanup) {
cleanups.push(entry.tempCleanup());
entry.tempCleanup = undefined;
}
}
await Promise.all(cleanups);
}
private async ensureEntry(attachmentIndex: number): Promise<AttachmentCacheEntry> {
const existing = this.entries.get(attachmentIndex);
if (existing) {
if (!existing.resolvedPath) {
existing.resolvedPath = this.resolveLocalPath(existing.attachment);
}
return existing;
}
const attachment = this.attachments.find((item) => item.index === attachmentIndex) ?? {
index: attachmentIndex,
};
const entry: AttachmentCacheEntry = {
attachment,
resolvedPath: this.resolveLocalPath(attachment),
};
this.entries.set(attachmentIndex, entry);
return entry;
}
private resolveLocalPath(attachment: MediaAttachment): string | undefined {
const rawPath = normalizeAttachmentPath(attachment.path);
if (!rawPath) {
return undefined;
}
return this.workspaceDir ? path.resolve(this.workspaceDir, rawPath) : path.resolve(rawPath);
}
private async ensureLocalStat(entry: AttachmentCacheEntry): Promise<number | undefined> {
if (!entry.resolvedPath) {
return undefined;
}
if (!isInboundPathAllowed({ filePath: entry.resolvedPath, roots: this.localPathRoots })) {
entry.resolvedPath = undefined;
if (shouldLogVerbose()) {
logVerbose(
`Blocked attachment path outside allowed roots: ${entry.attachment.path ?? entry.attachment.url ?? "(unknown)"}`,
);
}
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${entry.attachment.index + 1} path is outside allowed roots.`,
);
}
if (entry.statSize !== undefined) {
return entry.statSize;
}
try {
const currentPath = entry.resolvedPath;
const opened = await openLocalFileSafely({ filePath: currentPath });
let canonicalRoots: readonly string[];
try {
canonicalRoots = await this.getCanonicalLocalPathRoots();
} finally {
await opened.handle.close().catch(() => {});
}
if (!isInboundPathAllowed({ filePath: opened.realPath, roots: canonicalRoots })) {
entry.resolvedPath = undefined;
if (shouldLogVerbose()) {
logVerbose(
`Blocked canonicalized attachment path outside allowed roots: ${opened.realPath}`,
);
}
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${entry.attachment.index + 1} path is outside allowed roots.`,
);
}
entry.resolvedPath = opened.realPath;
entry.statSize = opened.stat.size;
return opened.stat.size;
} catch (err) {
if (err instanceof MediaUnderstandingSkipError) {
throw err;
}
if (err instanceof FsSafeError) {
entry.resolvedPath = undefined;
if (err.code === "not-file") {
throw new MediaUnderstandingSkipError(
"empty",
`Attachment ${entry.attachment.index + 1} path is not a regular file.`,
);
}
if (err.code !== "not-found") {
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${entry.attachment.index + 1} path is outside allowed roots.`,
);
}
} else {
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${entry.attachment.index + 1} could not be canonicalized.`,
);
}
entry.resolvedPath = undefined;
if (shouldLogVerbose()) {
logVerbose(`Failed to read attachment ${entry.attachment.index + 1}: ${String(err)}`);
}
return undefined;
}
}
private async getCanonicalLocalPathRoots(): Promise<readonly string[]> {
if (this.canonicalLocalPathRoots) {
return await this.canonicalLocalPathRoots;
}
this.canonicalLocalPathRoots = (async () =>
mergeInboundPathRoots(
this.localPathRoots,
await Promise.all(
this.localPathRoots.map(async (root) => {
if (root.includes("*")) {
return root;
}
return await fs.realpath(root).catch(() => root);
}),
),
))();
return await this.canonicalLocalPathRoots;
}
private async readLocalBuffer(params: {
attachmentIndex: number;
filePath: string;
maxBytes: number;
}): Promise<LocalReadResult> {
let opened: Awaited<ReturnType<typeof openLocalFileSafely>> | undefined;
try {
opened = await openLocalFileSafely({ filePath: params.filePath });
if (opened.stat.size > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
const canonicalRoots = await this.getCanonicalLocalPathRoots();
if (!isInboundPathAllowed({ filePath: opened.realPath, roots: canonicalRoots })) {
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${params.attachmentIndex + 1} path is outside allowed roots.`,
);
}
const buffer = await opened.handle.readFile();
if (buffer.length > params.maxBytes) {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
return { buffer, filePath: opened.realPath };
} catch (err) {
if (err instanceof FsSafeError) {
if (err.code === "too-large") {
throw new MediaUnderstandingSkipError(
"maxBytes",
`Attachment ${params.attachmentIndex + 1} exceeds maxBytes ${params.maxBytes}`,
);
}
if (err.code === "not-file" || err.code === "not-found") {
throw new MediaUnderstandingSkipError(
"empty",
`Attachment ${params.attachmentIndex + 1} path is not a regular file.`,
);
}
throw new MediaUnderstandingSkipError(
"blocked",
`Attachment ${params.attachmentIndex + 1} path is outside allowed roots.`,
);
}
throw err;
} finally {
await opened?.handle.close().catch(() => {});
}
}
}