Skip to content

Commit

Permalink
fix(fetch): optimize signals composing logic; (#6582)
Browse files Browse the repository at this point in the history
  • Loading branch information
DigitalBrainJS authored Aug 30, 2024
1 parent ee208cf commit df9889b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 46 deletions.
27 changes: 10 additions & 17 deletions lib/adapters/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,13 @@ export default isFetchSupported && (async (config) => {

responseType = responseType ? (responseType + '').toLowerCase() : 'text';

let [composedSignal, stopTimeout] = (signal || cancelToken || timeout) ?
composeSignals([signal, cancelToken], timeout) : [];
let composedSignal = composeSignals([signal, cancelToken && cancelToken.toAbortSignal()], timeout);

let finished, request;
let request;

const onFinish = () => {
!finished && setTimeout(() => {
composedSignal && composedSignal.unsubscribe();
});

finished = true;
}
const unsubscribe = composedSignal && composedSignal.unsubscribe && (() => {
composedSignal.unsubscribe();
});

let requestContentLength;

Expand Down Expand Up @@ -161,7 +156,7 @@ export default isFetchSupported && (async (config) => {

// Cloudflare Workers throws when credentials are defined
// see https://github.com/cloudflare/workerd/issues/902
const isCredentialsSupported = "credentials" in Request.prototype;
const isCredentialsSupported = "credentials" in Request.prototype;
request = new Request(url, {
...fetchOptions,
signal: composedSignal,
Expand All @@ -176,7 +171,7 @@ export default isFetchSupported && (async (config) => {

const isStreamResponse = supportsResponseStream && (responseType === 'stream' || responseType === 'response');

if (supportsResponseStream && (onDownloadProgress || isStreamResponse)) {
if (supportsResponseStream && (onDownloadProgress || (isStreamResponse && unsubscribe))) {
const options = {};

['status', 'statusText', 'headers'].forEach(prop => {
Expand All @@ -193,7 +188,7 @@ export default isFetchSupported && (async (config) => {
response = new Response(
trackStream(response.body, DEFAULT_CHUNK_SIZE, onProgress, () => {
flush && flush();
isStreamResponse && onFinish();
unsubscribe && unsubscribe();
}, encodeText),
options
);
Expand All @@ -203,9 +198,7 @@ export default isFetchSupported && (async (config) => {

let responseData = await resolvers[utils.findKey(resolvers, responseType) || 'text'](response, config);

!isStreamResponse && onFinish();

stopTimeout && stopTimeout();
!isStreamResponse && unsubscribe && unsubscribe();

return await new Promise((resolve, reject) => {
settle(resolve, reject, {
Expand All @@ -218,7 +211,7 @@ export default isFetchSupported && (async (config) => {
})
})
} catch (err) {
onFinish();
unsubscribe && unsubscribe();

if (err && err.name === 'TypeError' && /fetch/i.test(err.message)) {
throw Object.assign(
Expand Down
14 changes: 14 additions & 0 deletions lib/cancel/CancelToken.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ class CancelToken {
}
}

toAbortSignal() {
const controller = new AbortController();

const abort = (err) => {
controller.abort(err);
};

this.subscribe(abort);

controller.signal.unsubscribe = () => this.unsubscribe(abort);

return controller.signal;
}

/**
* Returns an object that contains a new `CancelToken` and a function that, when called,
* cancels the `CancelToken`.
Expand Down
60 changes: 31 additions & 29 deletions lib/helpers/composeSignals.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,48 @@
import CanceledError from "../cancel/CanceledError.js";
import AxiosError from "../core/AxiosError.js";
import utils from '../utils.js';

const composeSignals = (signals, timeout) => {
let controller = new AbortController();
const {length} = (signals = signals ? signals.filter(Boolean) : []);

let aborted;
if (timeout || length) {
let controller = new AbortController();

const onabort = function (cancel) {
if (!aborted) {
aborted = true;
unsubscribe();
const err = cancel instanceof Error ? cancel : this.reason;
controller.abort(err instanceof AxiosError ? err : new CanceledError(err instanceof Error ? err.message : err));
}
}
let aborted;

let timer = timeout && setTimeout(() => {
onabort(new AxiosError(`timeout ${timeout} of ms exceeded`, AxiosError.ETIMEDOUT))
}, timeout)
const onabort = function (reason) {
if (!aborted) {
aborted = true;
unsubscribe();
const err = reason instanceof Error ? reason : this.reason;
controller.abort(err instanceof AxiosError ? err : new CanceledError(err instanceof Error ? err.message : err));
}
}

const unsubscribe = () => {
if (signals) {
timer && clearTimeout(timer);
let timer = timeout && setTimeout(() => {
timer = null;
signals.forEach(signal => {
signal &&
(signal.removeEventListener ? signal.removeEventListener('abort', onabort) : signal.unsubscribe(onabort));
});
signals = null;
onabort(new AxiosError(`timeout ${timeout} of ms exceeded`, AxiosError.ETIMEDOUT))
}, timeout)

const unsubscribe = () => {
if (signals) {
timer && clearTimeout(timer);
timer = null;
signals.forEach(signal => {
signal.unsubscribe ? signal.unsubscribe(onabort) : signal.removeEventListener('abort', onabort);
});
signals = null;
}
}
}

signals.forEach((signal) => signal && signal.addEventListener && signal.addEventListener('abort', onabort));
signals.forEach((signal) => signal.addEventListener('abort', onabort));

const {signal} = controller;
const {signal} = controller;

signal.unsubscribe = unsubscribe;
signal.unsubscribe = () => utils.asap(unsubscribe);

return [signal, () => {
timer && clearTimeout(timer);
timer = null;
}];
return signal;
}
}

export default composeSignals;
43 changes: 43 additions & 0 deletions test/unit/helpers/composeSignals.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import assert from 'assert';
import composeSignals from '../../../lib/helpers/composeSignals.js';

describe('helpers::composeSignals', () => {
before(function () {
if (typeof AbortController !== 'function') {
this.skip();
}
});

it('should abort when any of the signals abort', () => {
let called;

const controllerA = new AbortController();
const controllerB = new AbortController();

const signal = composeSignals([controllerA.signal, controllerB.signal]);

signal.addEventListener('abort', () => {
called = true;
});

controllerA.abort(new Error('test'));

assert.ok(called);
});

it('should abort on timeout', async () => {
const signal = composeSignals([], 100);

await new Promise(resolve => {
signal.addEventListener('abort', resolve);
});

assert.match(String(signal.reason), /timeout 100 of ms exceeded/);
});

it('should return undefined if signals and timeout are not provided', async () => {
const signal = composeSignals([]);

assert.strictEqual(signal, undefined);
});
});

0 comments on commit df9889b

Please sign in to comment.