Skip to content

Commit

Permalink
fix: request abort signal (nodejs#3209)
Browse files Browse the repository at this point in the history
* fix: request abort signal

* fixup

* fixup

* fixup
  • Loading branch information
ronag authored May 7, 2024
1 parent d51dabc commit 9f26aff
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 8 deletions.
36 changes: 30 additions & 6 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

const assert = require('node:assert')
const { Readable } = require('./readable')
const { InvalidArgumentError } = require('../core/errors')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

class RequestHandler extends AsyncResource {
constructor (opts, callback) {
Expand Down Expand Up @@ -56,14 +55,33 @@ class RequestHandler extends AsyncResource {
this.onInfo = onInfo || null
this.throwOnError = throwOnError
this.highWaterMark = highWaterMark
this.signal = signal
this.reason = null
this.removeAbortListener = null

if (util.isStream(body)) {
body.on('error', (err) => {
this.onError(err)
})
}

addSignal(this, signal)
if (this.signal) {
if (this.signal.aborted) {
this.reason = this.signal.reason ?? new RequestAbortedError()
} else {
this.removeAbortListener = util.addAbortListener(this.signal, () => {
this.removeAbortListener?.()
this.removeAbortListener = null

this.reason = this.signal.reason ?? new RequestAbortedError()
if (this.res) {
util.destroy(this.res, this.reason)
} else if (this.abort) {
this.abort(this.reason)
}
})
}
}
}

onConnect (abort, context) {
Expand Down Expand Up @@ -95,6 +113,13 @@ class RequestHandler extends AsyncResource {
const contentLength = parsedHeaders['content-length']
const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark })

if (this.removeAbortListener) {
// TODO (fix): 'close' is sufficient but breaks tests.
body
.on('end', this.removeAbortListener)
.on('error', this.removeAbortListener)
}

this.callback = null
this.res = body
if (callback !== null) {
Expand Down Expand Up @@ -123,8 +148,6 @@ class RequestHandler extends AsyncResource {
onComplete (trailers) {
const { res } = this

removeSignal(this)

util.parseHeaders(trailers, this.trailers)

res.push(null)
Expand All @@ -133,7 +156,8 @@ class RequestHandler extends AsyncResource {
onError (err) {
const { res, callback, body, opaque } = this

removeSignal(this)
this.removeAbortListener?.()
this.removeAbortListener = null

if (callback) {
// TODO: Does this need queueMicrotask?
Expand Down
4 changes: 2 additions & 2 deletions test/issue-2590.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ test('aborting request with custom reason', async (t) => {

await t.rejects(
request(`http://localhost:${server.address().port}`, { signal: ac.signal }),
/Request aborted/
/Error: aborted/
)

await t.rejects(
request(`http://localhost:${server.address().port}`, { signal: ac2.signal }),
{ code: 'UND_ERR_ABORTED' }
{ name: 'AbortError' }
)

await t.completed
Expand Down
76 changes: 76 additions & 0 deletions test/request-signal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict'

const { createServer } = require('node:http')
const { test, after } = require('node:test')
const { tspl } = require('@matteo.collina/tspl')
const { request } = require('..')

test('pre abort signal w/ reason', async (t) => {
t = tspl(t, { plan: 1 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const ac = new AbortController()
const _err = new Error()
ac.abort(_err)
try {
await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
} catch (err) {
t.equal(err, _err)
}
})
await t.completed
})

test('post abort signal', async (t) => {
t = tspl(t, { plan: 1 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const ac = new AbortController()
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
ac.abort()
try {
/* eslint-disable-next-line no-unused-vars */
for await (const chunk of ures.body) {
// Do nothing...
}
} catch (err) {
t.equal(err.name, 'AbortError')
}
})
await t.completed
})

test('post abort signal w/ reason', async (t) => {
t = tspl(t, { plan: 1 })

const server = createServer((req, res) => {
res.end('asd')
})
after(() => server.close())

server.listen(0, async () => {
const ac = new AbortController()
const _err = new Error()
const ures = await request(`http://0.0.0.0:${server.address().port}`, { signal: ac.signal })
ac.abort(_err)
try {
/* eslint-disable-next-line no-unused-vars */
for await (const chunk of ures.body) {
// Do nothing...
}
} catch (err) {
t.equal(err, _err)
}
})
await t.completed
})

0 comments on commit 9f26aff

Please sign in to comment.