diff options
Diffstat (limited to 'test/pool.js')
-rw-r--r-- | test/pool.js | 1101 |
1 files changed, 1101 insertions, 0 deletions
diff --git a/test/pool.js b/test/pool.js new file mode 100644 index 0000000..8cf7195 --- /dev/null +++ b/test/pool.js @@ -0,0 +1,1101 @@ +'use strict' + +const { EventEmitter } = require('events') +const { createServer } = require('http') +const net = require('net') +const { + finished, + PassThrough, + Readable +} = require('stream') +const { promisify } = require('util') +const proxyquire = require('proxyquire') +const { test } = require('tap') +const { + kBusy, + kPending, + kRunning, + kSize, + kUrl +} = require('../lib/core/symbols') +const { + Client, + Pool, + errors +} = require('..') + +test('throws when connection is inifinite', (t) => { + t.plan(2) + + try { + new Pool(null, { connections: 0 / 0 }) // eslint-disable-line + } catch (e) { + t.type(e, errors.InvalidArgumentError) + t.equal(e.message, 'invalid connections') + } +}) + +test('throws when connections is negative', (t) => { + t.plan(2) + + try { + new Pool(null, { connections: -1 }) // eslint-disable-line no-new + } catch (e) { + t.type(e, errors.InvalidArgumentError) + t.equal(e.message, 'invalid connections') + } +}) + +test('throws when connection is not number', (t) => { + t.plan(2) + + try { + new Pool(null, { connections: true }) // eslint-disable-line no-new + } catch (e) { + t.type(e, errors.InvalidArgumentError) + t.equal(e.message, 'invalid connections') + } +}) + +test('throws when factory is not a function', (t) => { + t.plan(2) + + try { + new Pool(null, { factory: '' }) // eslint-disable-line no-new + } catch (e) { + t.type(e, errors.InvalidArgumentError) + t.equal(e.message, 'factory must be a function.') + } +}) + +test('does not throw when connect is a function', (t) => { + t.plan(1) + + t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} })) +}) + +test('connect/disconnect event(s)', (t) => { + const clients = 2 + + t.plan(clients * 6) + + const server = createServer((req, res) => { + res.writeHead(200, { + Connection: 'keep-alive', + 'Keep-Alive': 'timeout=1s' + }) + res.end('ok') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const pool = new Pool(`http://localhost:${server.address().port}`, { + connections: clients, + keepAliveTimeoutThreshold: 100 + }) + t.teardown(pool.close.bind(pool)) + + pool.on('connect', (origin, [pool, client]) => { + t.equal(client instanceof Client, true) + }) + pool.on('disconnect', (origin, [pool, client], error) => { + t.ok(client instanceof Client) + t.type(error, errors.InformationalError) + t.equal(error.code, 'UND_ERR_INFO') + t.equal(error.message, 'socket idle timeout') + }) + + for (let i = 0; i < clients; i++) { + pool.request({ + path: '/', + method: 'GET' + }, (err, { headers, body }) => { + t.error(err) + body.resume() + }) + } + }) +}) + +test('basic get', (t) => { + t.plan(14) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + t.equal(client[kUrl].origin, `http://localhost:${server.address().port}`) + + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.error(err) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + + t.equal(client.destroyed, false) + t.equal(client.closed, false) + client.close((err) => { + t.error(err) + t.equal(client.destroyed, true) + client.destroy((err) => { + t.error(err) + client.close((err) => { + t.type(err, errors.ClientDestroyedError) + }) + }) + }) + t.equal(client.closed, true) + }) +}) + +test('URL as arg', (t) => { + t.plan(9) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const url = new URL('http://localhost') + url.port = server.address().port + const client = new Pool(url) + t.teardown(client.destroy.bind(client)) + + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.error(err) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + + client.close((err) => { + t.error(err) + client.destroy((err) => { + t.error(err) + client.close((err) => { + t.type(err, errors.ClientDestroyedError) + }) + }) + }) + }) +}) + +test('basic get error async/await', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.destroy() + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + await client.request({ path: '/', method: 'GET' }) + .catch((err) => { + t.ok(err) + }) + + await client.destroy() + + await client.close().catch((err) => { + t.type(err, errors.ClientDestroyedError) + }) + }) +}) + +test('basic get with async/await', async (t) => { + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + await promisify(server.listen.bind(server))(0) + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + + body.resume() + await promisify(finished)(body) + + await client.close() + await client.destroy() +}) + +test('stream get async/await', async (t) => { + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + await promisify(server.listen.bind(server))(0) + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + return new PassThrough() + }) +}) + +test('stream get error async/await', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.destroy() + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + await client.stream({ path: '/', method: 'GET' }, () => { + + }) + .catch((err) => { + t.ok(err) + }) + }) +}) + +test('pipeline get', (t) => { + t.plan(5) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.destroy.bind(client)) + + const bufs = [] + client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + return body + }) + .end() + .on('data', (buf) => { + bufs.push(buf) + }) + .on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) +}) + +test('backpressure algorithm', (t) => { + const seen = [] + let total = 0 + + let writeMore = true + + class FakeClient extends EventEmitter { + constructor () { + super() + + this.id = total++ + } + + dispatch (req, handler) { + seen.push({ req, client: this, id: this.id }) + return writeMore + } + } + + const Pool = proxyquire('../lib/pool', { + './client': FakeClient + }) + + const noopHandler = { + onError (err) { + throw err + } + } + + const pool = new Pool('http://notahost') + + pool.dispatch({}, noopHandler) + pool.dispatch({}, noopHandler) + + const d1 = seen.shift() // d1 = c0 + t.equal(d1.id, 0) + const d2 = seen.shift() // d2 = c0 + t.equal(d2.id, 0) + + t.equal(d1.id, d2.id) + + writeMore = false + + pool.dispatch({}, noopHandler) // d3 = c0 + + pool.dispatch({}, noopHandler) // d4 = c1 + + const d3 = seen.shift() + t.equal(d3.id, 0) + const d4 = seen.shift() + t.equal(d4.id, 1) + + t.equal(d3.id, d2.id) + t.not(d3.id, d4.id) + + writeMore = true + + d4.client.emit('drain', new URL('http://notahost'), []) + + pool.dispatch({}, noopHandler) // d5 = c1 + + d3.client.emit('drain', new URL('http://notahost'), []) + + pool.dispatch({}, noopHandler) // d6 = c0 + + const d5 = seen.shift() + t.equal(d5.id, 1) + const d6 = seen.shift() + t.equal(d6.id, 0) + + t.equal(d5.id, d4.id) + t.equal(d3.id, d6.id) + + t.equal(total, 3) + + t.end() +}) + +test('busy', (t) => { + t.plan(8 * 16 + 2 + 1) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + const connections = 2 + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections, + pipelining: 2 + }) + client.on('drain', () => { + t.pass() + }) + client.on('connect', () => { + t.pass() + }) + t.teardown(client.destroy.bind(client)) + + for (let n = 1; n <= 8; ++n) { + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.error(err) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + t.equal(client[kPending], n) + t.equal(client[kBusy], n > 1) + t.equal(client[kSize], n) + t.equal(client[kRunning], 0) + + t.equal(client.stats.connected, 0) + t.equal(client.stats.free, 0) + t.equal(client.stats.queued, Math.max(n - connections, 0)) + t.equal(client.stats.pending, n) + t.equal(client.stats.size, n) + t.equal(client.stats.running, 0) + } + }) +}) + +test('invalid pool dispatch options', (t) => { + t.plan(2) + const pool = new Pool('http://notahost') + t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler') + t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler') +}) + +test('pool upgrade promise', (t) => { + t.plan(2) + + const server = net.createServer((c) => { + c.on('data', (d) => { + c.write('HTTP/1.1 101\r\n') + c.write('hello: world\r\n') + c.write('connection: upgrade\r\n') + c.write('upgrade: websocket\r\n') + c.write('\r\n') + c.write('Body') + }) + + c.on('end', () => { + c.end() + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + const { headers, socket } = await client.upgrade({ + path: '/', + method: 'GET', + protocol: 'Websocket' + }) + + let recvData = '' + socket.on('data', (d) => { + recvData += d + }) + + socket.on('close', () => { + t.equal(recvData.toString(), 'Body') + }) + + t.same(headers, { + hello: 'world', + connection: 'upgrade', + upgrade: 'websocket' + }) + socket.end() + }) +}) + +test('pool connect', (t) => { + t.plan(1) + + const server = createServer((c) => { + t.fail() + }) + server.on('connect', (req, socket, firstBodyChunk) => { + socket.write('HTTP/1.1 200 Connection established\r\n\r\n') + + let data = firstBodyChunk.toString() + socket.on('data', (buf) => { + data += buf.toString() + }) + + socket.on('end', () => { + socket.end(data) + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + const { socket } = await client.connect({ + path: '/' + }) + + let recvData = '' + socket.on('data', (d) => { + recvData += d + }) + + socket.on('end', () => { + t.equal(recvData.toString(), 'Body') + }) + + socket.write('Body') + socket.end() + }) +}) + +test('pool dispatch', (t) => { + t.plan(2) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + let buf = '' + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + }, + onData (chunk) { + buf += chunk + }, + onComplete () { + t.equal(buf, 'asd') + }, + onError () { + } + }) + }) +}) + +test('pool pipeline args validation', (t) => { + t.plan(2) + + const client = new Pool('http://localhost:5000') + + const ret = client.pipeline(null, () => {}) + ret.on('error', (err) => { + t.ok(/opts/.test(err.message)) + t.type(err, errors.InvalidArgumentError) + }) +}) + +test('300 requests succeed', (t) => { + t.plan(300 * 3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1 + }) + t.teardown(client.destroy.bind(client)) + + for (let n = 0; n < 300; ++n) { + client.request({ + path: '/', + method: 'GET' + }, (err, data) => { + t.error(err) + data.body.on('data', (chunk) => { + t.equal(chunk.toString(), 'asd') + }).on('end', () => { + t.pass() + }) + }) + } + }) +}) + +test('pool connect error', (t) => { + t.plan(1) + + const server = createServer((c) => { + t.fail() + }) + server.on('connect', (req, socket, firstBodyChunk) => { + socket.destroy() + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + try { + await client.connect({ + path: '/' + }) + } catch (err) { + t.ok(err) + } + }) +}) + +test('pool upgrade error', (t) => { + t.plan(1) + + const server = net.createServer((c) => { + c.on('data', (d) => { + c.write('HTTP/1.1 101\r\n') + c.write('hello: world\r\n') + c.write('connection: upgrade\r\n') + c.write('\r\n') + c.write('Body') + }) + c.on('error', () => { + // Whether we get an error, end or close is undefined. + // Ignore error. + }) + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`) + t.teardown(client.close.bind(client)) + + try { + await client.upgrade({ + path: '/', + method: 'GET', + protocol: 'Websocket' + }) + } catch (err) { + t.ok(err) + } + }) +}) + +test('pool dispatch error', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.pass() + }, + onError () { + } + }) + + client.dispatch({ + path: '/', + method: 'GET', + headers: { + 'transfer-encoding': 'fail' + } + }, { + onConnect () { + t.fail() + }, + onHeaders (statusCode, headers) { + t.fail() + }, + onData (chunk) { + t.fail() + }, + onError (err) { + t.equal(err.code, 'UND_ERR_INVALID_ARG') + } + }) + }) +}) + +test('pool request abort in queue', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.pass() + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.request({ + path: '/', + method: 'GET', + signal + }, (err) => { + t.equal(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) +}) + +test('pool stream abort in queue', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.pass() + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.stream({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body, (err) => { + t.equal(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) +}) + +test('pool pipeline abort in queue', (t) => { + t.plan(3) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + client.dispatch({ + path: '/', + method: 'GET' + }, { + onConnect () { + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + }, + onData (chunk) { + }, + onComplete () { + t.pass() + }, + onError () { + } + }) + + const signal = new EventEmitter() + client.pipeline({ + path: '/', + method: 'GET', + signal + }, ({ body }) => body).end().on('error', (err) => { + t.equal(err.code, 'UND_ERR_ABORTED') + }) + signal.emit('abort') + }) +}) + +test('pool stream constructor error destroy body', (t) => { + t.plan(4) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + { + const body = new Readable({ + read () { + } + }) + client.stream({ + path: '/', + method: 'GET', + body, + headers: { + 'transfer-encoding': 'fail' + } + }, () => { + t.fail() + }, (err) => { + t.equal(err.code, 'UND_ERR_INVALID_ARG') + t.equal(body.destroyed, true) + }) + } + + { + const body = new Readable({ + read () { + } + }) + client.stream({ + path: '/', + method: 'CONNECT', + body + }, () => { + t.fail() + }, (err) => { + t.equal(err.code, 'UND_ERR_INVALID_ARG') + t.equal(body.destroyed, true) + }) + } + }) +}) + +test('pool request constructor error destroy body', (t) => { + t.plan(4) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.close.bind(client)) + + { + const body = new Readable({ + read () { + } + }) + client.request({ + path: '/', + method: 'GET', + body, + headers: { + 'transfer-encoding': 'fail' + } + }, (err) => { + t.equal(err.code, 'UND_ERR_INVALID_ARG') + t.equal(body.destroyed, true) + }) + } + + { + const body = new Readable({ + read () { + } + }) + client.request({ + path: '/', + method: 'CONNECT', + body + }, (err) => { + t.equal(err.code, 'UND_ERR_INVALID_ARG') + t.equal(body.destroyed, true) + }) + } + }) +}) + +test('pool close waits for all requests', (t) => { + t.plan(5) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.destroy.bind(client)) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.error(err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.error(err) + }) + + client.close(() => { + t.pass() + }) + + client.close(() => { + t.pass() + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.type(err, errors.ClientClosedError) + }) + }) +}) + +test('pool destroyed', (t) => { + t.plan(1) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.destroy.bind(client)) + + client.destroy() + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.type(err, errors.ClientDestroyedError) + }) + }) +}) + +test('pool destroy fails queued requests', (t) => { + t.plan(6) + + const server = createServer((req, res) => { + res.end('asd') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new Pool(`http://localhost:${server.address().port}`, { + connections: 1, + pipelining: 1 + }) + t.teardown(client.destroy.bind(client)) + + const _err = new Error() + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.equal(err, _err) + }) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.equal(err, _err) + }) + + t.equal(client.destroyed, false) + client.destroy(_err, () => { + t.pass() + }) + t.equal(client.destroyed, true) + + client.request({ + path: '/', + method: 'GET' + }, (err) => { + t.type(err, errors.ClientDestroyedError) + }) + }) +}) |