fix(adapter): fix progress event emitting; (#6518)
DigitalBrainJS authored Aug 1, 2024
1 parent 85d4d0e commit e3c76fc
Showing 10 changed files with 204 additions and 151 deletions.
56 changes: 29 additions & 27 deletions lib/adapters/fetch.js
@@ -4,19 +4,10 @@ import AxiosError from "../core/AxiosError.js";
import composeSignals from "../helpers/composeSignals.js";
import {trackStream} from "../helpers/trackStream.js";
import AxiosHeaders from "../core/AxiosHeaders.js";
import progressEventReducer from "../helpers/progressEventReducer.js";
import {progressEventReducer, progressEventDecorator, asyncDecorator} from "../helpers/progressEventReducer.js";
import resolveConfig from "../helpers/resolveConfig.js";
import settle from "../core/settle.js";

const fetchProgressDecorator = (total, fn) => {
const lengthComputable = total != null;
return (loaded) => setTimeout(() => fn({
lengthComputable,
total,
loaded
}));
}

const isFetchSupported = typeof fetch === 'function' && typeof Request === 'function' && typeof Response === 'function';
const isReadableStreamSupported = isFetchSupported && typeof ReadableStream === 'function';

@@ -26,7 +17,15 @@ const encodeText = isFetchSupported && (typeof TextEncoder === 'function' ?
async (str) => new Uint8Array(await new Response(str).arrayBuffer())
);

const supportsRequestStream = isReadableStreamSupported && (() => {
const test = (fn, ...args) => {
try {
return !!fn(...args);
} catch (e) {
return false
}
}

const supportsRequestStream = isReadableStreamSupported && test(() => {
let duplexAccessed = false;

const hasContentType = new Request(platform.origin, {
@@ -39,17 +38,13 @@ const supportsRequestStream = isReadableStreamSupported && (() => {
}).headers.has('Content-Type');

return duplexAccessed && !hasContentType;
})();
});

const DEFAULT_CHUNK_SIZE = 64 * 1024;

const supportsResponseStream = isReadableStreamSupported && !!(()=> {
try {
return utils.isReadableStream(new Response('').body);
} catch(err) {
// return undefined
}
})();
const supportsResponseStream = isReadableStreamSupported &&
test(() => utils.isReadableStream(new Response('').body));


const resolvers = {
stream: supportsResponseStream && ((res) => res.body)
@@ -77,7 +72,7 @@ const getBodyLength = async (body) => {
return (await new Request(body).arrayBuffer()).byteLength;
}

if(utils.isArrayBufferView(body)) {
if(utils.isArrayBufferView(body) || utils.isArrayBuffer(body)) {
return body.byteLength;
}

Expand Down Expand Up @@ -147,10 +142,12 @@ export default isFetchSupported && (async (config) => {
}

if (_request.body) {
data = trackStream(_request.body, DEFAULT_CHUNK_SIZE, fetchProgressDecorator(
const [onProgress, flush] = progressEventDecorator(
requestContentLength,
progressEventReducer(onUploadProgress)
), null, encodeText);
progressEventReducer(asyncDecorator(onUploadProgress))
);

data = trackStream(_request.body, DEFAULT_CHUNK_SIZE, onProgress, flush, encodeText);
}
}

@@ -181,11 +178,16 @@ export default isFetchSupported && (async (config) => {

const responseContentLength = utils.toFiniteNumber(response.headers.get('content-length'));

const [onProgress, flush] = onDownloadProgress && progressEventDecorator(
responseContentLength,
progressEventReducer(asyncDecorator(onDownloadProgress), true)
) || [];

response = new Response(
trackStream(response.body, DEFAULT_CHUNK_SIZE, onDownloadProgress && fetchProgressDecorator(
responseContentLength,
progressEventReducer(onDownloadProgress, true)
), isStreamResponse && onFinish, encodeText),
trackStream(response.body, DEFAULT_CHUNK_SIZE, onProgress, () => {
flush && flush();
isStreamResponse && onFinish();
}, encodeText),
options
);
}
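For orientation, here is a consumer-side sketch (not part of this diff) of what the rewired fetch adapter reports; the endpoint, the `adapter: 'fetch'` selection, and the handler body are illustrative assumptions, while the event shape comes from `progressEventReducer` as used above.

```js
import axios from 'axios';

// Assumes a recent axios build where the fetch adapter can be selected by name.
const response = await axios.get('/large-file', {
  adapter: 'fetch',
  responseType: 'stream',
  onDownloadProgress(e) {
    // Built by progressEventReducer: loaded, total, progress, rate, estimated,
    // lengthComputable, event, and download: true.
    console.log(`received ${e.loaded}${e.lengthComputable ? ` of ${e.total}` : ''} bytes`);
  }
});

// With responseType 'stream', the resolver above returns res.body (a ReadableStream).
const reader = response.data.getReader();
let chunk;
while (!(chunk = await reader.read()).done) {
  // chunk.value is a Uint8Array
}
```

The `flush` returned by `progressEventDecorator` is invoked together with `onFinish` when the tracked stream ends, so the last throttled progress event is no longer dropped.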
40 changes: 25 additions & 15 deletions lib/adapters/http.js
@@ -24,6 +24,7 @@ import formDataToStream from "../helpers/formDataToStream.js";
import readBlob from "../helpers/readBlob.js";
import ZlibHeaderTransformStream from '../helpers/ZlibHeaderTransformStream.js';
import callbackify from "../helpers/callbackify.js";
import {progressEventReducer, progressEventDecorator, asyncDecorator} from "../helpers/progressEventReducer.js";

const zlibOptions = {
flush: zlib.constants.Z_SYNC_FLUSH,
@@ -45,6 +46,14 @@ const supportedProtocols = platform.protocols.map(protocol => {
return protocol + ':';
});

const flushOnFinish = (stream, [throttled, flush]) => {
stream
.on('end', flush)
.on('error', flush);

return throttled;
}

/**
* If the proxy or config beforeRedirects functions are defined, call them with the options
* object.
@@ -278,8 +287,7 @@ export default isHttpAdapterSupported && function httpAdapter(config) {
// Only set header if it hasn't been set in config
headers.set('User-Agent', 'axios/' + VERSION, false);

const onDownloadProgress = config.onDownloadProgress;
const onUploadProgress = config.onUploadProgress;
const {onUploadProgress, onDownloadProgress} = config;
const maxRate = config.maxRate;
let maxUploadRate = undefined;
let maxDownloadRate = undefined;
@@ -352,15 +360,16 @@ export default isHttpAdapterSupported && function httpAdapter(config) {
}

data = stream.pipeline([data, new AxiosTransformStream({
length: contentLength,
maxRate: utils.toFiniteNumber(maxUploadRate)
})], utils.noop);

onUploadProgress && data.on('progress', progress => {
onUploadProgress(Object.assign(progress, {
upload: true
}));
});
onUploadProgress && data.on('progress', flushOnFinish(
data,
progressEventDecorator(
contentLength,
progressEventReducer(asyncDecorator(onUploadProgress), false, 3)
)
));
}

// HTTP basic authentication
@@ -459,17 +468,18 @@ export default isHttpAdapterSupported && function httpAdapter(config) {

const responseLength = +res.headers['content-length'];

if (onDownloadProgress) {
if (onDownloadProgress || maxDownloadRate) {
const transformStream = new AxiosTransformStream({
length: utils.toFiniteNumber(responseLength),
maxRate: utils.toFiniteNumber(maxDownloadRate)
});

onDownloadProgress && transformStream.on('progress', progress => {
onDownloadProgress(Object.assign(progress, {
download: true
}));
});
onDownloadProgress && transformStream.on('progress', flushOnFinish(
transformStream,
progressEventDecorator(
responseLength,
progressEventReducer(asyncDecorator(onDownloadProgress), true, 3)
)
));

streams.push(transformStream);
}
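The Node adapter now routes its transform-stream byte counts through the same reducer, with `flushOnFinish` forcing the trailing event on `'end'`/`'error'`. A hedged Node-side usage sketch follows; the URL and rate cap are illustrative, and `maxRate` as an `[upload, download]` pair is assumed from axios's documented config.

```js
import axios from 'axios';

const {data} = await axios.get('https://example.com/archive.bin', {
  responseType: 'arraybuffer',
  maxRate: [undefined, 1024 * 1024], // cap the download at roughly 1 MB/s
  onDownloadProgress(e) {
    // e.download === true; the final event is flushed when the response stream ends or errors.
    console.log(`progress: ${e.loaded}/${e.lengthComputable ? e.total : '?'} bytes`);
  }
});
```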
31 changes: 19 additions & 12 deletions lib/adapters/xhr.js
@@ -6,7 +6,7 @@ import CanceledError from '../cancel/CanceledError.js';
import parseProtocol from '../helpers/parseProtocol.js';
import platform from '../platform/index.js';
import AxiosHeaders from '../core/AxiosHeaders.js';
import progressEventReducer from '../helpers/progressEventReducer.js';
import {progressEventReducer} from '../helpers/progressEventReducer.js';
import resolveConfig from "../helpers/resolveConfig.js";

const isXHRAdapterSupported = typeof XMLHttpRequest !== 'undefined';
@@ -16,16 +16,18 @@ export default isXHRAdapterSupported && function (config) {
const _config = resolveConfig(config);
let requestData = _config.data;
const requestHeaders = AxiosHeaders.from(_config.headers).normalize();
let {responseType} = _config;
let {responseType, onUploadProgress, onDownloadProgress} = _config;
let onCanceled;
let uploadThrottled, downloadThrottled;
let flushUpload, flushDownload;

function done() {
if (_config.cancelToken) {
_config.cancelToken.unsubscribe(onCanceled);
}
flushUpload && flushUpload(); // flush events
flushDownload && flushDownload(); // flush events

if (_config.signal) {
_config.signal.removeEventListener('abort', onCanceled);
}
_config.cancelToken && _config.cancelToken.unsubscribe(onCanceled);

_config.signal && _config.signal.removeEventListener('abort', onCanceled);
}

let request = new XMLHttpRequest();
@@ -149,13 +151,18 @@ export default isXHRAdapterSupported && function (config) {
}

// Handle progress if needed
if (typeof _config.onDownloadProgress === 'function') {
request.addEventListener('progress', progressEventReducer(_config.onDownloadProgress, true));
if (onDownloadProgress) {
([downloadThrottled, flushDownload] = progressEventReducer(onDownloadProgress, true));
request.addEventListener('progress', downloadThrottled);
}

// Not all browsers support upload events
if (typeof _config.onUploadProgress === 'function' && request.upload) {
request.upload.addEventListener('progress', progressEventReducer(_config.onUploadProgress));
if (onUploadProgress && request.upload) {
([uploadThrottled, flushUpload] = progressEventReducer(onUploadProgress));

request.upload.addEventListener('progress', uploadThrottled);

request.upload.addEventListener('loadend', flushUpload);
}

if (_config.cancelToken || _config.signal) {
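On the XHR side the throttled handler and its `flush` are kept separately so the final tick can be forced from `loadend` and from `done()`. A brief browser-side sketch (illustrative endpoint and payload), assuming the default XHR adapter:

```js
import axios from 'axios';

const form = new FormData();
form.append('file', new Blob(['hello world']), 'hello.txt');

await axios.post('/upload', form, {
  onUploadProgress(e) {
    // e.upload === true; with the loadend flush, the last throttled event arrives
    // and progress can reach 1 even for very fast uploads.
    console.log(`upload ${Math.round((e.progress ?? 0) * 100)}%`);
  }
});
```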
55 changes: 3 additions & 52 deletions lib/helpers/AxiosTransformStream.js
@@ -2,8 +2,6 @@

import stream from 'stream';
import utils from '../utils.js';
import throttle from './throttle.js';
import speedometer from './speedometer.js';

const kInternals = Symbol('internals');

@@ -24,12 +22,8 @@ class AxiosTransformStream extends stream.Transform{
readableHighWaterMark: options.chunkSize
});

const self = this;

const internals = this[kInternals] = {
length: options.length,
timeWindow: options.timeWindow,
ticksRate: options.ticksRate,
chunkSize: options.chunkSize,
maxRate: options.maxRate,
minChunkSize: options.minChunkSize,
@@ -41,48 +35,13 @@ class AxiosTransformStream extends stream.Transform{
onReadCallback: null
};

const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow);

this.on('newListener', event => {
if (event === 'progress') {
if (!internals.isCaptured) {
internals.isCaptured = true;
}
}
});

let bytesNotified = 0;

internals.updateProgress = throttle(function throttledHandler() {
const totalBytes = internals.length;
const bytesTransferred = internals.bytesSeen;
const progressBytes = bytesTransferred - bytesNotified;
if (!progressBytes || self.destroyed) return;

const rate = _speedometer(progressBytes);

bytesNotified = bytesTransferred;

process.nextTick(() => {
self.emit('progress', {
loaded: bytesTransferred,
total: totalBytes,
progress: totalBytes ? (bytesTransferred / totalBytes) : undefined,
bytes: progressBytes,
rate: rate ? rate : undefined,
estimated: rate && totalBytes && bytesTransferred <= totalBytes ?
(totalBytes - bytesTransferred) / rate : undefined,
lengthComputable: totalBytes != null
});
});
}, internals.ticksRate);

const onFinish = () => {
internals.updateProgress.call(true);
};

this.once('end', onFinish);
this.once('error', onFinish);
}

_read(size) {
@@ -96,7 +55,6 @@ class AxiosTransformStream extends stream.Transform{
}

_transform(chunk, encoding, callback) {
const self = this;
const internals = this[kInternals];
const maxRate = internals.maxRate;

@@ -108,16 +66,14 @@ class AxiosTransformStream extends stream.Transform{
const bytesThreshold = (maxRate / divider);
const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;

function pushChunk(_chunk, _callback) {
const pushChunk = (_chunk, _callback) => {
const bytes = Buffer.byteLength(_chunk);
internals.bytesSeen += bytes;
internals.bytes += bytes;

if (internals.isCaptured) {
internals.updateProgress();
}
internals.isCaptured && this.emit('progress', internals.bytesSeen);

if (self.push(_chunk)) {
if (this.push(_chunk)) {
process.nextTick(_callback);
} else {
internals.onReadCallback = () => {
@@ -182,11 +138,6 @@ class AxiosTransformStream extends stream.Transform{
}
});
}

setLength(length) {
this[kInternals].length = +length;
return this;
}
}

export default AxiosTransformStream;
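After this trim the transform stream no longer computes rate or ETA itself; its `'progress'` event carries only the running byte count, and the adapter-side reducer builds the rich event. A small sketch of the new contract, assuming a Node ESM context where the relative import resolves and that the constructor fills in defaults for options not passed here:

```js
import AxiosTransformStream from '../helpers/AxiosTransformStream.js';

const ts = new AxiosTransformStream({length: 11});

// The listener now receives a plain number (bytes seen so far), not a progress object.
ts.on('progress', (bytesSeen) => console.log('bytes seen:', bytesSeen));

ts.end(Buffer.from('hello world')); // emits 'progress' with 11 once the chunk is transformed
ts.resume(); // drain the readable side
```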
20 changes: 16 additions & 4 deletions lib/helpers/progressEventReducer.js
@@ -1,7 +1,8 @@
import speedometer from "./speedometer.js";
import throttle from "./throttle.js";
import utils from "../utils.js";

export default (listener, isDownloadStream, freq = 3) => {
export const progressEventReducer = (listener, isDownloadStream, freq = 3) => {
let bytesNotified = 0;
const _speedometer = speedometer(50, 250);

@@ -22,11 +23,22 @@ export default (listener, isDownloadStream, freq = 3) => {
rate: rate ? rate : undefined,
estimated: rate && total && inRange ? (total - loaded) / rate : undefined,
event: e,
lengthComputable: total != null
lengthComputable: total != null,
[isDownloadStream ? 'download' : 'upload']: true
};

data[isDownloadStream ? 'download' : 'upload'] = true;

listener(data);
}, freq);
}

export const progressEventDecorator = (total, throttled) => {
const lengthComputable = total != null;

return [(loaded) => throttled[0]({
lengthComputable,
total,
loaded
}), throttled[1]];
}

export const asyncDecorator = (fn) => (...args) => utils.asap(() => fn(...args));
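Taken together, the helper now exports three pieces: `progressEventReducer(listener, isDownloadStream, freq)` returns a throttled `[handler, flush]` pair, `progressEventDecorator(total, pair)` adapts a raw `loaded` byte count into the event object fed to that pair, and `asyncDecorator(fn)` defers the listener via `utils.asap`. A minimal composition sketch; the relative import assumes a module sitting next to this helper, and the byte counts are illustrative.

```js
import {
  progressEventReducer,
  progressEventDecorator,
  asyncDecorator
} from './progressEventReducer.js';

const listener = (e) =>
  console.log(e.upload ? 'upload' : 'download', `${e.loaded}/${e.total ?? '?'}`);

// progressEventReducer(...) -> [throttledHandler, flush]
const [onProgress, flush] = progressEventDecorator(
  2048,                                           // total; undefined => lengthComputable: false
  progressEventReducer(asyncDecorator(listener))  // listener is invoked asynchronously via utils.asap
);

onProgress(1024); // may be swallowed by throttling if calls arrive faster than `freq` allows
onProgress(2048);
flush();          // force the trailing event so the final loaded/total is always reported
```

This is the same wiring the fetch, http, and xhr adapters use above, just outside of a request.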