summaryrefslogtreecommitdiffstats
path: root/test/pool.js
diff options
context:
space:
mode:
Diffstat (limited to 'test/pool.js')
-rw-r--r--test/pool.js1101
1 files changed, 1101 insertions, 0 deletions
diff --git a/test/pool.js b/test/pool.js
new file mode 100644
index 0000000..8cf7195
--- /dev/null
+++ b/test/pool.js
@@ -0,0 +1,1101 @@
+'use strict'
+
+const { EventEmitter } = require('events')
+const { createServer } = require('http')
+const net = require('net')
+const {
+ finished,
+ PassThrough,
+ Readable
+} = require('stream')
+const { promisify } = require('util')
+const proxyquire = require('proxyquire')
+const { test } = require('tap')
+const {
+ kBusy,
+ kPending,
+ kRunning,
+ kSize,
+ kUrl
+} = require('../lib/core/symbols')
+const {
+ Client,
+ Pool,
+ errors
+} = require('..')
+
+test('throws when connection is inifinite', (t) => {
+ t.plan(2)
+
+ try {
+ new Pool(null, { connections: 0 / 0 }) // eslint-disable-line
+ } catch (e) {
+ t.type(e, errors.InvalidArgumentError)
+ t.equal(e.message, 'invalid connections')
+ }
+})
+
+test('throws when connections is negative', (t) => {
+ t.plan(2)
+
+ try {
+ new Pool(null, { connections: -1 }) // eslint-disable-line no-new
+ } catch (e) {
+ t.type(e, errors.InvalidArgumentError)
+ t.equal(e.message, 'invalid connections')
+ }
+})
+
+test('throws when connection is not number', (t) => {
+ t.plan(2)
+
+ try {
+ new Pool(null, { connections: true }) // eslint-disable-line no-new
+ } catch (e) {
+ t.type(e, errors.InvalidArgumentError)
+ t.equal(e.message, 'invalid connections')
+ }
+})
+
+test('throws when factory is not a function', (t) => {
+ t.plan(2)
+
+ try {
+ new Pool(null, { factory: '' }) // eslint-disable-line no-new
+ } catch (e) {
+ t.type(e, errors.InvalidArgumentError)
+ t.equal(e.message, 'factory must be a function.')
+ }
+})
+
+test('does not throw when connect is a function', (t) => {
+ t.plan(1)
+
+ t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} }))
+})
+
+test('connect/disconnect event(s)', (t) => {
+ const clients = 2
+
+ t.plan(clients * 6)
+
+ const server = createServer((req, res) => {
+ res.writeHead(200, {
+ Connection: 'keep-alive',
+ 'Keep-Alive': 'timeout=1s'
+ })
+ res.end('ok')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const pool = new Pool(`http://localhost:${server.address().port}`, {
+ connections: clients,
+ keepAliveTimeoutThreshold: 100
+ })
+ t.teardown(pool.close.bind(pool))
+
+ pool.on('connect', (origin, [pool, client]) => {
+ t.equal(client instanceof Client, true)
+ })
+ pool.on('disconnect', (origin, [pool, client], error) => {
+ t.ok(client instanceof Client)
+ t.type(error, errors.InformationalError)
+ t.equal(error.code, 'UND_ERR_INFO')
+ t.equal(error.message, 'socket idle timeout')
+ })
+
+ for (let i = 0; i < clients; i++) {
+ pool.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { headers, body }) => {
+ t.error(err)
+ body.resume()
+ })
+ }
+ })
+})
+
+test('basic get', (t) => {
+ t.plan(14)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ t.equal(client[kUrl].origin, `http://localhost:${server.address().port}`)
+
+ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
+ t.error(err)
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ body.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ body.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+
+ t.equal(client.destroyed, false)
+ t.equal(client.closed, false)
+ client.close((err) => {
+ t.error(err)
+ t.equal(client.destroyed, true)
+ client.destroy((err) => {
+ t.error(err)
+ client.close((err) => {
+ t.type(err, errors.ClientDestroyedError)
+ })
+ })
+ })
+ t.equal(client.closed, true)
+ })
+})
+
+test('URL as arg', (t) => {
+ t.plan(9)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const url = new URL('http://localhost')
+ url.port = server.address().port
+ const client = new Pool(url)
+ t.teardown(client.destroy.bind(client))
+
+ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
+ t.error(err)
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ body.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ body.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+
+ client.close((err) => {
+ t.error(err)
+ client.destroy((err) => {
+ t.error(err)
+ client.close((err) => {
+ t.type(err, errors.ClientDestroyedError)
+ })
+ })
+ })
+ })
+})
+
+test('basic get error async/await', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.destroy()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ await client.request({ path: '/', method: 'GET' })
+ .catch((err) => {
+ t.ok(err)
+ })
+
+ await client.destroy()
+
+ await client.close().catch((err) => {
+ t.type(err, errors.ClientDestroyedError)
+ })
+ })
+})
+
+test('basic get with async/await', async (t) => {
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ await promisify(server.listen.bind(server))(0)
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+
+ body.resume()
+ await promisify(finished)(body)
+
+ await client.close()
+ await client.destroy()
+})
+
+test('stream get async/await', async (t) => {
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ await promisify(server.listen.bind(server))(0)
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ return new PassThrough()
+ })
+})
+
+test('stream get error async/await', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.destroy()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ await client.stream({ path: '/', method: 'GET' }, () => {
+
+ })
+ .catch((err) => {
+ t.ok(err)
+ })
+ })
+})
+
+test('pipeline get', (t) => {
+ t.plan(5)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ const bufs = []
+ client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ return body
+ })
+ .end()
+ .on('data', (buf) => {
+ bufs.push(buf)
+ })
+ .on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+})
+
+test('backpressure algorithm', (t) => {
+ const seen = []
+ let total = 0
+
+ let writeMore = true
+
+ class FakeClient extends EventEmitter {
+ constructor () {
+ super()
+
+ this.id = total++
+ }
+
+ dispatch (req, handler) {
+ seen.push({ req, client: this, id: this.id })
+ return writeMore
+ }
+ }
+
+ const Pool = proxyquire('../lib/pool', {
+ './client': FakeClient
+ })
+
+ const noopHandler = {
+ onError (err) {
+ throw err
+ }
+ }
+
+ const pool = new Pool('http://notahost')
+
+ pool.dispatch({}, noopHandler)
+ pool.dispatch({}, noopHandler)
+
+ const d1 = seen.shift() // d1 = c0
+ t.equal(d1.id, 0)
+ const d2 = seen.shift() // d2 = c0
+ t.equal(d2.id, 0)
+
+ t.equal(d1.id, d2.id)
+
+ writeMore = false
+
+ pool.dispatch({}, noopHandler) // d3 = c0
+
+ pool.dispatch({}, noopHandler) // d4 = c1
+
+ const d3 = seen.shift()
+ t.equal(d3.id, 0)
+ const d4 = seen.shift()
+ t.equal(d4.id, 1)
+
+ t.equal(d3.id, d2.id)
+ t.not(d3.id, d4.id)
+
+ writeMore = true
+
+ d4.client.emit('drain', new URL('http://notahost'), [])
+
+ pool.dispatch({}, noopHandler) // d5 = c1
+
+ d3.client.emit('drain', new URL('http://notahost'), [])
+
+ pool.dispatch({}, noopHandler) // d6 = c0
+
+ const d5 = seen.shift()
+ t.equal(d5.id, 1)
+ const d6 = seen.shift()
+ t.equal(d6.id, 0)
+
+ t.equal(d5.id, d4.id)
+ t.equal(d3.id, d6.id)
+
+ t.equal(total, 3)
+
+ t.end()
+})
+
+test('busy', (t) => {
+ t.plan(8 * 16 + 2 + 1)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ const connections = 2
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections,
+ pipelining: 2
+ })
+ client.on('drain', () => {
+ t.pass()
+ })
+ client.on('connect', () => {
+ t.pass()
+ })
+ t.teardown(client.destroy.bind(client))
+
+ for (let n = 1; n <= 8; ++n) {
+ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
+ t.error(err)
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ body.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ body.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+ t.equal(client[kPending], n)
+ t.equal(client[kBusy], n > 1)
+ t.equal(client[kSize], n)
+ t.equal(client[kRunning], 0)
+
+ t.equal(client.stats.connected, 0)
+ t.equal(client.stats.free, 0)
+ t.equal(client.stats.queued, Math.max(n - connections, 0))
+ t.equal(client.stats.pending, n)
+ t.equal(client.stats.size, n)
+ t.equal(client.stats.running, 0)
+ }
+ })
+})
+
+test('invalid pool dispatch options', (t) => {
+ t.plan(2)
+ const pool = new Pool('http://notahost')
+ t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler')
+ t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler')
+})
+
+test('pool upgrade promise', (t) => {
+ t.plan(2)
+
+ const server = net.createServer((c) => {
+ c.on('data', (d) => {
+ c.write('HTTP/1.1 101\r\n')
+ c.write('hello: world\r\n')
+ c.write('connection: upgrade\r\n')
+ c.write('upgrade: websocket\r\n')
+ c.write('\r\n')
+ c.write('Body')
+ })
+
+ c.on('end', () => {
+ c.end()
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const { headers, socket } = await client.upgrade({
+ path: '/',
+ method: 'GET',
+ protocol: 'Websocket'
+ })
+
+ let recvData = ''
+ socket.on('data', (d) => {
+ recvData += d
+ })
+
+ socket.on('close', () => {
+ t.equal(recvData.toString(), 'Body')
+ })
+
+ t.same(headers, {
+ hello: 'world',
+ connection: 'upgrade',
+ upgrade: 'websocket'
+ })
+ socket.end()
+ })
+})
+
+test('pool connect', (t) => {
+ t.plan(1)
+
+ const server = createServer((c) => {
+ t.fail()
+ })
+ server.on('connect', (req, socket, firstBodyChunk) => {
+ socket.write('HTTP/1.1 200 Connection established\r\n\r\n')
+
+ let data = firstBodyChunk.toString()
+ socket.on('data', (buf) => {
+ data += buf.toString()
+ })
+
+ socket.on('end', () => {
+ socket.end(data)
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ const { socket } = await client.connect({
+ path: '/'
+ })
+
+ let recvData = ''
+ socket.on('data', (d) => {
+ recvData += d
+ })
+
+ socket.on('end', () => {
+ t.equal(recvData.toString(), 'Body')
+ })
+
+ socket.write('Body')
+ socket.end()
+ })
+})
+
+test('pool dispatch', (t) => {
+ t.plan(2)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ let buf = ''
+ client.dispatch({
+ path: '/',
+ method: 'GET'
+ }, {
+ onConnect () {
+ },
+ onHeaders (statusCode, headers) {
+ t.equal(statusCode, 200)
+ },
+ onData (chunk) {
+ buf += chunk
+ },
+ onComplete () {
+ t.equal(buf, 'asd')
+ },
+ onError () {
+ }
+ })
+ })
+})
+
+test('pool pipeline args validation', (t) => {
+ t.plan(2)
+
+ const client = new Pool('http://localhost:5000')
+
+ const ret = client.pipeline(null, () => {})
+ ret.on('error', (err) => {
+ t.ok(/opts/.test(err.message))
+ t.type(err, errors.InvalidArgumentError)
+ })
+})
+
+test('300 requests succeed', (t) => {
+ t.plan(300 * 3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1
+ })
+ t.teardown(client.destroy.bind(client))
+
+ for (let n = 0; n < 300; ++n) {
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err, data) => {
+ t.error(err)
+ data.body.on('data', (chunk) => {
+ t.equal(chunk.toString(), 'asd')
+ }).on('end', () => {
+ t.pass()
+ })
+ })
+ }
+ })
+})
+
+test('pool connect error', (t) => {
+ t.plan(1)
+
+ const server = createServer((c) => {
+ t.fail()
+ })
+ server.on('connect', (req, socket, firstBodyChunk) => {
+ socket.destroy()
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ try {
+ await client.connect({
+ path: '/'
+ })
+ } catch (err) {
+ t.ok(err)
+ }
+ })
+})
+
+test('pool upgrade error', (t) => {
+ t.plan(1)
+
+ const server = net.createServer((c) => {
+ c.on('data', (d) => {
+ c.write('HTTP/1.1 101\r\n')
+ c.write('hello: world\r\n')
+ c.write('connection: upgrade\r\n')
+ c.write('\r\n')
+ c.write('Body')
+ })
+ c.on('error', () => {
+ // Whether we get an error, end or close is undefined.
+ // Ignore error.
+ })
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`)
+ t.teardown(client.close.bind(client))
+
+ try {
+ await client.upgrade({
+ path: '/',
+ method: 'GET',
+ protocol: 'Websocket'
+ })
+ } catch (err) {
+ t.ok(err)
+ }
+ })
+})
+
+test('pool dispatch error', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ client.dispatch({
+ path: '/',
+ method: 'GET'
+ }, {
+ onConnect () {
+ },
+ onHeaders (statusCode, headers) {
+ t.equal(statusCode, 200)
+ },
+ onData (chunk) {
+ },
+ onComplete () {
+ t.pass()
+ },
+ onError () {
+ }
+ })
+
+ client.dispatch({
+ path: '/',
+ method: 'GET',
+ headers: {
+ 'transfer-encoding': 'fail'
+ }
+ }, {
+ onConnect () {
+ t.fail()
+ },
+ onHeaders (statusCode, headers) {
+ t.fail()
+ },
+ onData (chunk) {
+ t.fail()
+ },
+ onError (err) {
+ t.equal(err.code, 'UND_ERR_INVALID_ARG')
+ }
+ })
+ })
+})
+
+test('pool request abort in queue', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ client.dispatch({
+ path: '/',
+ method: 'GET'
+ }, {
+ onConnect () {
+ },
+ onHeaders (statusCode, headers) {
+ t.equal(statusCode, 200)
+ },
+ onData (chunk) {
+ },
+ onComplete () {
+ t.pass()
+ },
+ onError () {
+ }
+ })
+
+ const signal = new EventEmitter()
+ client.request({
+ path: '/',
+ method: 'GET',
+ signal
+ }, (err) => {
+ t.equal(err.code, 'UND_ERR_ABORTED')
+ })
+ signal.emit('abort')
+ })
+})
+
+test('pool stream abort in queue', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ client.dispatch({
+ path: '/',
+ method: 'GET'
+ }, {
+ onConnect () {
+ },
+ onHeaders (statusCode, headers) {
+ t.equal(statusCode, 200)
+ },
+ onData (chunk) {
+ },
+ onComplete () {
+ t.pass()
+ },
+ onError () {
+ }
+ })
+
+ const signal = new EventEmitter()
+ client.stream({
+ path: '/',
+ method: 'GET',
+ signal
+ }, ({ body }) => body, (err) => {
+ t.equal(err.code, 'UND_ERR_ABORTED')
+ })
+ signal.emit('abort')
+ })
+})
+
+test('pool pipeline abort in queue', (t) => {
+ t.plan(3)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ client.dispatch({
+ path: '/',
+ method: 'GET'
+ }, {
+ onConnect () {
+ },
+ onHeaders (statusCode, headers) {
+ t.equal(statusCode, 200)
+ },
+ onData (chunk) {
+ },
+ onComplete () {
+ t.pass()
+ },
+ onError () {
+ }
+ })
+
+ const signal = new EventEmitter()
+ client.pipeline({
+ path: '/',
+ method: 'GET',
+ signal
+ }, ({ body }) => body).end().on('error', (err) => {
+ t.equal(err.code, 'UND_ERR_ABORTED')
+ })
+ signal.emit('abort')
+ })
+})
+
+test('pool stream constructor error destroy body', (t) => {
+ t.plan(4)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ {
+ const body = new Readable({
+ read () {
+ }
+ })
+ client.stream({
+ path: '/',
+ method: 'GET',
+ body,
+ headers: {
+ 'transfer-encoding': 'fail'
+ }
+ }, () => {
+ t.fail()
+ }, (err) => {
+ t.equal(err.code, 'UND_ERR_INVALID_ARG')
+ t.equal(body.destroyed, true)
+ })
+ }
+
+ {
+ const body = new Readable({
+ read () {
+ }
+ })
+ client.stream({
+ path: '/',
+ method: 'CONNECT',
+ body
+ }, () => {
+ t.fail()
+ }, (err) => {
+ t.equal(err.code, 'UND_ERR_INVALID_ARG')
+ t.equal(body.destroyed, true)
+ })
+ }
+ })
+})
+
+test('pool request constructor error destroy body', (t) => {
+ t.plan(4)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.close.bind(client))
+
+ {
+ const body = new Readable({
+ read () {
+ }
+ })
+ client.request({
+ path: '/',
+ method: 'GET',
+ body,
+ headers: {
+ 'transfer-encoding': 'fail'
+ }
+ }, (err) => {
+ t.equal(err.code, 'UND_ERR_INVALID_ARG')
+ t.equal(body.destroyed, true)
+ })
+ }
+
+ {
+ const body = new Readable({
+ read () {
+ }
+ })
+ client.request({
+ path: '/',
+ method: 'CONNECT',
+ body
+ }, (err) => {
+ t.equal(err.code, 'UND_ERR_INVALID_ARG')
+ t.equal(body.destroyed, true)
+ })
+ }
+ })
+})
+
+test('pool close waits for all requests', (t) => {
+ t.plan(5)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.destroy.bind(client))
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.error(err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.error(err)
+ })
+
+ client.close(() => {
+ t.pass()
+ })
+
+ client.close(() => {
+ t.pass()
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.type(err, errors.ClientClosedError)
+ })
+ })
+})
+
+test('pool destroyed', (t) => {
+ t.plan(1)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.destroy.bind(client))
+
+ client.destroy()
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.type(err, errors.ClientDestroyedError)
+ })
+ })
+})
+
+test('pool destroy fails queued requests', (t) => {
+ t.plan(6)
+
+ const server = createServer((req, res) => {
+ res.end('asd')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new Pool(`http://localhost:${server.address().port}`, {
+ connections: 1,
+ pipelining: 1
+ })
+ t.teardown(client.destroy.bind(client))
+
+ const _err = new Error()
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.equal(err, _err)
+ })
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.equal(err, _err)
+ })
+
+ t.equal(client.destroyed, false)
+ client.destroy(_err, () => {
+ t.pass()
+ })
+ t.equal(client.destroyed, true)
+
+ client.request({
+ path: '/',
+ method: 'GET'
+ }, (err) => {
+ t.type(err, errors.ClientDestroyedError)
+ })
+ })
+})