fix(media): handle ffprobe stdin EPIPE

Handle broken-pipe errors from stdin-backed ffprobe without leaking as uncaught exceptions.
This commit is contained in:
openclaw-clownfish[bot]
2026-04-29 00:49:52 -07:00
committed by GitHub
parent 5cc834a11a
commit 0f11dcd15f
2 changed files with 102 additions and 2 deletions

View File

@@ -1,5 +1,64 @@
import { describe, expect, it } from "vitest";
import { parseFfprobeCodecAndSampleRate, parseFfprobeCsvFields } from "./ffmpeg-exec.js";
import type { ChildProcess, ExecFileOptions } from "node:child_process";
import { EventEmitter } from "node:events";
import { PassThrough } from "node:stream";
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
parseFfprobeCodecAndSampleRate,
parseFfprobeCsvFields,
runFfprobe,
} from "./ffmpeg-exec.js";
const { execFileMock, resolveSystemBinMock } = vi.hoisted(() => ({
execFileMock: vi.fn(),
resolveSystemBinMock: vi.fn(),
}));
vi.mock("node:child_process", async (importOriginal) => ({
...(await importOriginal<typeof import("node:child_process")>()),
execFile: execFileMock,
}));
vi.mock("../infra/resolve-system-bin.js", () => ({
resolveSystemBin: resolveSystemBinMock,
}));
type ExecFileCallback = (
error: Error | null,
stdout: string | Buffer,
stderr: string | Buffer,
) => void;
function createExecFileChild(): ChildProcess {
const child = new EventEmitter() as ChildProcess;
child.stdin = new PassThrough() as ChildProcess["stdin"];
return child;
}
function mockFfprobeExecFile(child: ChildProcess): {
execCallback: () => ExecFileCallback;
} {
let execCallback: ExecFileCallback | undefined;
execFileMock.mockImplementationOnce(
(_file: string, _args: string[], _options: ExecFileOptions, callback: ExecFileCallback) => {
execCallback = callback;
return child;
},
);
return {
execCallback: () => {
if (!execCallback) {
throw new Error("execFile callback was not captured");
}
return execCallback;
},
};
}
beforeEach(() => {
execFileMock.mockReset();
resolveSystemBinMock.mockReset();
resolveSystemBinMock.mockReturnValue("/usr/bin/ffprobe");
});
describe("parseFfprobeCsvFields", () => {
function expectParsedFfprobeCsvCase(input: string, fieldCount: number, expected: string[]) {
@@ -43,3 +102,32 @@ describe("parseFfprobeCodecAndSampleRate", () => {
expectParsedCodecAndSampleRateCase(input, expected);
});
});
describe("runFfprobe", () => {
it("handles stdin EPIPE without overriding successful ffprobe stdout", async () => {
const child = createExecFileChild();
const { execCallback } = mockFfprobeExecFile(child);
const promise = runFfprobe(["pipe:0"], { input: Buffer.alloc(1024) });
const stdinError = Object.assign(new Error("write EPIPE"), { code: "EPIPE" });
expect(() => child.stdin?.emit("error", stdinError)).not.toThrow();
execCallback()(null, Buffer.from("ok"), Buffer.alloc(0));
await expect(promise).resolves.toBe("ok");
});
it("preserves the child callback error after stdin EPIPE", async () => {
const child = createExecFileChild();
const { execCallback } = mockFfprobeExecFile(child);
const promise = runFfprobe(["pipe:0"], { input: Buffer.alloc(1024) });
const stdinError = Object.assign(new Error("write EPIPE"), { code: "EPIPE" });
expect(() => child.stdin?.emit("error", stdinError)).not.toThrow();
const childError = new Error("ffprobe failed");
execCallback()(childError, "", "");
await expect(promise).rejects.toBe(childError);
});
});

View File

@@ -41,6 +41,10 @@ function requireSystemBin(name: string): string {
return resolved;
}
function isBrokenPipeError(error: Error): boolean {
return (error as NodeJS.ErrnoException).code === "EPIPE";
}
export async function runFfprobe(args: string[], options?: MediaExecOptions): Promise<string> {
const execOptions = resolveExecOptions(MEDIA_FFPROBE_TIMEOUT_MS, options);
if (options?.input == null) {
@@ -49,13 +53,21 @@ export async function runFfprobe(args: string[], options?: MediaExecOptions): Pr
}
return await new Promise<string>((resolve, reject) => {
let stdinWriteError: Error | undefined;
const proc = execFile(requireSystemBin("ffprobe"), args, execOptions, (err, stdout) => {
if (err) {
reject(err);
return;
}
if (stdinWriteError && !isBrokenPipeError(stdinWriteError)) {
reject(stdinWriteError);
return;
}
resolve(stdout.toString());
});
proc.stdin?.once("error", (err: Error) => {
stdinWriteError = err;
});
proc.stdin?.end(options.input);
});
}