summaryrefslogtreecommitdiffstats
path: root/test/balanced-pool.js
diff options
context:
space:
mode:
Diffstat (limited to 'test/balanced-pool.js')
-rw-r--r--test/balanced-pool.js566
1 files changed, 566 insertions, 0 deletions
diff --git a/test/balanced-pool.js b/test/balanced-pool.js
new file mode 100644
index 0000000..d20f926
--- /dev/null
+++ b/test/balanced-pool.js
@@ -0,0 +1,566 @@
+'use strict'
+
+const { test } = require('tap')
+const { BalancedPool, Pool, Client, errors } = require('..')
+const { nodeMajor } = require('../lib/core/util')
+const { createServer } = require('http')
+const { promisify } = require('util')
+
+test('throws when factory is not a function', (t) => {
+ t.plan(2)
+
+ try {
+ new BalancedPool(null, { factory: '' }) // eslint-disable-line
+ } catch (err) {
+ t.type(err, errors.InvalidArgumentError)
+ t.equal(err.message, 'factory must be a function.')
+ }
+})
+
+test('add/remove upstreams', (t) => {
+ t.plan(7)
+
+ const upstream01 = 'http://localhost:1'
+ const upstream02 = 'http://localhost:2'
+
+ const pool = new BalancedPool()
+ t.same(pool.upstreams, [])
+
+ // try to remove non-existent upstream
+ pool.removeUpstream(upstream01)
+ t.same(pool.upstreams, [])
+
+ pool.addUpstream(upstream01)
+ t.same(pool.upstreams, [upstream01])
+
+ // try to add the same upstream
+ pool.addUpstream(upstream01)
+ t.same(pool.upstreams, [upstream01])
+
+ pool.addUpstream(upstream02)
+ t.same(pool.upstreams, [upstream01, upstream02])
+
+ pool.removeUpstream(upstream02)
+ t.same(pool.upstreams, [upstream01])
+
+ pool.removeUpstream(upstream01)
+ t.same(pool.upstreams, [])
+})
+
+test('basic get', async (t) => {
+ t.plan(16)
+
+ let server1Called = 0
+ const server1 = createServer((req, res) => {
+ server1Called++
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server1.close.bind(server1))
+
+ await promisify(server1.listen).call(server1, 0)
+
+ let server2Called = 0
+ const server2 = createServer((req, res) => {
+ server2Called++
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server2.close.bind(server2))
+
+ await promisify(server2.listen).call(server2, 0)
+
+ const client = new BalancedPool()
+ client.addUpstream(`http://localhost:${server1.address().port}`)
+ client.addUpstream(`http://localhost:${server2.address().port}`)
+ t.teardown(client.destroy.bind(client))
+
+ {
+ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ t.equal('hello', await body.text())
+ }
+
+ {
+ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ t.equal('hello', await body.text())
+ }
+
+ t.equal(server1Called, 1)
+ t.equal(server2Called, 1)
+
+ t.equal(client.destroyed, false)
+ t.equal(client.closed, false)
+ await client.close()
+ t.equal(client.destroyed, true)
+ t.equal(client.closed, true)
+})
+
+test('connect/disconnect event(s)', (t) => {
+ const clients = 2
+
+ t.plan(clients * 5)
+
+ const server = createServer((req, res) => {
+ res.writeHead(200, {
+ Connection: 'keep-alive',
+ 'Keep-Alive': 'timeout=1s'
+ })
+ res.end('ok')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, () => {
+ const pool = new BalancedPool(`http://localhost:${server.address().port}`, {
+ connections: clients,
+ keepAliveTimeoutThreshold: 100
+ })
+ t.teardown(pool.close.bind(pool))
+
+ pool.on('connect', (origin, [pool, pool2, client]) => {
+ t.equal(client instanceof Client, true)
+ })
+ pool.on('disconnect', (origin, [pool, pool2, client], error) => {
+ t.ok(client instanceof Client)
+ t.type(error, errors.InformationalError)
+ t.equal(error.code, 'UND_ERR_INFO')
+ })
+
+ for (let i = 0; i < clients; i++) {
+ pool.request({
+ path: '/',
+ method: 'GET'
+ }, (err, { headers, body }) => {
+ t.error(err)
+ body.resume()
+ })
+ }
+ })
+})
+
+test('busy', (t) => {
+ t.plan(8 * 6 + 2 + 1)
+
+ const server = createServer((req, res) => {
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ server.listen(0, async () => {
+ const client = new BalancedPool(`http://localhost:${server.address().port}`, {
+ connections: 2,
+ pipelining: 2
+ })
+ client.on('drain', () => {
+ t.pass()
+ })
+ client.on('connect', () => {
+ t.pass()
+ })
+ t.teardown(client.destroy.bind(client))
+
+ for (let n = 1; n <= 8; ++n) {
+ client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
+ t.error(err)
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ const bufs = []
+ body.on('data', (buf) => {
+ bufs.push(buf)
+ })
+ body.on('end', () => {
+ t.equal('hello', Buffer.concat(bufs).toString('utf8'))
+ })
+ })
+ }
+ })
+})
+
+test('factory option with basic get request', async (t) => {
+ t.plan(12)
+
+ let factoryCalled = 0
+ const opts = {
+ factory: (origin, opts) => {
+ factoryCalled++
+ return new Pool(origin, opts)
+ }
+ }
+
+ const client = new BalancedPool([], opts) // eslint-disable-line
+
+ let serverCalled = 0
+ const server = createServer((req, res) => {
+ serverCalled++
+ t.equal('/', req.url)
+ t.equal('GET', req.method)
+ res.setHeader('content-type', 'text/plain')
+ res.end('hello')
+ })
+ t.teardown(server.close.bind(server))
+
+ await promisify(server.listen).call(server, 0)
+
+ client.addUpstream(`http://localhost:${server.address().port}`)
+
+ t.same(client.upstreams, [`http://localhost:${server.address().port}`])
+
+ t.teardown(client.destroy.bind(client))
+
+ {
+ const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
+ t.equal(statusCode, 200)
+ t.equal(headers['content-type'], 'text/plain')
+ t.equal('hello', await body.text())
+ }
+
+ t.equal(serverCalled, 1)
+ t.equal(factoryCalled, 1)
+
+ t.equal(client.destroyed, false)
+ t.equal(client.closed, false)
+ await client.close()
+ t.equal(client.destroyed, true)
+ t.equal(client.closed, true)
+})
+
+test('throws when upstream is missing', async (t) => {
+ t.plan(2)
+
+ const pool = new BalancedPool()
+
+ try {
+ await pool.request({ path: '/', method: 'GET' })
+ } catch (e) {
+ t.type(e, errors.BalancedPoolMissingUpstreamError)
+ t.equal(e.message, 'No upstream has been added to the BalancedPool')
+ }
+})
+
+class TestServer {
+ constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) {
+ this.config = {
+ downOnRequests: downOnRequests || [],
+ socketHangupOnRequests: socketHangupOnRequests || [],
+ socketHangup
+ }
+ this.name = server
+ // start a server listening to any port available on the host
+ this.port = 0
+ this.iteration = 0
+ this.requestsCount = 0
+ this.onRequest = onRequest
+ this.server = null
+ }
+
+ _shouldHangupOnClient () {
+ if (this.config.socketHangup) {
+ return true
+ }
+ if (this.config.socketHangupOnRequests.includes(this.requestsCount)) {
+ return true
+ }
+
+ return false
+ }
+
+ _shouldStopServer () {
+ if (this.config.upstreamDown === true || this.config.downOnRequests.includes(this.requestsCount)) {
+ return true
+ }
+ return false
+ }
+
+ async prepareForIteration (iteration) {
+ // set current iteration
+ this.iteration = iteration
+
+ if (this._shouldStopServer()) {
+ await this.stop()
+ } else if (!this.isRunning()) {
+ await this.start()
+ }
+ }
+
+ start () {
+ this.server = createServer((req, res) => {
+ if (this._shouldHangupOnClient()) {
+ req.destroy(new Error('(ツ)'))
+ return
+ }
+ this.requestsCount++
+ res.end('server is running!')
+
+ this.onRequest(this)
+ }).listen(this.port)
+
+ this.server.keepAliveTimeout = 2000
+
+ return new Promise((resolve) => {
+ this.server.on('listening', () => {
+ // store the used port to use it again if the server was stopped as part of test and then started again
+ this.port = this.server.address().port
+
+ return resolve()
+ })
+ })
+ }
+
+ isRunning () {
+ return !!this.server.address()
+ }
+
+ stop () {
+ if (!this.isRunning()) {
+ return
+ }
+
+ return new Promise(resolve => {
+ this.server.close(() => resolve())
+ })
+ }
+}
+
+const cases = [
+
+ // 0
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 7,
+ config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }],
+ expected: ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 0,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.34, 0.33, 0.33]
+ },
+
+ // 1
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', downOnRequests: [0] }, { server: 'B' }, { server: 'C' }],
+ expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A', 'B', 'C', 'A'],
+ expectedConnectionRefusedErrors: 1,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.32, 0.34, 0.34]
+ },
+
+ // 2
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C' }],
+ expected: ['A', 'B/connectionRefused', 'C', 'A', 'C', 'A', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 1,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.34, 0.32, 0.34]
+ },
+
+ // 3
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }],
+ expected: ['A', 'B/connectionRefused', 'C/connectionRefused', 'A', 'A', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 2,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.35, 0.33, 0.32]
+ },
+
+ // 4
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }],
+ expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 3,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.34, 0.33, 0.33]
+ },
+
+ // 5
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', downOnRequests: [0, 1, 2] }, { server: 'B', downOnRequests: [0, 1, 2] }, { server: 'C', downOnRequests: [0, 1, 2] }],
+ expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 9,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.34, 0.33, 0.33]
+ },
+
+ // 6
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0, 1] }, { server: 'C', downOnRequests: [0] }],
+ expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B/connectionRefused', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'C', 'A', 'C', 'A', 'C', 'A', 'B'],
+ expectedConnectionRefusedErrors: 4,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.36, 0.29, 0.35]
+ },
+
+ // 7
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A' }, { server: 'B' }, { server: 'C', downOnRequests: [1] }],
+ expected: ['A', 'B', 'C', 'A', 'B', 'C/connectionRefused', 'A', 'B', 'A', 'B', 'A', 'B', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 1,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.34, 0.34, 0.32],
+
+ // Skip because the behavior of Node.js has changed
+ skip: nodeMajor >= 19
+ },
+
+ // 8
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', socketHangupOnRequests: [1] }, { server: 'B' }, { server: 'C' }],
+ expected: ['A', 'B', 'C', 'A/socketError', 'B', 'C', 'B', 'C', 'B', 'C', 'A'],
+ expectedConnectionRefusedErrors: 0,
+ expectedSocketErrors: 1,
+ expectedRatios: [0.32, 0.34, 0.34]
+ },
+
+ // 9
+
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 7,
+ config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }, { server: 'D' }, { server: 'E' }],
+ expected: ['A', 'B', 'C', 'D', 'E', 'A', 'B', 'C', 'D', 'E'],
+ expectedConnectionRefusedErrors: 0,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.2, 0.2, 0.2, 0.2, 0.2]
+ },
+
+ // 10
+ {
+ iterations: 100,
+ maxWeightPerServer: 100,
+ errorPenalty: 15,
+ config: [{ server: 'A', downOnRequests: [0, 1, 2, 3] }, { server: 'B' }, { server: 'C' }],
+ expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'],
+ expectedConnectionRefusedErrors: 4,
+ expectedSocketErrors: 0,
+ expectedRatios: [0.18, 0.41, 0.41]
+ }
+
+]
+
+for (const [index, { config, expected, expectedRatios, iterations = 9, expectedConnectionRefusedErrors = 0, expectedSocketErrors = 0, maxWeightPerServer, errorPenalty = 10, only = false, skip = false }] of cases.entries()) {
+ test(`weighted round robin - case ${index}`, { only, skip }, async (t) => {
+ // cerate an array to store succesfull reqeusts
+ const requestLog = []
+
+ // create instances of the test servers according to the config
+ const servers = config.map((serverConfig) => new TestServer({
+ config: serverConfig,
+ onRequest: (server) => {
+ requestLog.push(server.name)
+ }
+ }))
+ t.teardown(() => servers.map(server => server.stop()))
+
+ // start all servers to get a port so that we can build the upstream urls to supply them to undici
+ await Promise.all(servers.map(server => server.start()))
+
+ // build upstream urls
+ const urls = servers.map(server => `http://localhost:${server.port}`)
+
+ // add upstreams
+ const client = new BalancedPool(urls[0], { maxWeightPerServer, errorPenalty })
+ urls.slice(1).map(url => client.addUpstream(url))
+
+ let connectionRefusedErrors = 0
+ let socketErrors = 0
+ for (let i = 0; i < iterations; i++) {
+ // setup test servers for the next iteration
+
+ await Promise.all(servers.map(server => server.prepareForIteration(i)))
+
+ // send a request using undinci
+ try {
+ await client.request({ path: '/', method: 'GET' })
+ } catch (e) {
+ const serverWithError =
+ servers.find(server => server.port === e.port) ||
+ servers.find(server => {
+ if (typeof AggregateError === 'function' && e instanceof AggregateError) {
+ return e.errors.some(e => server.port === (e.socket?.remotePort ?? e.port))
+ }
+
+ return server.port === e.socket.remotePort
+ })
+
+ serverWithError.requestsCount++
+
+ if (e.code === 'ECONNREFUSED') {
+ requestLog.push(`${serverWithError.name}/connectionRefused`)
+ connectionRefusedErrors++
+ }
+ if (e.code === 'UND_ERR_SOCKET') {
+ requestLog.push(`${serverWithError.name}/socketError`)
+
+ socketErrors++
+ }
+ }
+ }
+ const totalRequests = servers.reduce((acc, server) => {
+ return acc + server.requestsCount
+ }, 0)
+
+ t.equal(totalRequests, iterations)
+
+ t.equal(connectionRefusedErrors, expectedConnectionRefusedErrors)
+ t.equal(socketErrors, expectedSocketErrors)
+
+ if (expectedRatios) {
+ const ratios = servers.reduce((acc, el) => {
+ acc[el.name] = 0
+ return acc
+ }, {})
+ requestLog.map(el => ratios[el[0]]++)
+
+ t.match(Object.keys(ratios).map(k => ratios[k] / iterations), expectedRatios)
+ }
+
+ if (expected) {
+ t.match(requestLog.slice(0, expected.length), expected)
+ }
+
+ await client.close()
+ })
+}