'use strict'

// Tests for Client#pipeline: the duplex returned by `client.pipeline(opts, handler)`
// is written with the request body and read for whatever stream `handler` returns.

const { test } = require('tap')
const { Client, errors } = require('..')
const EE = require('events')
const { createServer } = require('http')
const { pipeline, Readable, Transform, Writable, PassThrough } = require('stream')
const { nodeMajor } = require('../lib/core/util')

// Two GET requests through pipeline; the first also checks that the `abort`
// listener on the supplied signal is attached while in flight and removed on close.
test('pipeline get', (t) => {
  t.plan(17)

  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    t.equal(`localhost:${server.address().port}`, req.headers.host)
    t.equal(undefined, req.headers['content-length'])
    res.setHeader('Content-Type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    {
      const bufs = []
      const signal = new EE()
      client.pipeline({ signal, path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
        t.equal(statusCode, 200)
        t.equal(headers['content-type'], 'text/plain')
        t.equal(signal.listenerCount('abort'), 1)
        return body
      })
        .end()
        .on('data', (buf) => {
          bufs.push(buf)
        })
        .on('end', () => {
          t.equal('hello', Buffer.concat(bufs).toString('utf8'))
        })
        .on('close', () => {
          t.equal(signal.listenerCount('abort'), 0)
        })
      // Checked synchronously, right after the pipeline has been created.
      t.equal(signal.listenerCount('abort'), 1)
    }

    {
      const bufs = []
      client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
        t.equal(statusCode, 200)
        t.equal(headers['content-type'], 'text/plain')
        return body
      })
        .end()
        .on('data', (buf) => {
          bufs.push(buf)
        })
        .on('end', () => {
          t.equal('hello', Buffer.concat(bufs).toString('utf8'))
        })
    }
  })
})

// The server echoes the request body; everything written must come back out.
test('pipeline echo', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let res = ''
    const buf1 = Buffer.alloc(1e3).toString()
    const buf2 = Buffer.alloc(1e6).toString()
    pipeline(
      new Readable({
        read () {
          this.push(buf1)
          this.push(buf2)
          this.push(null)
        }
      }),
      client.pipeline({ path: '/', method: 'PUT' }, ({ body }) => {
        return pipeline(body, new PassThrough(), () => {})
      }),
      new Writable({
        write (chunk, encoding, callback) {
          res += chunk.toString()
          callback()
        },
        final (callback) {
          t.equal(res, buf1 + buf2)
          callback()
        }
      }),
      (err) => {
        t.error(err)
      }
    )
  })
})

// The server responds without reading the request body; the source stream is
// only ended (via `done`) once the server has written its response.
test('pipeline ignore request body', (t) => {
  t.plan(2)

  let done
  const server = createServer((req, res) => {
    res.write('asd')
    res.end()
    done()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let res = ''
    const buf1 = Buffer.alloc(1e3).toString()
    const buf2 = Buffer.alloc(1e6).toString()
    pipeline(
      new Readable({
        read () {
          this.push(buf1)
          this.push(buf2)
          done = () => this.push(null)
        }
      }),
      client.pipeline({ path: '/', method: 'PUT' }, ({ body }) => {
        return pipeline(body, new PassThrough(), () => {})
      }),
      new Writable({
        write (chunk, encoding, callback) {
          res += chunk.toString()
          callback()
        },
        final (callback) {
          t.equal(res, 'asd')
          callback()
        }
      }),
      (err) => {
        t.error(err)
      }
    )
  })
})

// A non-function handler is reported through the returned stream's 'error' event.
test('pipeline invalid handler', (t) => {
  t.plan(1)

  const client = new Client('http://localhost:5000')
  client.pipeline({}, null).on('error', (err) => {
    // Match on the message, as 'pipeline args validation' does.
    t.ok(/handler/.test(err.message))
  })
})

// The handler destroys the duplex and then returns an invalid value; only the
// destroy error ('asd') is expected on both the body and the duplex.
test('pipeline invalid handler return after destroy should not error', (t) => {
  t.plan(3)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`, {
      pipelining: 3
    })
    t.teardown(client.destroy.bind(client))

    const dup = client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      body.on('error', (err) => {
        t.equal(err.message, 'asd')
      })
      dup.destroy(new Error('asd'))
      return {}
    })
      .on('error', (err) => {
        t.equal(err.message, 'asd')
      })
      .on('close', () => {
        t.pass()
      })
      .end()
  })
})

// Destroying the handler's downstream PassThrough with an error must surface
// an error on both the response body and the outer pipeline.
test('pipeline error body', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    const buf = Buffer.alloc(1e6).toString()
    pipeline(
      new Readable({
        read () {
          this.push(buf)
        }
      }),
      client.pipeline({ path: '/', method: 'PUT' }, ({ body }) => {
        const pt = new PassThrough()
        process.nextTick(() => {
          pt.destroy(new Error('asd'))
        })
        body.on('error', (err) => {
          t.ok(err)
        })
        return pipeline(body, pt, () => {})
      }),
      new PassThrough(),
      (err) => {
        t.ok(err)
      }
    )
  })
})

// Same as 'pipeline error body', but the PassThrough is destroyed without an error.
test('pipeline destroy body', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    const buf = Buffer.alloc(1e6).toString()
    pipeline(
      new Readable({
        read () {
          this.push(buf)
        }
      }),
      client.pipeline({ path: '/', method: 'PUT' }, ({ body }) => {
        const pt = new PassThrough()
        process.nextTick(() => {
          pt.destroy()
        })
        body.on('error', (err) => {
          t.ok(err)
        })
        return pipeline(body, pt, () => {})
      }),
      new PassThrough(),
      (err) => {
        t.ok(err)
      }
    )
  })
})

// The consumer pauses on every chunk and resumes on the next immediate; the
// echoed body must still reach 'end'.
test('pipeline backpressure', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    const buf = Buffer.alloc(1e6).toString()
    const duplex = client.pipeline({ path: '/', method: 'PUT' }, ({ body }) => {
      const pt = new PassThrough()
      return pipeline(body, pt, () => {})
    })

    duplex.end(buf)
    duplex.on('data', () => {
      duplex.pause()
      setImmediate(() => {
        duplex.resume()
      })
    }).on('end', () => {
      t.pass()
    })
  })
})

// A handler that returns nothing, or a non-stream object, yields an
// InvalidReturnValueError on the duplex.
test('pipeline invalid handler return', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      // TODO: Should body cause unhandled exception?
      body.on('error', () => {})
    })
      .on('error', (err) => {
        t.type(err, errors.InvalidReturnValueError)
      })
      .end()

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      // TODO: Should body cause unhandled exception?
      body.on('error', () => {})
      return {}
    })
      .on('error', (err) => {
        t.type(err, errors.InvalidReturnValueError)
      })
      .end()
  })
})

// An exception thrown by the handler is delivered as the duplex's 'error'.
test('pipeline throw handler', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      // TODO: Should body cause unhandled exception?
      body.on('error', () => {})
      throw new Error('asd')
    })
      .on('error', (err) => {
        t.equal(err.message, 'asd')
      })
      .end()
  })
})

// When the handler destroys the duplex and then throws, the abort error (not
// the thrown one) is what the duplex reports.
test('pipeline destroy and throw handler', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    req.pipe(res)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    const dup = client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      dup.destroy()
      // TODO: Should body cause unhandled exception?
      body.on('error', () => {})
      throw new Error('asd')
    })
      .end()
      .on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
      .on('close', () => {
        t.pass()
      })
  })
})

// Destroying the response body mid-response must abort the request and make
// the client emit 'disconnect' within 100ms.
test('pipeline abort res', (t) => {
  t.plan(2)

  let _res
  const server = createServer((req, res) => {
    res.write('asd')
    _res = res
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      setImmediate(() => {
        body.destroy()
        _res.write('asdasdadasd')
        const timeout = setTimeout(() => {
          t.fail()
        }, 100)
        client.on('disconnect', () => {
          clearTimeout(timeout)
          t.pass()
        })
      })
      return body
    })
      .on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
      .end()
  })
})

// The server destroys the response before sending anything; the handler must
// not run and the duplex errors with SocketError.
test('pipeline abort server res', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.destroy()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, () => {
      t.fail()
    })
      .on('error', (err) => {
        t.type(err, errors.SocketError)
      })
      .end()
  })
})

// Destroying the duplex right after creation yields RequestAbortedError and
// the handler is never invoked.
test('pipeline abort duplex', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.end()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.request({
      path: '/',
      method: 'PUT'
    }, (err, data) => {
      t.error(err)
      data.body.resume()

      client.pipeline({
        path: '/',
        method: 'PUT'
      }, () => {
        t.fail()
      }).destroy().on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
    })
  })
})

// The handler's downstream stream (joined with stream.pipeline) is destroyed
// while the response is still open.
test('pipeline abort piped res', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.write('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      const pt = new PassThrough()
      setImmediate(() => {
        pt.destroy()
      })
      return pipeline(body, pt, () => {})
    })
      .on('error', (err) => {
        // Node < 13 doesn't always detect premature close.
        if (nodeMajor < 13) {
          t.ok(err)
        } else {
          t.equal(err.code, 'UND_ERR_ABORTED')
        }
      })
      .end()
  })
})

// Like 'pipeline abort piped res', but joined with body.pipe(pt); both the body
// and the duplex must see RequestAbortedError.
test('pipeline abort piped res 2', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.write('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      const pt = new PassThrough()
      body.on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
      setImmediate(() => {
        pt.destroy()
      })
      body.pipe(pt)
      return pt
    })
      .on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
      .end()
  })
})

// Like 'pipeline abort piped res 2', but destroyed with an explicit error that
// must be propagated unchanged to both the body and the duplex.
test('pipeline abort piped res 3', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.write('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      const pt = new PassThrough()
      body.on('error', (err) => {
        t.equal(err.message, 'asd')
      })
      setImmediate(() => {
        pt.destroy(new Error('asd'))
      })
      body.pipe(pt)
      return pt
    })
      .on('error', (err) => {
        t.equal(err.message, 'asd')
      })
      .end()
  })
})

// The server-side response is destroyed from inside the handler, i.e. after
// headers have been received.
test('pipeline abort server res after headers', (t) => {
  t.plan(1)

  let _res
  const server = createServer((req, res) => {
    res.write('asd')
    _res = res
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, (data) => {
      _res.destroy()
      return data.body
    })
      .on('error', (err) => {
        t.type(err, errors.SocketError)
      })
      .end()
  })
})

// Same as above, but with a request body being written and never ended.
test('pipeline w/ write abort server res after headers', (t) => {
  t.plan(1)

  let _res
  const server = createServer((req, res) => {
    req.pipe(res)
    _res = res
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'PUT' }, (data) => {
      _res.destroy()
      return data.body
    })
      .on('error', (err) => {
        t.type(err, errors.SocketError)
      })
      .resume()
      .write('asd')
  })
})

// The body is destroyed from within its own 'data' handler; a second request
// queued on the same client must still complete and read 'asd'.
test('destroy in push', (t) => {
  t.plan(3)

  let _res
  const server = createServer((req, res) => {
    res.write('asd')
    _res = res
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      body.once('data', () => {
        _res.write('asd')
        body.on('data', (buf) => {
          body.destroy()
          _res.end()
        }).on('error', (err) => {
          t.ok(err)
        })
      })
      return body
    }).on('error', (err) => {
      t.ok(err)
    }).resume().end()

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      let buf = ''
      body.on('data', (chunk) => {
        buf = chunk.toString()
        _res.end()
      }).on('end', () => {
        t.equal('asd', buf)
      })
      return body
    }).resume().end()
  })
})

// Null opts are rejected with an InvalidArgumentError whose message mentions "opts".
test('pipeline args validation', (t) => {
  t.plan(2)

  const client = new Client('http://localhost:5000')

  const ret = client.pipeline(null, () => {})
  ret.on('error', (err) => {
    t.ok(/opts/.test(err.message))
    t.type(err, errors.InvalidArgumentError)
  })
})

// A throwing handler must be reported via 'error' rather than left unhandled.
test('pipeline factory throw not unhandled', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.write('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, (data) => {
      throw new Error('asd')
    })
      .on('error', (err) => {
        t.ok(err)
      })
      .end()
  })
})

// The duplex is ended and then destroyed synchronously, right after creation.
test('pipeline destroy before dispatch', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client
      .pipeline({ path: '/', method: 'GET' }, ({ body }) => {
        return body
      })
      .on('error', (err) => {
        t.ok(err)
      })
      .end()
      .destroy()
  })
})

// The handler returns a stream whose `pause` method has been removed.
test('pipeline legacy stream', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.write(Buffer.alloc(16e3))
    setImmediate(() => {
      res.end(Buffer.alloc(16e3))
    })
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    client
      .pipeline({ path: '/', method: 'GET' }, ({ body }) => {
        const pt = new PassThrough()
        pt.pause = null
        return body.pipe(pt)
      })
      .resume()
      .on('end', () => {
        t.pass()
      })
      .end()
  })
})

// With `objectMode: true` the duplex emits the parsed object produced by the
// handler's Transform.
test('pipeline objectMode', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.end(JSON.stringify({ asd: 1 }))
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    client
      .pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => {
        return pipeline(body, new Transform({
          readableObjectMode: true,
          transform (chunk, encoding, callback) {
            callback(null, JSON.parse(chunk))
          }
        }), () => {})
      })
      .on('data', data => {
        t.strictSame(data, { asd: 1 })
      })
      .end()
  })
})

// NOTE(review): the opts passed here look well-formed; the expected error
// presumably comes from calling pipeline after client.close() — confirm.
test('pipeline invalid opts', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.end(JSON.stringify({ asd: 1 }))
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.close((err) => {
      t.error(err)
    })
    client
      .pipeline({ path: '/', method: 'GET', objectMode: true }, ({ body }) => {
        t.fail()
      })
      .on('error', (err) => {
        t.ok(err)
      })
  })
})

// CONNECT is rejected with InvalidArgumentError and must not cause a disconnect.
test('pipeline CONNECT throw', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({
      path: '/',
      method: 'CONNECT'
    }, () => {
      t.fail()
    }).on('error', (err) => {
      t.type(err, errors.InvalidArgumentError)
    })
    client.on('disconnect', () => {
      t.fail()
    })
  })
})

// The handler returns a stream whose `destroy` method has been removed.
test('pipeline body without destroy', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => {
      const pt = new PassThrough({ autoDestroy: false })
      pt.destroy = null
      return body.pipe(pt)
    })
      .end()
      .on('end', () => {
        t.pass()
      })
      .resume()
  })
})

// A 102 Processing interim response must not be surfaced as the final response.
test('pipeline ignore 1xx', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.writeProcessing()
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let buf = ''
    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => body)
      .on('data', (chunk) => {
        buf += chunk
      })
      .on('end', () => {
        t.equal(buf, 'hello')
      })
      .end()
  })
})

// Same as 'pipeline ignore 1xx', and the interim response is reported exactly
// once through the `onInfo` option.
test('pipeline ignore 1xx and use onInfo', (t) => {
  t.plan(3)

  const infos = []
  const server = createServer((req, res) => {
    res.writeProcessing()
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let buf = ''
    client.pipeline({
      path: '/',
      method: 'GET',
      onInfo: (x) => {
        infos.push(x)
      }
    }, ({ body }) => body)
      .on('data', (chunk) => {
        buf += chunk
      })
      .on('end', () => {
        t.equal(buf, 'hello')
        t.equal(infos.length, 1)
        t.equal(infos[0].statusCode, 102)
      })
      .end()
  })
})

// Titled distinctly from the earlier 'pipeline backpressure' test: here a large
// response is piped into a Transform with highWaterMark 1 that defers every chunk.
test('pipeline backpressure through piped transform', (t) => {
  t.plan(1)

  const expected = Buffer.alloc(1e6).toString()

  const server = createServer((req, res) => {
    res.writeProcessing()
    res.end(expected)
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let buf = ''
    client.pipeline({ path: '/', method: 'GET' }, ({ body }) => body)
      .end()
      .pipe(new Transform({
        highWaterMark: 1,
        transform (chunk, encoding, callback) {
          setImmediate(() => {
            callback(null, chunk)
          })
        }
      }))
      .on('data', chunk => {
        buf += chunk
      })
      .on('end', () => {
        t.equal(buf, expected)
      })
  })
})

// Emitting 'abort' on the signal after the handler has run (headers already
// received) must fail the duplex with RequestAbortedError.
test('pipeline abort after headers', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.writeProcessing()
    res.write('asd')
    setImmediate(() => {
      res.write('asd')
    })
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const client = new Client(`http://localhost:${server.address().port}`)
    t.teardown(client.destroy.bind(client))

    const signal = new EE()
    client.pipeline({ path: '/', method: 'GET', signal }, ({ body }) => {
      process.nextTick(() => {
        signal.emit('abort')
      })
      return body
    })
      .end()
      .on('error', (err) => {
        t.type(err, errors.RequestAbortedError)
      })
  })
})