|
|
'use strict';
import stream from 'stream'; import utils from '../utils.js';
const kInternals = Symbol('internals');
class AxiosTransformStream extends stream.Transform{ constructor(options) { options = utils.toFlatObject(options, { maxRate: 0, chunkSize: 64 * 1024, minChunkSize: 100, timeWindow: 500, ticksRate: 2, samplesCount: 15 }, null, (prop, source) => { return !utils.isUndefined(source[prop]); });
super({ readableHighWaterMark: options.chunkSize });
const internals = this[kInternals] = { timeWindow: options.timeWindow, chunkSize: options.chunkSize, maxRate: options.maxRate, minChunkSize: options.minChunkSize, bytesSeen: 0, isCaptured: false, notifiedBytesLoaded: 0, ts: Date.now(), bytes: 0, onReadCallback: null };
this.on('newListener', event => { if (event === 'progress') { if (!internals.isCaptured) { internals.isCaptured = true; } } }); }
_read(size) { const internals = this[kInternals];
if (internals.onReadCallback) { internals.onReadCallback(); }
return super._read(size); }
_transform(chunk, encoding, callback) { const internals = this[kInternals]; const maxRate = internals.maxRate;
const readableHighWaterMark = this.readableHighWaterMark;
const timeWindow = internals.timeWindow;
const divider = 1000 / timeWindow; const bytesThreshold = (maxRate / divider); const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
const pushChunk = (_chunk, _callback) => { const bytes = Buffer.byteLength(_chunk); internals.bytesSeen += bytes; internals.bytes += bytes;
internals.isCaptured && this.emit('progress', internals.bytesSeen);
if (this.push(_chunk)) { process.nextTick(_callback); } else { internals.onReadCallback = () => { internals.onReadCallback = null; process.nextTick(_callback); }; } }
const transformChunk = (_chunk, _callback) => { const chunkSize = Buffer.byteLength(_chunk); let chunkRemainder = null; let maxChunkSize = readableHighWaterMark; let bytesLeft; let passed = 0;
if (maxRate) { const now = Date.now();
if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) { internals.ts = now; bytesLeft = bytesThreshold - internals.bytes; internals.bytes = bytesLeft < 0 ? -bytesLeft : 0; passed = 0; }
bytesLeft = bytesThreshold - internals.bytes; }
if (maxRate) { if (bytesLeft <= 0) { // next time window
return setTimeout(() => { _callback(null, _chunk); }, timeWindow - passed); }
if (bytesLeft < maxChunkSize) { maxChunkSize = bytesLeft; } }
if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) { chunkRemainder = _chunk.subarray(maxChunkSize); _chunk = _chunk.subarray(0, maxChunkSize); }
pushChunk(_chunk, chunkRemainder ? () => { process.nextTick(_callback, null, chunkRemainder); } : _callback); };
transformChunk(chunk, function transformNextChunk(err, _chunk) { if (err) { return callback(err); }
if (_chunk) { transformChunk(_chunk, transformNextChunk); } else { callback(null); } }); } }
export default AxiosTransformStream;
|