summaryrefslogtreecommitdiffstats
path: root/test/client-pipelining.js
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/client-pipelining.js752
1 files changed, 752 insertions, 0 deletions
diff --git a/test/client-pipelining.js b/test/client-pipelining.js
new file mode 100644
index 0000000..8cd21fe
--- /dev/null
+++ b/test/client-pipelining.js
@@ -0,0 +1,752 @@
+'use strict'
+
+const { test } = require('tap')
+const { Client } = require('..')
+const { createServer } = require('http')
+const { finished, Readable } = require('stream')
+const { kConnect } = require('../lib/core/symbols')
+const EE = require('events')
+const { kBusy, kRunning, kSize } = require('../lib/core/symbols')
+const { maybeWrapStream, consts } = require('./utils/async-iterators')
+
+test('20 times GET with pipelining 10', (t) => {
+ const num = 20
+ t.plan(3 * num + 1)
+
+ let count = 0
+ let countGreaterThanOne = false
+ const server = createServer((req, res) => {
+ count++
+ setTimeout(function () {
+ countGreaterThanOne = countGreaterThanOne || count > 1
+ res.end(req.url)
+ }, 10)
+ })
+ t.teardown(server.close.bind(server))
+
+ // needed to check for a warning on the maxListeners on the socket
+ function onWarning (warning) {
+ if (!/ExperimentalWarning/.test(warning)) {
+ t.fail()
+ }
+ }
+ process.on('warning', onWarning)
+ t.teardown(() => {
+ process.removeListener('warning', onWarning)
+ })
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 10
+ })
+ t.teardown(client.close.bind(client))
+
+ for (let i = 0; i < num; i++) {
+ makeRequest(i)
+ }
+
+ function makeRequest (i) {
+ makeRequestAndExpectUrl(client, i, t, () => {
+ count--
+
+ if (i === num - 1) {
+ t.ok(countGreaterThanOne, 'seen more than one parallel request')
+ }
+ })
+ }
+ })
+})
+
+function makeRequestAndExpectUrl (client, i, t, cb) {
+ return client.request({ path: '/' + i, method: 'GET' }, (err, { statusCode, headers, body }) => {
+ cb()
+ t.error(err)
+ t.equal(statusCode, 200)
+ const bufs = []
+ body.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ body.on('end', () => {
+ t.equal('/' + i, Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+}
+
+test('A client should enqueue as much as twice its pipelining factor', (t) => {
+ const num = 10
+ let sent = 0
+ // x * 6 + 1 t.ok + 5 drain
+ t.plan(num * 6 + 1 + 5 + 2)
+
+ let count = 0
+ let countGreaterThanOne = false
+ const server = createServer((req, res) => {
+ count++
+ t.ok(count <= 5)
+ setTimeout(function () {
+ countGreaterThanOne = countGreaterThanOne || count > 1
+ res.end(req.url)
+ }, 10)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.close.bind(client))
+
+ for (; sent < 2;) {
+ t.notOk(client[kSize] > client.pipelining, 'client is not full')
+ makeRequest()
+ t.ok(client[kSize] <= client.pipelining, 'we can send more requests')
+ }
+
+ t.ok(client[kBusy], 'client is busy')
+ t.notOk(client[kSize] > client.pipelining, 'client is full')
+ makeRequest()
+ t.ok(client[kBusy], 'we must stop now')
+ t.ok(client[kBusy], 'client is busy')
+ t.ok(client[kSize] > client.pipelining, 'client is full')
+
+ function makeRequest () {
+ makeRequestAndExpectUrl(client, sent++, t, () => {
+ count--
+ setImmediate(() => {
+ if (client[kSize] === 0) {
+ t.ok(countGreaterThanOne, 'seen more than one parallel request')
+ const start = sent
+ for (; sent < start + 2 && sent < num;) {
+ t.notOk(client[kSize] > client.pipelining, 'client is not full')
+ t.ok(makeRequest())
+ }
+ }
+ })
+ })
+ return client[kSize] <= client.pipelining
+ }
+ })
+})
+
+test('pipeline 1 is 1 active request', (t) => {
+ t.plan(9)
+
+ let res2
+ const server = createServer((req, res) => {
+ res.write('asd')
+ res2 = res
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 1
+ })
+ t.teardown(client.destroy.bind(client))
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.equal(client[kSize], 1)
+ t.error(err)
+ t.notOk(client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ finished(data.body, (err) => {
+ t.ok(err)
+ client.close((err) => {
+ t.error(err)
+ })
+ })
+ data.body.destroy()
+ res2.end()
+ }))
+ data.body.resume()
+ res2.end()
+ })
+ t.ok(client[kSize] <= client.pipelining)
+ t.ok(client[kBusy])
+ t.equal(client[kSize], 1)
+ })
+})
+
+test('pipelined chunked POST stream', (t) => {
+ t.plan(4 + 8 + 8)
+
+ let a = 0
+ let b = 0
+
+ const server = createServer((req, res) => {
+ req.on('data', chunk => {
+ // Make sure a and b don't interleave.
+ t.ok(a === 9 || b === 0)
+ res.write(chunk)
+ }).on('end', () => {
+ res.end()
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.close.bind(client))
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'POST',
+ body: new Readable({
+ read () {
+ this.push(++a > 8 ? null : 'a')
+ }
+ })
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'POST',
+ body: new Readable({
+ read () {
+ this.push(++b > 8 ? null : 'b')
+ }
+ })
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+ })
+})
+
+test('pipelined chunked POST iterator', (t) => {
+ t.plan(4 + 8 + 8)
+
+ let a = 0
+ let b = 0
+
+ const server = createServer((req, res) => {
+ req.on('data', chunk => {
+ // Make sure a and b don't interleave.
+ t.ok(a === 9 || b === 0)
+ res.write(chunk)
+ }).on('end', () => {
+ res.end()
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.close.bind(client))
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'POST',
+ body: (async function * () {
+ while (++a <= 8) {
+ yield 'a'
+ }
+ })()
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'POST',
+ body: (async function * () {
+ while (++b <= 8) {
+ yield 'b'
+ }
+ })()
+ }, (err, { body }) => {
+ body.resume()
+ t.error(err)
+ })
+ })
+})
+
+function errordInflightPost (bodyType) {
+ test(`errored POST body lets inflight complete ${bodyType}`, (t) => {
+ t.plan(6)
+
+ let serverRes
+ const server = createServer()
+ server.on('request', (req, res) => {
+ serverRes = res
+ res.write('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.destroy.bind(client))
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .once('data', () => {
+ client.request({
+ path: '/',
+ method: 'POST',
+ opaque: 'asd',
+ body: maybeWrapStream(new Readable({
+ read () {
+ this.destroy(new Error('kaboom'))
+ }
+ }).once('error', (err) => {
+ t.ok(err)
+ }).on('error', () => {
+ // Readable emits error twice...
+ }), bodyType)
+ }, (err, data) => {
+ t.ok(err)
+ t.equal(data.opaque, 'asd')
+ })
+ client.close((err) => {
+ t.error(err)
+ })
+ serverRes.end()
+ })
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ })
+ })
+}
+
+errordInflightPost(consts.STREAM)
+errordInflightPost(consts.ASYNC_ITERATOR)
+
+test('pipelining non-idempotent', (t) => {
+ t.plan(4)
+
+ const server = createServer()
+ server.on('request', (req, res) => {
+ setTimeout(() => {
+ res.end('asd')
+ }, 10)
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.close.bind(client))
+
+ let ended = false
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ ended = true
+ })
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET',
+ idempotent: false
+ }, (err, data) => {
+ t.error(err)
+ t.equal(ended, true)
+ data.body.resume()
+ })
+ })
+})
+
+function pipeliningNonIdempotentWithBody (bodyType) {
+ test(`pipelining non-idempotent w body ${bodyType}`, (t) => {
+ t.plan(4)
+
+ const server = createServer()
+ server.on('request', (req, res) => {
+ setImmediate(() => {
+ res.end('asd')
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 2
+ })
+ t.teardown(client.close.bind(client))
+
+ let ended = false
+ let reading = false
+ client.request({
+ path: '/',
+ method: 'POST',
+ body: maybeWrapStream(new Readable({
+ read () {
+ if (reading) {
+ return
+ }
+ reading = true
+ this.push('asd')
+ setImmediate(() => {
+ this.push(null)
+ ended = true
+ })
+ }
+ }), bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET',
+ idempotent: false
+ }, (err, data) => {
+ t.error(err)
+ t.equal(ended, true)
+ data.body.resume()
+ })
+ })
+ })
+}
+
+pipeliningNonIdempotentWithBody(consts.STREAM)
+pipeliningNonIdempotentWithBody(consts.ASYNC_ITERATOR)
+
+function pipeliningHeadBusy (bodyType) {
+ test(`pipelining HEAD busy ${bodyType}`, (t) => {
+ t.plan(7)
+
+ const server = createServer()
+ server.on('request', (req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 10
+ })
+ t.teardown(client.close.bind(client))
+
+ client[kConnect](() => {
+ let ended = false
+ client.once('disconnect', () => {
+ t.equal(ended, true)
+ })
+
+ {
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ body: maybeWrapStream(body, bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ body.push(null)
+ t.equal(client[kBusy], true)
+ }
+
+ {
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'HEAD',
+ body: maybeWrapStream(body, bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ ended = true
+ t.pass()
+ })
+ })
+ body.push(null)
+ t.equal(client[kBusy], true)
+ }
+ })
+ })
+ })
+}
+
+pipeliningHeadBusy(consts.STREAM)
+pipeliningHeadBusy(consts.ASYNC_ITERATOR)
+
+test('pipelining empty pipeline before reset', (t) => {
+ t.plan(8)
+
+ let c = 0
+ const server = createServer()
+ server.on('request', (req, res) => {
+ if (c++ === 0) {
+ res.end('asd')
+ } else {
+ setTimeout(() => {
+ res.end('asd')
+ }, 100)
+ }
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 10
+ })
+ t.teardown(client.close.bind(client))
+
+ client[kConnect](() => {
+ let ended = false
+ client.once('disconnect', () => {
+ t.equal(ended, true)
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ t.equal(client[kBusy], false)
+
+ client.request({
+ path: '/',
+ method: 'HEAD',
+ body: 'asd'
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ ended = true
+ t.pass()
+ })
+ })
+ t.equal(client[kBusy], true)
+ t.equal(client[kRunning], 2)
+ })
+ })
+})
+
+function pipeliningIdempotentBusy (bodyType) {
+ test(`pipelining idempotent busy ${bodyType}`, (t) => {
+ t.plan(12)
+
+ const server = createServer()
+ server.on('request', (req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 10
+ })
+ t.teardown(client.close.bind(client))
+
+ {
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ body: maybeWrapStream(body, bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ body.push(null)
+ t.equal(client[kBusy], true)
+ }
+
+ client[kConnect](() => {
+ {
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ body: maybeWrapStream(body, bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ body.push(null)
+ t.equal(client[kBusy], true)
+ }
+
+ {
+ const signal = new EE()
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ body: maybeWrapStream(body, bodyType),
+ signal
+ }, (err, data) => {
+ t.ok(err)
+ })
+ t.equal(client[kBusy], true)
+ signal.emit('abort')
+ t.equal(client[kBusy], true)
+ }
+
+ {
+ const body = new Readable({
+ read () { }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ idempotent: false,
+ body: maybeWrapStream(body, bodyType)
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ body.push(null)
+ t.equal(client[kBusy], true)
+ }
+ })
+ })
+ })
+}
+
+pipeliningIdempotentBusy(consts.STREAM)
+pipeliningIdempotentBusy(consts.ASYNC_ITERATOR)
+
+test('pipelining blocked', (t) => {
+ t.plan(6)
+
+ const server = createServer()
+
+ let blocking = true
+ let count = 0
+
+ server.on('request', (req, res) => {
+ t.ok(!count || !blocking)
+ count++
+ setImmediate(() => {
+ res.end('asd')
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Client(`http://localhost:${server.address().port}`, {
+ pipelining: 10
+ })
+ t.teardown(client.close.bind(client))
+ client.request({
+ path: '/',
+ method: 'GET',
+ blocking: true
+ }, (err, data) => {
+ t.error(err)
+ blocking = false
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ data.body
+ .resume()
+ .on('end', () => {
+ t.pass()
+ })
+ })
+ })
+})