summaryrefslogtreecommitdiffstats
path: root/test/client-pipeline.js
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-21 20:56:19 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-21 20:56:19 +0000
commit0b6210cd37b68b94252cb798598b12974a20e1c1 (patch)
treee371686554a877842d95aa94f100bee552ff2a8e /test/client-pipeline.js
parentInitial commit. (diff)
downloadnode-undici-0b6210cd37b68b94252cb798598b12974a20e1c1.tar.xz
node-undici-0b6210cd37b68b94252cb798598b12974a20e1c1.zip
Adding upstream version 5.28.2+dfsg1+~cs23.11.12.3.upstream/5.28.2+dfsg1+_cs23.11.12.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/client-pipeline.js')
-rw-r--r--test/client-pipeline.js1042
1 files changed, 1042 insertions, 0 deletions
diff --git a/test/client-pipeline.js b/test/client-pipeline.js
new file mode 100644
index 0000000..9b677a0
--- /dev/null
+++ b/test/client-pipeline.js
@@ -0,0 +1,1042 @@
+'use strict'
+
+const { test } = require('tap')
+const { Client, errors } = require('..')
+const EE = require('events')
+const { createServer } = require('http')
+const {
+ pipeline,
+ Readable,
+ Transform,
+ Writable,
+ PassThrough
+} = require('stream')
+const { nodeMajor } = require('../lib/core/util')
+
+test('pipeline get', (t) => {
+ t.plan(17)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ t.equal(`localhost:${server.address().port}`, req.headers.host)
+ t.equal(undefined, req.headers['content-length'])
+ res.setHeader('Content-Type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ {
+ const bufs = []
+ const signal = new EE()
+ client.pipeline({ signal, path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ t.equal(signal.listenerCount('abort'), 1)
+ return body
+ })
+ .end()
+ .on('data', (buf) => {
+ bufs.push(buf)
+ })
+ .on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ .on('close', () => {
+ t.equal(signal.listenerCount('abort'), 0)
+ })
+ t.equal(signal.listenerCount('abort'), 1)
+ }
+
+ {
+ const bufs = []
+ client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ return body
+ })
+ .end()
+ .on('data', (buf) => {
+ bufs.push(buf)
+ })
+ .on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ }
+ })
+})
+
+test('pipeline echo', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let res = ''
+ const buf1 = Buffer.alloc(1e3).toString()
+ const buf2 = Buffer.alloc(1e6).toString()
+ pipeline(
+ new Readable({
+ read () {
+ this.push(buf1)
+ this.push(buf2)
+ this.push(null)
+ }
+ }),
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, ({ body }) => {
+ return pipeline(body, new PassThrough(), () => {})
+ }),
+ new Writable({
+ write (chunk, encoding, callback) {
+ res += chunk.toString()
+ callback()
+ },
+ final (callback) {
+ t.equal(res, buf1 + buf2)
+ callback()
+ }
+ }),
+ (err) => {
+ t.error(err)
+ }
+ )
+ })
+})
+
+test('pipeline ignore request body', (t) => {
+ t.plan(2)
+
+ let done
+ const server = createServer((req, res) => {
+ res.write('asd')
+ res.end()
+ done()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let res = ''
+ const buf1 = Buffer.alloc(1e3).toString()
+ const buf2 = Buffer.alloc(1e6).toString()
+ pipeline(
+ new Readable({
+ read () {
+ this.push(buf1)
+ this.push(buf2)
+ done = () => this.push(null)
+ }
+ }),
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, ({ body }) => {
+ return pipeline(body, new PassThrough(), () => {})
+ }),
+ new Writable({
+ write (chunk, encoding, callback) {
+ res += chunk.toString()
+ callback()
+ },
+ final (callback) {
+ t.equal(res, 'asd')
+ callback()
+ }
+ }),
+ (err) => {
+ t.error(err)
+ }
+ )
+ })
+})
+
+test('pipeline invalid handler', (t) => {
+ t.plan(1)
+
+ const client = new Client('http://localhost:5000')
+ client.pipeline({}, null).on('error', (err) => {
+ t.ok(/handler/.test(err))
+ })
+})
+
+test('pipeline invalid handler return after destroy should not error', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 3
+ })
+ t.teardown(client.destroy.bind(client))
+
+ const dup = client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ body.on('error', (err) => {
+ t.equal(err.message, 'asd')
+ })
+ dup.destroy(new Error('asd'))
+ return {}
+ })
+ .on('error', (err) => {
+ t.equal(err.message, 'asd')
+ })
+ .on('close', () => {
+ t.pass()
+ })
+ .end()
+ })
+})
+
+test('pipeline error body', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const buf = Buffer.alloc(1e6).toString()
+ pipeline(
+ new Readable({
+ read () {
+ this.push(buf)
+ }
+ }),
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ process.nextTick(() => {
+ pt.destroy(new Error('asd'))
+ })
+ body.on('error', (err) => {
+ t.ok(err)
+ })
+ return pipeline(body, pt, () => {})
+ }),
+ new PassThrough(),
+ (err) => {
+ t.ok(err)
+ }
+ )
+ })
+})
+
+test('pipeline destroy body', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const buf = Buffer.alloc(1e6).toString()
+ pipeline(
+ new Readable({
+ read () {
+ this.push(buf)
+ }
+ }),
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ process.nextTick(() => {
+ pt.destroy()
+ })
+ body.on('error', (err) => {
+ t.ok(err)
+ })
+ return pipeline(body, pt, () => {})
+ }),
+ new PassThrough(),
+ (err) => {
+ t.ok(err)
+ }
+ )
+ })
+})
+
+test('pipeline backpressure', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const buf = Buffer.alloc(1e6).toString()
+ const duplex = client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ return pipeline(body, pt, () => {})
+ })
+
+ duplex.end(buf)
+ duplex.on('data', () => {
+ duplex.pause()
+ setImmediate(() => {
+ duplex.resume()
+ })
+ }).on('end', () => {
+ t.pass()
+ })
+ })
+})
+
+test('pipeline invalid handler return', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ // TODO: Should body cause unhandled exception?
+ body.on('error', () => {})
+ })
+ .on('error', (err) => {
+ t.type(err, errors.InvalidReturnValueError)
+ })
+ .end()
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ // TODO: Should body cause unhandled exception?
+ body.on('error', () => {})
+ return {}
+ })
+ .on('error', (err) => {
+ t.type(err, errors.InvalidReturnValueError)
+ })
+ .end()
+ })
+})
+
+test('pipeline throw handler', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ // TODO: Should body cause unhandled exception?
+ body.on('error', () => {})
+ throw new Error('asd')
+ })
+ .on('error', (err) => {
+ t.equal(err.message, 'asd')
+ })
+ .end()
+ })
+})
+
+test('pipeline destroy and throw handler', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const dup = client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ dup.destroy()
+ // TODO: Should body cause unhandled exception?
+ body.on('error', () => {})
+ throw new Error('asd')
+ })
+ .end()
+ .on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ .on('close', () => {
+ t.pass()
+ })
+ })
+})
+
+test('pipeline abort res', (t) => {
+ t.plan(2)
+
+ let _res
+ const server = createServer((req, res) => {
+ res.write('asd')
+ _res = res
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ setImmediate(() => {
+ body.destroy()
+ _res.write('asdasdadasd')
+ const timeout = setTimeout(() => {
+ t.fail()
+ }, 100)
+ client.on('disconnect', () => {
+ clearTimeout(timeout)
+ t.pass()
+ })
+ })
+ return body
+ })
+ .on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ .end()
+ })
+})
+
+test('pipeline abort server res', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.destroy()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ t.fail()
+ })
+ .on('error', (err) => {
+ t.type(err, errors.SocketError)
+ })
+ .end()
+ })
+})
+
+test('pipeline abort duplex', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.end()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.request({
+ path: '/',
+ method: 'PUT'
+ }, (err, data) => {
+ t.error(err)
+ data.body.resume()
+
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, () => {
+ t.fail()
+ }).destroy().on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ })
+ })
+})
+
+test('pipeline abort piped res', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ setImmediate(() => {
+ pt.destroy()
+ })
+ return pipeline(body, pt, () => {})
+ })
+ .on('error', (err) => {
+ // Node < 13 doesn't always detect premature close.
+ if (nodeMajor < 13) {
+ t.ok(err)
+ } else {
+ t.equal(err.code, 'UND_ERR_ABORTED')
+ }
+ })
+ .end()
+ })
+})
+
+test('pipeline abort piped res 2', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ body.on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ setImmediate(() => {
+ pt.destroy()
+ })
+ body.pipe(pt)
+ return pt
+ })
+ .on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ .end()
+ })
+})
+
+test('pipeline abort piped res 3', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ const pt = new PassThrough()
+ body.on('error', (err) => {
+ t.equal(err.message, 'asd')
+ })
+ setImmediate(() => {
+ pt.destroy(new Error('asd'))
+ })
+ body.pipe(pt)
+ return pt
+ })
+ .on('error', (err) => {
+ t.equal(err.message, 'asd')
+ })
+ .end()
+ })
+})
+
+test('pipeline abort server res after headers', (t) => {
+ t.plan(1)
+
+ let _res
+ const server = createServer((req, res) => {
+ res.write('asd')
+ _res = res
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, (data) => {
+ _res.destroy()
+ return data.body
+ })
+ .on('error', (err) => {
+ t.type(err, errors.SocketError)
+ })
+ .end()
+ })
+})
+
+test('pipeline w/ write abort server res after headers', (t) => {
+ t.plan(1)
+
+ let _res
+ const server = createServer((req, res) => {
+ req.pipe(res)
+ _res = res
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'PUT'
+ }, (data) => {
+ _res.destroy()
+ return data.body
+ })
+ .on('error', (err) => {
+ t.type(err, errors.SocketError)
+ })
+ .resume()
+ .write('asd')
+ })
+})
+
+test('destroy in push', (t) => {
+ t.plan(3)
+
+ let _res
+ const server = createServer((req, res) => {
+ res.write('asd')
+ _res = res
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
+ body.once('data', () => {
+ _res.write('asd')
+ body.on('data', (buf) => {
+ body.destroy()
+ _res.end()
+ }).on('error', (err) => {
+ t.ok(err)
+ })
+ })
+ return body
+ }).on('error', (err) => {
+ t.ok(err)
+ }).resume().end()
+
+ client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
+ let buf = ''
+ body.on('data', (chunk) => {
+ buf = chunk.toString()
+ _res.end()
+ }).on('end', () => {
+ t.equal('asd', buf)
+ })
+ return body
+ }).resume().end()
+ })
+})
+
+test('pipeline args validation', (t) => {
+ t.plan(2)
+
+ const client = new Client('http://localhost:5000')
+
+ const ret = client.pipeline(null, () => {})
+ ret.on('error', (err) => {
+ t.ok(/opts/.test(err.message))
+ t.type(err, errors.InvalidArgumentError)
+ })
+})
+
+test('pipeline factory throw not unhandled', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, (data) => {
+ throw new Error('asd')
+ })
+ .on('error', (err) => {
+ t.ok(err)
+ })
+ .end()
+ })
+})
+
+test('pipeline destroy before dispatch', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client
+ .pipeline({ path: '/', method: 'GET' }, ({ body }) => {
+ return body
+ })
+ .on('error', (err) => {
+ t.ok(err)
+ })
+ .end()
+ .destroy()
+ })
+})
+
+test('pipeline legacy stream', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.write(Buffer.alloc(16e3))
+ setImmediate(() => {
+ res.end(Buffer.alloc(16e3))
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client
+ .pipeline({ path: '/', method: 'GET' }, ({ body }) => {
+ const pt = new PassThrough()
+ pt.pause = null
+ return body.pipe(pt)
+ })
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ .end()
+ })
+})
+
+test('pipeline objectMode', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end(JSON.stringify({ asd: 1 }))
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client
+ .pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => {
+ return pipeline(body, new Transform({
+ readableObjectMode: true,
+ transform (chunk, encoding, callback) {
+ callback(null, JSON.parse(chunk))
+ }
+ }), () => {})
+ })
+ .on('data', data => {
+ t.strictSame(data, { asd: 1 })
+ })
+ .end()
+ })
+})
+
+test('pipeline invalid opts', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.end(JSON.stringify({ asd: 1 }))
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.close((err) => {
+ t.error(err)
+ })
+ client
+ .pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => {
+ t.fail()
+ })
+ .on('error', (err) => {
+ t.ok(err)
+ })
+ })
+})
+
+test('pipeline CONNECT throw', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'CONNECT'
+ }, () => {
+ t.fail()
+ }).on('error', (err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+ client.on('disconnect', () => {
+ t.fail()
+ })
+ })
+})
+
+test('pipeline body without destroy', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => {
+ const pt = new PassThrough({ autoDestroy: false })
+ pt.destroy = null
+ return body.pipe(pt)
+ })
+ .end()
+ .on('end', () => {
+ t.pass()
+ })
+ .resume()
+ })
+})
+
+test('pipeline ignore 1xx', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => body)
+ .on('data', (chunk) => {
+ buf += chunk
+ })
+ .on('end', () => {
+ t.equal(buf, 'hello')
+ })
+ .end()
+ })
+})
+test('pipeline ignore 1xx and use onInfo', (t) => {
+ t.plan(3)
+
+ const infos = []
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.pipeline({
+ path: '/',
+ method: 'GET',
+ onInfo: (x) => {
+ infos.push(x)
+ }
+ }, ({ body }) => body)
+ .on('data', (chunk) => {
+ buf += chunk
+ })
+ .on('end', () => {
+ t.equal(buf, 'hello')
+ t.equal(infos.length, 1)
+ t.equal(infos[0].statusCode, 102)
+ })
+ .end()
+ })
+})
+
+test('pipeline backpressure', (t) => {
+ t.plan(1)
+
+ const expected = Buffer.alloc(1e6).toString()
+
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end(expected)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.pipeline({
+ path: '/',
+ method: 'GET'
+ }, ({ body }) => body)
+ .end()
+ .pipe(new Transform({
+ highWaterMark: 1,
+ transform (chunk, encoding, callback) {
+ setImmediate(() => {
+ callback(null, chunk)
+ })
+ }
+ }))
+ .on('data', chunk => {
+ buf += chunk
+ })
+ .on('end', () => {
+ t.equal(buf, expected)
+ })
+ })
+})
+
+test('pipeline abort after headers', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.write('asd')
+ setImmediate(() => {
+ res.write('asd')
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const signal = new EE()
+ client.pipeline({
+ path: '/',
+ method: 'GET',
+ signal
+ }, ({ body }) => {
+ process.nextTick(() => {
+ signal.emit('abort')
+ })
+ return body
+ })
+ .end()
+ .on('error', (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ })
+})