|
|
import isPlainObject from 'is-plain-obj';import {normalizePipeArguments} from './pipe-arguments.js';import {handlePipeArgumentsError} from './throw.js';import {waitForBothSubprocesses} from './sequence.js';import {pipeSubprocessStream} from './streaming.js';import {unpipeOnAbort} from './abort.js';
// Pipe a subprocess' `stdout`/`stderr`/`stdio` into another subprocess' `stdin`
export const pipeToSubprocess = (sourceInfo, ...pipeArguments) => { if (isPlainObject(pipeArguments[0])) { return pipeToSubprocess.bind(undefined, { ...sourceInfo, boundOptions: {...sourceInfo.boundOptions, ...pipeArguments[0]}, }); }
const {destination, ...normalizedInfo} = normalizePipeArguments(sourceInfo, ...pipeArguments); const promise = handlePipePromise({...normalizedInfo, destination}); promise.pipe = pipeToSubprocess.bind(undefined, { ...sourceInfo, source: destination, sourcePromise: promise, boundOptions: {}, }); return promise;};
// Asynchronous logic when piping subprocesses
const handlePipePromise = async ({ sourcePromise, sourceStream, sourceOptions, sourceError, destination, destinationStream, destinationError, unpipeSignal, fileDescriptors, startTime,}) => { const subprocessPromises = getSubprocessPromises(sourcePromise, destination); handlePipeArgumentsError({ sourceStream, sourceError, destinationStream, destinationError, fileDescriptors, sourceOptions, startTime, }); const maxListenersController = new AbortController(); try { const mergedStream = pipeSubprocessStream(sourceStream, destinationStream, maxListenersController); return await Promise.race([ waitForBothSubprocesses(subprocessPromises), ...unpipeOnAbort(unpipeSignal, { sourceStream, mergedStream, sourceOptions, fileDescriptors, startTime, }), ]); } finally { maxListenersController.abort(); }};
// `.pipe()` awaits the subprocess promises.
// When invalid arguments are passed to `.pipe()`, we throw an error, which prevents awaiting them.
// We need to ensure this does not create unhandled rejections.
const getSubprocessPromises = (sourcePromise, destination) => Promise.allSettled([sourcePromise, destination]);
|