diff options
Diffstat (limited to 'test/client-stream.js')
-rw-r--r-- | test/client-stream.js | 847 |
1 files changed, 847 insertions, 0 deletions
diff --git a/test/client-stream.js b/test/client-stream.js new file mode 100644 index 0000000..a230c44 --- /dev/null +++ b/test/client-stream.js @@ -0,0 +1,847 @@ +'use strict' + +const { test } = require('tap') +const { Client, errors } = require('..') +const { createServer } = require('http') +const { PassThrough, Writable, Readable } = require('stream') +const EE = require('events') + +test('stream get', (t) => { + t.plan(9) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + t.equal(`localhost:${server.address().port}`, req.headers.host) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + const signal = new EE() + client.stream({ + signal, + path: '/', + method: 'GET', + opaque: new PassThrough() + }, ({ statusCode, headers, opaque: pt }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + pt.on('data', (buf) => { + bufs.push(buf) + }) + pt.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + return pt + }, (err) => { + t.equal(signal.listenerCount('abort'), 0) + t.error(err) + }) + t.equal(signal.listenerCount('abort'), 1) + }) +}) + +test('stream promise get', (t) => { + t.plan(6) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + t.equal(`localhost:${server.address().port}`, req.headers.host) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + await client.stream({ + path: '/', + method: 'GET', + opaque: new PassThrough() + }, ({ statusCode, headers, opaque: pt }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + pt.on('data', (buf) => { + bufs.push(buf) + }) + pt.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + return pt + }) + }) +}) + +test('stream GET destroy res', (t) => { + t.plan(14) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + t.equal(`localhost:${server.address().port}`, req.headers.host) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, ({ statusCode, headers }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + + const pt = new PassThrough() + .on('error', (err) => { + t.ok(err) + }) + .on('data', () => { + pt.destroy(new Error('kaboom')) + }) + + return pt + }, (err) => { + t.ok(err) + }) + + client.stream({ + path: '/', + method: 'GET' + }, ({ statusCode, headers }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + + let ret = '' + const pt = new PassThrough() + pt.on('data', chunk => { + ret += chunk + }).on('end', () => { + t.equal(ret, 'hello') + }) + + return pt + }, (err) => { + t.error(err) + }) + }) +}) + +test('stream GET remote destroy', (t) => { + t.plan(4) + + const server = createServer((req, res) => { + res.write('asd') + setImmediate(() => { + res.destroy() + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + const pt = new PassThrough() + pt.on('error', (err) => { + t.ok(err) + }) + return pt + }, (err) => { + t.ok(err) + }) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + const pt = new PassThrough() + pt.on('error', (err) => { + t.ok(err) + }) + return pt + }).catch((err) => { + t.ok(err) + }) + }) +}) + +test('stream response resume back pressure and non standard error', (t) => { + t.plan(5) + + const server = createServer((req, res) => { + res.write(Buffer.alloc(1e3)) + setImmediate(() => { + res.write(Buffer.alloc(1e7)) + res.end() + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + const pt = new PassThrough() + client.stream({ + path: '/', + method: 'GET' + }, () => { + pt.on('data', () => { + pt.emit('error', new Error('kaboom')) + }).once('error', (err) => { + t.equal(err.message, 'kaboom') + }) + return pt + }, (err) => { + t.ok(err) + t.equal(pt.destroyed, true) + }) + + client.once('disconnect', (err) => { + t.ok(err) + }) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + const pt = new PassThrough() + pt.resume() + return pt + }, (err) => { + t.error(err) + }) + }) +}) + +test('stream waits only for writable side', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(1e3)) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + const pt = new PassThrough({ autoDestroy: false }) + client.stream({ + path: '/', + method: 'GET' + }, () => pt, (err) => { + t.error(err) + t.equal(pt.destroyed, false) + }) + }) +}) + +test('stream args validation', (t) => { + t.plan(3) + + const client = new Client('http://localhost:5000') + client.stream({ + path: '/', + method: 'GET' + }, null, (err) => { + t.type(err, errors.InvalidArgumentError) + }) + + client.stream(null, null, (err) => { + t.type(err, errors.InvalidArgumentError) + }) + + try { + client.stream(null, null, 'asd') + } catch (err) { + t.type(err, errors.InvalidArgumentError) + } +}) + +test('stream args validation promise', (t) => { + t.plan(2) + + const client = new Client('http://localhost:5000') + client.stream({ + path: '/', + method: 'GET' + }, null).catch((err) => { + t.type(err, errors.InvalidArgumentError) + }) + + client.stream(null, null).catch((err) => { + t.type(err, errors.InvalidArgumentError) + }) +}) + +test('stream destroy if not readable', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.end() + }) + t.teardown(server.close.bind(server)) + + const pt = new PassThrough() + pt.readable = false + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + return pt + }, (err) => { + t.error(err) + t.equal(pt.destroyed, true) + }) + }) +}) + +test('stream server side destroy', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.destroy() + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + t.fail() + }, (err) => { + t.type(err, errors.SocketError) + }) + }) +}) + +test('stream invalid return', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.write('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + return {} + }, (err) => { + t.type(err, errors.InvalidReturnValueError) + }) + }) +}) + +test('stream body without destroy', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + const pt = new PassThrough({ autoDestroy: false }) + pt.destroy = null + pt.resume() + return pt + }, (err) => { + t.error(err) + }) + }) +}) + +test('stream factory abort', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const signal = new EE() + client.stream({ + path: '/', + method: 'GET', + signal + }, () => { + signal.emit('abort') + return new PassThrough() + }, (err) => { + t.equal(signal.listenerCount('abort'), 0) + t.type(err, errors.RequestAbortedError) + }) + t.equal(signal.listenerCount('abort'), 1) + }) +}) + +test('stream factory throw', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => { + throw new Error('asd') + }, (err) => { + t.equal(err.message, 'asd') + }) + client.stream({ + path: '/', + method: 'GET' + }, () => { + throw new Error('asd') + }, (err) => { + t.equal(err.message, 'asd') + }) + client.stream({ + path: '/', + method: 'GET' + }, () => { + return new PassThrough() + }, (err) => { + t.error(err) + }) + }) +}) + +test('stream CONNECT throw', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + client.stream({ + path: '/', + method: 'CONNECT' + }, () => { + }, (err) => { + t.type(err, errors.InvalidArgumentError) + }) + }) +}) + +test('stream abort after complete', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const pt = new PassThrough() + const signal = new EE() + client.stream({ + path: '/', + method: 'GET', + signal + }, () => { + return pt + }, (err) => { + t.error(err) + signal.emit('abort') + }) + }) +}) + +test('stream abort before dispatch', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const pt = new PassThrough() + const signal = new EE() + client.stream({ + path: '/', + method: 'GET', + signal + }, () => { + return pt + }, (err) => { + t.type(err, errors.RequestAbortedError) + }) + signal.emit('abort') + }) +}) + +test('trailers', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.writeHead(200, { Trailer: 'Content-MD5' }) + res.addTrailers({ 'Content-MD5': 'test' }) + res.end() + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET' + }, () => new PassThrough(), (err, data) => { + t.error(err) + t.strictSame(data.trailers, { 'content-md5': 'test' }) + }) + }) +}) + +test('stream ignore 1xx', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.writeProcessing() + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + let buf = '' + client.stream({ + path: '/', + method: 'GET' + }, () => new Writable({ + write (chunk, encoding, callback) { + buf += chunk + callback() + } + }), (err, data) => { + t.error(err) + t.equal(buf, 'hello') + }) + }) +}) + +test('stream ignore 1xx and use onInfo', (t) => { + t.plan(4) + + const infos = [] + const server = createServer((req, res) => { + res.writeProcessing() + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + let buf = '' + client.stream({ + path: '/', + method: 'GET', + onInfo: (x) => { + infos.push(x) + } + }, () => new Writable({ + write (chunk, encoding, callback) { + buf += chunk + callback() + } + }), (err, data) => { + t.error(err) + t.equal(buf, 'hello') + t.equal(infos.length, 1) + t.equal(infos[0].statusCode, 102) + }) + }) +}) + +test('stream backpressure', (t) => { + t.plan(2) + + const expected = Buffer.alloc(1e6).toString() + + const server = createServer((req, res) => { + res.writeProcessing() + res.end(expected) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + let buf = '' + client.stream({ + path: '/', + method: 'GET' + }, () => new Writable({ + highWaterMark: 1, + write (chunk, encoding, callback) { + buf += chunk + process.nextTick(callback) + } + }), (err, data) => { + t.error(err) + t.equal(buf, expected) + }) + }) +}) + +test('stream body destroyed on invalid callback', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const body = new Readable({ + read () {} + }) + try { + client.stream({ + path: '/', + method: 'GET', + body + }, () => {}, null) + } catch (err) { + t.equal(body.destroyed, true) + } + }) +}) + +test('stream needDrain', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(() => { + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + if (dst.writableNeedDrain === undefined) { + Object.defineProperty(dst, 'writableNeedDrain', { + get () { + return this._writableState.needDrain + } + }) + } + + while (dst.write(Buffer.alloc(4096))) { + // Do nothing. + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + t.equal(dst._writableState.needDrain, true) + t.equal(dst.writableNeedDrain, true) + + setImmediate(() => { + dst.write = (...args) => { + orgWrite.call(dst, ...args) + } + dst.resume() + }) + + return dst + }) + + p.then(() => { + t.pass() + }) + }) +}) + +test('stream legacy needDrain', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end(Buffer.alloc(4096)) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(() => { + client.destroy() + }) + + const dst = new PassThrough() + dst.pause() + + if (dst.writableNeedDrain !== undefined) { + Object.defineProperty(dst, 'writableNeedDrain', { + get () { + } + }) + } + + while (dst.write(Buffer.alloc(4096))) { + // Do nothing + } + + const orgWrite = dst.write + dst.write = () => t.fail() + const p = client.stream({ + path: '/', + method: 'GET' + }, () => { + t.equal(dst._writableState.needDrain, true) + t.equal(dst.writableNeedDrain, undefined) + + setImmediate(() => { + dst.write = (...args) => { + orgWrite.call(dst, ...args) + } + dst.resume() + }) + + return dst + }) + + p.then(() => { + t.pass() + }) + }) + + test('stream throwOnError', (t) => { + t.plan(2) + + const errStatusCode = 500 + const errMessage = 'Internal Server Error' + + const server = createServer((req, res) => { + res.writeHead(errStatusCode, { 'Content-Type': 'text/plain' }) + res.end(errMessage) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET', + throwOnError: true, + opaque: new PassThrough() + }, ({ opaque: pt }) => { + pt.on('data', () => { + t.fail() + }) + return pt + }, (e) => { + t.equal(e.status, errStatusCode) + t.equal(e.body, errMessage) + t.end() + }) + }) + }) + + test('steam throwOnError=true, error on stream', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Client(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + client.stream({ + path: '/', + method: 'GET', + throwOnError: true, + opaque: new PassThrough() + }, () => { + throw new Error('asd') + }, (e) => { + t.equal(e.message, 'asd') + }) + }) + }) +}) |