diff options
Diffstat (limited to '')
-rw-r--r-- | test/client-pipelining.js | 752 |
1 files changed, 752 insertions, 0 deletions
diff --git a/test/client-pipelining.js b/test/client-pipelining.js new file mode 100644 index 0000000..8cd21fe --- /dev/null +++ b/test/client-pipelining.js @@ -0,0 +1,752 @@ +'use strict' + +const { test } = require('tap') +const { Client } = require('..') +const { createServer } = require('http') +const { finished, Readable } = require('stream') +const { kConnect } = require('../lib/core/symbols') +const EE = require('events') +const { kBusy, kRunning, kSize } = require('../lib/core/symbols') +const { maybeWrapStream, consts } = require('./utils/async-iterators') + +test('20 times GET with pipelining 10', (t) => { + const num = 20 + t.plan(3 * num + 1) + + let count = 0 + let countGreaterThanOne = false + const server = createServer((req, res) => { + count++ + setTimeout(function () { + countGreaterThanOne = countGreaterThanOne || count > 1 + res.end(req.url) + }, 10) + }) + t.teardown(server.close.bind(server)) + + // needed to check for a warning on the maxListeners on the socket + function onWarning (warning) { + if (!/ExperimentalWarning/.test(warning)) { + t.fail() + } + } + process.on('warning', onWarning) + t.teardown(() => { + process.removeListener('warning', onWarning) + }) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 10 + }) + t.teardown(client.close.bind(client)) + + for (let i = 0; i < num; i++) { + makeRequest(i) + } + + function makeRequest (i) { + makeRequestAndExpectUrl(client, i, t, () => { + count-- + + if (i === num - 1) { + t.ok(countGreaterThanOne, 'seen more than one parallel request') + } + }) + } + }) +}) + +function makeRequestAndExpectUrl (client, i, t, cb) { + return client.request({ path: '/' + i, method: 'GET' }, (err, { statusCode, headers, body }) => { + cb() + t.error(err) + t.equal(statusCode, 200) + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.equal('/' + i, Buffer.concat(bufs).toString('utf8')) + }) + }) +} + +test('A client should enqueue as much as twice its pipelining factor', (t) => { + const num = 10 + let sent = 0 + // x * 6 + 1 t.ok + 5 drain + t.plan(num * 6 + 1 + 5 + 2) + + let count = 0 + let countGreaterThanOne = false + const server = createServer((req, res) => { + count++ + t.ok(count <= 5) + setTimeout(function () { + countGreaterThanOne = countGreaterThanOne || count > 1 + res.end(req.url) + }, 10) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.close.bind(client)) + + for (; sent < 2;) { + t.notOk(client[kSize] > client.pipelining, 'client is not full') + makeRequest() + t.ok(client[kSize] <= client.pipelining, 'we can send more requests') + } + + t.ok(client[kBusy], 'client is busy') + t.notOk(client[kSize] > client.pipelining, 'client is full') + makeRequest() + t.ok(client[kBusy], 'we must stop now') + t.ok(client[kBusy], 'client is busy') + t.ok(client[kSize] > client.pipelining, 'client is full') + + function makeRequest () { + makeRequestAndExpectUrl(client, sent++, t, () => { + count-- + setImmediate(() => { + if (client[kSize] === 0) { + t.ok(countGreaterThanOne, 'seen more than one parallel request') + const start = sent + for (; sent < start + 2 && sent < num;) { + t.notOk(client[kSize] > client.pipelining, 'client is not full') + t.ok(makeRequest()) + } + } + }) + }) + return client[kSize] <= client.pipelining + } + }) +}) + +test('pipeline 1 is 1 active request', (t) => { + t.plan(9) + + let res2 + const server = createServer((req, res) => { + res.write('asd') + res2 = res + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 1 + }) + t.teardown(client.destroy.bind(client)) + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.equal(client[kSize], 1) + t.error(err) + t.notOk(client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + finished(data.body, (err) => { + t.ok(err) + client.close((err) => { + t.error(err) + }) + }) + data.body.destroy() + res2.end() + })) + data.body.resume() + res2.end() + }) + t.ok(client[kSize] <= client.pipelining) + t.ok(client[kBusy]) + t.equal(client[kSize], 1) + }) +}) + +test('pipelined chunked POST stream', (t) => { + t.plan(4 + 8 + 8) + + let a = 0 + let b = 0 + + const server = createServer((req, res) => { + req.on('data', chunk => { + // Make sure a and b don't interleave. + t.ok(a === 9 || b === 0) + res.write(chunk) + }).on('end', () => { + res.end() + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.close.bind(client)) + + client.request({ + path: '/', + method: 'GET' + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'POST', + body: new Readable({ + read () { + this.push(++a > 8 ? null : 'a') + } + }) + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'POST', + body: new Readable({ + read () { + this.push(++b > 8 ? null : 'b') + } + }) + }, (err, { body }) => { + body.resume() + t.error(err) + }) + }) +}) + +test('pipelined chunked POST iterator', (t) => { + t.plan(4 + 8 + 8) + + let a = 0 + let b = 0 + + const server = createServer((req, res) => { + req.on('data', chunk => { + // Make sure a and b don't interleave. + t.ok(a === 9 || b === 0) + res.write(chunk) + }).on('end', () => { + res.end() + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.close.bind(client)) + + client.request({ + path: '/', + method: 'GET' + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'POST', + body: (async function * () { + while (++a <= 8) { + yield 'a' + } + })() + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err, { body }) => { + body.resume() + t.error(err) + }) + + client.request({ + path: '/', + method: 'POST', + body: (async function * () { + while (++b <= 8) { + yield 'b' + } + })() + }, (err, { body }) => { + body.resume() + t.error(err) + }) + }) +}) + +function errordInflightPost (bodyType) { + test(`errored POST body lets inflight complete ${bodyType}`, (t) => { + t.plan(6) + + let serverRes + const server = createServer() + server.on('request', (req, res) => { + serverRes = res + res.write('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.destroy.bind(client)) + + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + data.body + .resume() + .once('data', () => { + client.request({ + path: '/', + method: 'POST', + opaque: 'asd', + body: maybeWrapStream(new Readable({ + read () { + this.destroy(new Error('kaboom')) + } + }).once('error', (err) => { + t.ok(err) + }).on('error', () => { + // Readable emits error twice... + }), bodyType) + }, (err, data) => { + t.ok(err) + t.equal(data.opaque, 'asd') + }) + client.close((err) => { + t.error(err) + }) + serverRes.end() + }) + .on('end', () => { + t.pass() + }) + }) + }) + }) +} + +errordInflightPost(consts.STREAM) +errordInflightPost(consts.ASYNC_ITERATOR) + +test('pipelining non-idempotent', (t) => { + t.plan(4) + + const server = createServer() + server.on('request', (req, res) => { + setTimeout(() => { + res.end('asd') + }, 10) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.close.bind(client)) + + let ended = false + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + ended = true + }) + }) + + client.request({ + path: '/', + method: 'GET', + idempotent: false + }, (err, data) => { + t.error(err) + t.equal(ended, true) + data.body.resume() + }) + }) +}) + +function pipeliningNonIdempotentWithBody (bodyType) { + test(`pipelining non-idempotent w body ${bodyType}`, (t) => { + t.plan(4) + + const server = createServer() + server.on('request', (req, res) => { + setImmediate(() => { + res.end('asd') + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 2 + }) + t.teardown(client.close.bind(client)) + + let ended = false + let reading = false + client.request({ + path: '/', + method: 'POST', + body: maybeWrapStream(new Readable({ + read () { + if (reading) { + return + } + reading = true + this.push('asd') + setImmediate(() => { + this.push(null) + ended = true + }) + } + }), bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + + client.request({ + path: '/', + method: 'GET', + idempotent: false + }, (err, data) => { + t.error(err) + t.equal(ended, true) + data.body.resume() + }) + }) + }) +} + +pipeliningNonIdempotentWithBody(consts.STREAM) +pipeliningNonIdempotentWithBody(consts.ASYNC_ITERATOR) + +function pipeliningHeadBusy (bodyType) { + test(`pipelining HEAD busy ${bodyType}`, (t) => { + t.plan(7) + + const server = createServer() + server.on('request', (req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 10 + }) + t.teardown(client.close.bind(client)) + + client[kConnect](() => { + let ended = false + client.once('disconnect', () => { + t.equal(ended, true) + }) + + { + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'GET', + body: maybeWrapStream(body, bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + body.push(null) + t.equal(client[kBusy], true) + } + + { + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'HEAD', + body: maybeWrapStream(body, bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + ended = true + t.pass() + }) + }) + body.push(null) + t.equal(client[kBusy], true) + } + }) + }) + }) +} + +pipeliningHeadBusy(consts.STREAM) +pipeliningHeadBusy(consts.ASYNC_ITERATOR) + +test('pipelining empty pipeline before reset', (t) => { + t.plan(8) + + let c = 0 + const server = createServer() + server.on('request', (req, res) => { + if (c++ === 0) { + res.end('asd') + } else { + setTimeout(() => { + res.end('asd') + }, 100) + } + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 10 + }) + t.teardown(client.close.bind(client)) + + client[kConnect](() => { + let ended = false + client.once('disconnect', () => { + t.equal(ended, true) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + t.equal(client[kBusy], false) + + client.request({ + path: '/', + method: 'HEAD', + body: 'asd' + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + ended = true + t.pass() + }) + }) + t.equal(client[kBusy], true) + t.equal(client[kRunning], 2) + }) + }) +}) + +function pipeliningIdempotentBusy (bodyType) { + test(`pipelining idempotent busy ${bodyType}`, (t) => { + t.plan(12) + + const server = createServer() + server.on('request', (req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 10 + }) + t.teardown(client.close.bind(client)) + + { + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'GET', + body: maybeWrapStream(body, bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + body.push(null) + t.equal(client[kBusy], true) + } + + client[kConnect](() => { + { + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'GET', + body: maybeWrapStream(body, bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + body.push(null) + t.equal(client[kBusy], true) + } + + { + const signal = new EE() + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'GET', + body: maybeWrapStream(body, bodyType), + signal + }, (err, data) => { + t.ok(err) + }) + t.equal(client[kBusy], true) + signal.emit('abort') + t.equal(client[kBusy], true) + } + + { + const body = new Readable({ + read () { } + }) + client.request({ + path: '/', + method: 'GET', + idempotent: false, + body: maybeWrapStream(body, bodyType) + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + body.push(null) + t.equal(client[kBusy], true) + } + }) + }) + }) +} + +pipeliningIdempotentBusy(consts.STREAM) +pipeliningIdempotentBusy(consts.ASYNC_ITERATOR) + +test('pipelining blocked', (t) => { + t.plan(6) + + const server = createServer() + + let blocking = true + let count = 0 + + server.on('request', (req, res) => { + t.ok(!count || !blocking) + count++ + setImmediate(() => { + res.end('asd') + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Client(`http://localhost:${server.address().port}`, { + pipelining: 10 + }) + t.teardown(client.close.bind(client)) + client.request({ + path: '/', + method: 'GET', + blocking: true + }, (err, data) => { + t.error(err) + blocking = false + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + data.body + .resume() + .on('end', () => { + t.pass() + }) + }) + }) +}) |