summaryrefslogtreecommitdiffstats
path: root/test/client-stream.js
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/client-stream.js847
1 files changed, 847 insertions, 0 deletions
diff --git a/test/client-stream.js b/test/client-stream.js
new file mode 100644
index 0000000..a230c44
--- /dev/null
+++ b/test/client-stream.js
@@ -0,0 +1,847 @@
+'use strict'
+
+const { test } = require('tap')
+const { Client, errors } = require('..')
+const { createServer } = require('http')
+const { PassThrough, Writable, Readable } = require('stream')
+const EE = require('events')
+
+test('stream get', (t) => {
+ t.plan(9)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ t.equal(`localhost:${server.address().port}`, req.headers.host)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const signal = new EE()
+ client.stream({
+ signal,
+ path: '/',
+ method: 'GET',
+ opaque: new PassThrough()
+ }, ({ statusCode, headers, opaque: pt }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ pt.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ pt.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ return pt
+ }, (err) => {
+ t.equal(signal.listenerCount('abort'), 0)
+ t.error(err)
+ })
+ t.equal(signal.listenerCount('abort'), 1)
+ })
+})
+
+test('stream promise get', (t) => {
+ t.plan(6)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ t.equal(`localhost:${server.address().port}`, req.headers.host)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ await client.stream({
+ path: '/',
+ method: 'GET',
+ opaque: new PassThrough()
+ }, ({ statusCode, headers, opaque: pt }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ pt.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ pt.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ return pt
+ })
+ })
+})
+
+test('stream GET destroy res', (t) => {
+ t.plan(14)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ t.equal(`localhost:${server.address().port}`, req.headers.host)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, ({ statusCode, headers }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+
+ const pt = new PassThrough()
+ .on('error', (err) => {
+ t.ok(err)
+ })
+ .on('data', () => {
+ pt.destroy(new Error('kaboom'))
+ })
+
+ return pt
+ }, (err) => {
+ t.ok(err)
+ })
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, ({ statusCode, headers }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+
+ let ret = ''
+ const pt = new PassThrough()
+ pt.on('data', chunk => {
+ ret += chunk
+ }).on('end', () => {
+ t.equal(ret, 'hello')
+ })
+
+ return pt
+ }, (err) => {
+ t.error(err)
+ })
+ })
+})
+
+test('stream GET remote destroy', (t) => {
+ t.plan(4)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ setImmediate(() => {
+ res.destroy()
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ const pt = new PassThrough()
+ pt.on('error', (err) => {
+ t.ok(err)
+ })
+ return pt
+ }, (err) => {
+ t.ok(err)
+ })
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ const pt = new PassThrough()
+ pt.on('error', (err) => {
+ t.ok(err)
+ })
+ return pt
+ }).catch((err) => {
+ t.ok(err)
+ })
+ })
+})
+
+test('stream response resume back pressure and non standard error', (t) => {
+ t.plan(5)
+
+ const server = createServer((req, res) => {
+ res.write(Buffer.alloc(1e3))
+ setImmediate(() => {
+ res.write(Buffer.alloc(1e7))
+ res.end()
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const pt = new PassThrough()
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ pt.on('data', () => {
+ pt.emit('error', new Error('kaboom'))
+ }).once('error', (err) => {
+ t.equal(err.message, 'kaboom')
+ })
+ return pt
+ }, (err) => {
+ t.ok(err)
+ t.equal(pt.destroyed, true)
+ })
+
+ client.once('disconnect', (err) => {
+ t.ok(err)
+ })
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ const pt = new PassThrough()
+ pt.resume()
+ return pt
+ }, (err) => {
+ t.error(err)
+ })
+ })
+})
+
+test('stream waits only for writable side', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.end(Buffer.alloc(1e3))
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const pt = new PassThrough({ autoDestroy: false })
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => pt, (err) => {
+ t.error(err)
+ t.equal(pt.destroyed, false)
+ })
+ })
+})
+
+test('stream args validation', (t) => {
+ t.plan(3)
+
+ const client = new Client('http://localhost:5000')
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, null, (err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+
+ client.stream(null, null, (err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+
+ try {
+ client.stream(null, null, 'asd')
+ } catch (err) {
+ t.type(err, errors.InvalidArgumentError)
+ }
+})
+
+test('stream args validation promise', (t) => {
+ t.plan(2)
+
+ const client = new Client('http://localhost:5000')
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, null).catch((err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+
+ client.stream(null, null).catch((err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+})
+
+test('stream destroy if not readable', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.end()
+ })
+ t.teardown(server.close.bind(server))
+
+ const pt = new PassThrough()
+ pt.readable = false
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ return pt
+ }, (err) => {
+ t.error(err)
+ t.equal(pt.destroyed, true)
+ })
+ })
+})
+
+test('stream server side destroy', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.destroy()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ t.fail()
+ }, (err) => {
+ t.type(err, errors.SocketError)
+ })
+ })
+})
+
+test('stream invalid return', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ return {}
+ }, (err) => {
+ t.type(err, errors.InvalidReturnValueError)
+ })
+ })
+})
+
+test('stream body without destroy', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ const pt = new PassThrough({ autoDestroy: false })
+ pt.destroy = null
+ pt.resume()
+ return pt
+ }, (err) => {
+ t.error(err)
+ })
+ })
+})
+
+test('stream factory abort', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const signal = new EE()
+ client.stream({
+ path: '/',
+ method: 'GET',
+ signal
+ }, () => {
+ signal.emit('abort')
+ return new PassThrough()
+ }, (err) => {
+ t.equal(signal.listenerCount('abort'), 0)
+ t.type(err, errors.RequestAbortedError)
+ })
+ t.equal(signal.listenerCount('abort'), 1)
+ })
+})
+
+test('stream factory throw', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ throw new Error('asd')
+ }, (err) => {
+ t.equal(err.message, 'asd')
+ })
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ throw new Error('asd')
+ }, (err) => {
+ t.equal(err.message, 'asd')
+ })
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ return new PassThrough()
+ }, (err) => {
+ t.error(err)
+ })
+ })
+})
+
+test('stream CONNECT throw', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'CONNECT'
+ }, () => {
+ }, (err) => {
+ t.type(err, errors.InvalidArgumentError)
+ })
+ })
+})
+
+test('stream abort after complete', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const pt = new PassThrough()
+ const signal = new EE()
+ client.stream({
+ path: '/',
+ method: 'GET',
+ signal
+ }, () => {
+ return pt
+ }, (err) => {
+ t.error(err)
+ signal.emit('abort')
+ })
+ })
+})
+
+test('stream abort before dispatch', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const pt = new PassThrough()
+ const signal = new EE()
+ client.stream({
+ path: '/',
+ method: 'GET',
+ signal
+ }, () => {
+ return pt
+ }, (err) => {
+ t.type(err, errors.RequestAbortedError)
+ })
+ signal.emit('abort')
+ })
+})
+
+test('trailers', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.writeHead(200, { Trailer: 'Content-MD5' })
+ res.addTrailers({ 'Content-MD5': 'test' })
+ res.end()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => new PassThrough(), (err, data) => {
+ t.error(err)
+ t.strictSame(data.trailers, { 'content-md5': 'test' })
+ })
+ })
+})
+
+test('stream ignore 1xx', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => new Writable({
+ write (chunk, encoding, callback) {
+ buf += chunk
+ callback()
+ }
+ }), (err, data) => {
+ t.error(err)
+ t.equal(buf, 'hello')
+ })
+ })
+})
+
+test('stream ignore 1xx and use onInfo', (t) => {
+ t.plan(4)
+
+ const infos = []
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.stream({
+ path: '/',
+ method: 'GET',
+ onInfo: (x) => {
+ infos.push(x)
+ }
+ }, () => new Writable({
+ write (chunk, encoding, callback) {
+ buf += chunk
+ callback()
+ }
+ }), (err, data) => {
+ t.error(err)
+ t.equal(buf, 'hello')
+ t.equal(infos.length, 1)
+ t.equal(infos[0].statusCode, 102)
+ })
+ })
+})
+
+test('stream backpressure', (t) => {
+ t.plan(2)
+
+ const expected = Buffer.alloc(1e6).toString()
+
+ const server = createServer((req, res) => {
+ res.writeProcessing()
+ res.end(expected)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => new Writable({
+ highWaterMark: 1,
+ write (chunk, encoding, callback) {
+ buf += chunk
+ process.nextTick(callback)
+ }
+ }), (err, data) => {
+ t.error(err)
+ t.equal(buf, expected)
+ })
+ })
+})
+
+test('stream body destroyed on invalid callback', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const body = new Readable({
+ read () {}
+ })
+ try {
+ client.stream({
+ path: '/',
+ method: 'GET',
+ body
+ }, () => {}, null)
+ } catch (err) {
+ t.equal(body.destroyed, true)
+ }
+ })
+})
+
+test('stream needDrain', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end(Buffer.alloc(4096))
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(() => {
+ client.destroy()
+ })
+
+ const dst = new PassThrough()
+ dst.pause()
+
+ if (dst.writableNeedDrain === undefined) {
+ Object.defineProperty(dst, 'writableNeedDrain', {
+ get () {
+ return this._writableState.needDrain
+ }
+ })
+ }
+
+ while (dst.write(Buffer.alloc(4096))) {
+ // Do nothing.
+ }
+
+ const orgWrite = dst.write
+ dst.write = () => t.fail()
+ const p = client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ t.equal(dst._writableState.needDrain, true)
+ t.equal(dst.writableNeedDrain, true)
+
+ setImmediate(() => {
+ dst.write = (...args) => {
+ orgWrite.call(dst, ...args)
+ }
+ dst.resume()
+ })
+
+ return dst
+ })
+
+ p.then(() => {
+ t.pass()
+ })
+ })
+})
+
+test('stream legacy needDrain', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end(Buffer.alloc(4096))
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(() => {
+ client.destroy()
+ })
+
+ const dst = new PassThrough()
+ dst.pause()
+
+ if (dst.writableNeedDrain !== undefined) {
+ Object.defineProperty(dst, 'writableNeedDrain', {
+ get () {
+ }
+ })
+ }
+
+ while (dst.write(Buffer.alloc(4096))) {
+ // Do nothing
+ }
+
+ const orgWrite = dst.write
+ dst.write = () => t.fail()
+ const p = client.stream({
+ path: '/',
+ method: 'GET'
+ }, () => {
+ t.equal(dst._writableState.needDrain, true)
+ t.equal(dst.writableNeedDrain, undefined)
+
+ setImmediate(() => {
+ dst.write = (...args) => {
+ orgWrite.call(dst, ...args)
+ }
+ dst.resume()
+ })
+
+ return dst
+ })
+
+ p.then(() => {
+ t.pass()
+ })
+ })
+
+ test('stream throwOnError', (t) => {
+ t.plan(2)
+
+ const errStatusCode = 500
+ const errMessage = 'Internal Server Error'
+
+ const server = createServer((req, res) => {
+ res.writeHead(errStatusCode, { 'Content-Type': 'text/plain' })
+ res.end(errMessage)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET',
+ throwOnError: true,
+ opaque: new PassThrough()
+ }, ({ opaque: pt }) => {
+ pt.on('data', () => {
+ t.fail()
+ })
+ return pt
+ }, (e) => {
+ t.equal(e.status, errStatusCode)
+ t.equal(e.body, errMessage)
+ t.end()
+ })
+ })
+ })
+
+ test('steam throwOnError=true, error on stream', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Client(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ client.stream({
+ path: '/',
+ method: 'GET',
+ throwOnError: true,
+ opaque: new PassThrough()
+ }, () => {
+ throw new Error('asd')
+ }, (e) => {
+ t.equal(e.message, 'asd')
+ })
+ })
+ })
+})