diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-21 20:56:19 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-21 20:56:19 +0000 |
commit | 0b6210cd37b68b94252cb798598b12974a20e1c1 (patch) | |
tree | e371686554a877842d95aa94f100bee552ff2a8e /test/balanced-pool.js | |
parent | Initial commit. (diff) | |
download | node-undici-upstream.tar.xz node-undici-upstream.zip |
Adding upstream version 5.28.2+dfsg1+~cs23.11.12.3.upstream/5.28.2+dfsg1+_cs23.11.12.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/balanced-pool.js')
-rw-r--r-- | test/balanced-pool.js | 566 |
1 files changed, 566 insertions, 0 deletions
diff --git a/test/balanced-pool.js b/test/balanced-pool.js new file mode 100644 index 0000000..d20f926 --- /dev/null +++ b/test/balanced-pool.js @@ -0,0 +1,566 @@ +'use strict' + +const { test } = require('tap') +const { BalancedPool, Pool, Client, errors } = require('..') +const { nodeMajor } = require('../lib/core/util') +const { createServer } = require('http') +const { promisify } = require('util') + +test('throws when factory is not a function', (t) => { + t.plan(2) + + try { + new BalancedPool(null, { factory: '' }) // eslint-disable-line + } catch (err) { + t.type(err, errors.InvalidArgumentError) + t.equal(err.message, 'factory must be a function.') + } +}) + +test('add/remove upstreams', (t) => { + t.plan(7) + + const upstream01 = 'http://localhost:1' + const upstream02 = 'http://localhost:2' + + const pool = new BalancedPool() + t.same(pool.upstreams, []) + + // try to remove non-existent upstream + pool.removeUpstream(upstream01) + t.same(pool.upstreams, []) + + pool.addUpstream(upstream01) + t.same(pool.upstreams, [upstream01]) + + // try to add the same upstream + pool.addUpstream(upstream01) + t.same(pool.upstreams, [upstream01]) + + pool.addUpstream(upstream02) + t.same(pool.upstreams, [upstream01, upstream02]) + + pool.removeUpstream(upstream02) + t.same(pool.upstreams, [upstream01]) + + pool.removeUpstream(upstream01) + t.same(pool.upstreams, []) +}) + +test('basic get', async (t) => { + t.plan(16) + + let server1Called = 0 + const server1 = createServer((req, res) => { + server1Called++ + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server1.close.bind(server1)) + + await promisify(server1.listen).call(server1, 0) + + let server2Called = 0 + const server2 = createServer((req, res) => { + server2Called++ + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server2.close.bind(server2)) + + await promisify(server2.listen).call(server2, 0) + + const client = new BalancedPool() + client.addUpstream(`http://localhost:${server1.address().port}`) + client.addUpstream(`http://localhost:${server2.address().port}`) + t.teardown(client.destroy.bind(client)) + + { + const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + t.equal('hello', await body.text()) + } + + { + const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + t.equal('hello', await body.text()) + } + + t.equal(server1Called, 1) + t.equal(server2Called, 1) + + t.equal(client.destroyed, false) + t.equal(client.closed, false) + await client.close() + t.equal(client.destroyed, true) + t.equal(client.closed, true) +}) + +test('connect/disconnect event(s)', (t) => { + const clients = 2 + + t.plan(clients * 5) + + const server = createServer((req, res) => { + res.writeHead(200, { + Connection: 'keep-alive', + 'Keep-Alive': 'timeout=1s' + }) + res.end('ok') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, () => { + const pool = new BalancedPool(`http://localhost:${server.address().port}`, { + connections: clients, + keepAliveTimeoutThreshold: 100 + }) + t.teardown(pool.close.bind(pool)) + + pool.on('connect', (origin, [pool, pool2, client]) => { + t.equal(client instanceof Client, true) + }) + pool.on('disconnect', (origin, [pool, pool2, client], error) => { + t.ok(client instanceof Client) + t.type(error, errors.InformationalError) + t.equal(error.code, 'UND_ERR_INFO') + }) + + for (let i = 0; i < clients; i++) { + pool.request({ + path: '/', + method: 'GET' + }, (err, { headers, body }) => { + t.error(err) + body.resume() + }) + } + }) +}) + +test('busy', (t) => { + t.plan(8 * 6 + 2 + 1) + + const server = createServer((req, res) => { + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + server.listen(0, async () => { + const client = new BalancedPool(`http://localhost:${server.address().port}`, { + connections: 2, + pipelining: 2 + }) + client.on('drain', () => { + t.pass() + }) + client.on('connect', () => { + t.pass() + }) + t.teardown(client.destroy.bind(client)) + + for (let n = 1; n <= 8; ++n) { + client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => { + t.error(err) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + const bufs = [] + body.on('data', (buf) => { + bufs.push(buf) + }) + body.on('end', () => { + t.equal('hello', Buffer.concat(bufs).toString('utf8')) + }) + }) + } + }) +}) + +test('factory option with basic get request', async (t) => { + t.plan(12) + + let factoryCalled = 0 + const opts = { + factory: (origin, opts) => { + factoryCalled++ + return new Pool(origin, opts) + } + } + + const client = new BalancedPool([], opts) // eslint-disable-line + + let serverCalled = 0 + const server = createServer((req, res) => { + serverCalled++ + t.equal('/', req.url) + t.equal('GET', req.method) + res.setHeader('content-type', 'text/plain') + res.end('hello') + }) + t.teardown(server.close.bind(server)) + + await promisify(server.listen).call(server, 0) + + client.addUpstream(`http://localhost:${server.address().port}`) + + t.same(client.upstreams, [`http://localhost:${server.address().port}`]) + + t.teardown(client.destroy.bind(client)) + + { + const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' }) + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain') + t.equal('hello', await body.text()) + } + + t.equal(serverCalled, 1) + t.equal(factoryCalled, 1) + + t.equal(client.destroyed, false) + t.equal(client.closed, false) + await client.close() + t.equal(client.destroyed, true) + t.equal(client.closed, true) +}) + +test('throws when upstream is missing', async (t) => { + t.plan(2) + + const pool = new BalancedPool() + + try { + await pool.request({ path: '/', method: 'GET' }) + } catch (e) { + t.type(e, errors.BalancedPoolMissingUpstreamError) + t.equal(e.message, 'No upstream has been added to the BalancedPool') + } +}) + +class TestServer { + constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) { + this.config = { + downOnRequests: downOnRequests || [], + socketHangupOnRequests: socketHangupOnRequests || [], + socketHangup + } + this.name = server + // start a server listening to any port available on the host + this.port = 0 + this.iteration = 0 + this.requestsCount = 0 + this.onRequest = onRequest + this.server = null + } + + _shouldHangupOnClient () { + if (this.config.socketHangup) { + return true + } + if (this.config.socketHangupOnRequests.includes(this.requestsCount)) { + return true + } + + return false + } + + _shouldStopServer () { + if (this.config.upstreamDown === true || this.config.downOnRequests.includes(this.requestsCount)) { + return true + } + return false + } + + async prepareForIteration (iteration) { + // set current iteration + this.iteration = iteration + + if (this._shouldStopServer()) { + await this.stop() + } else if (!this.isRunning()) { + await this.start() + } + } + + start () { + this.server = createServer((req, res) => { + if (this._shouldHangupOnClient()) { + req.destroy(new Error('(ツ)')) + return + } + this.requestsCount++ + res.end('server is running!') + + this.onRequest(this) + }).listen(this.port) + + this.server.keepAliveTimeout = 2000 + + return new Promise((resolve) => { + this.server.on('listening', () => { + // store the used port to use it again if the server was stopped as part of test and then started again + this.port = this.server.address().port + + return resolve() + }) + }) + } + + isRunning () { + return !!this.server.address() + } + + stop () { + if (!this.isRunning()) { + return + } + + return new Promise(resolve => { + this.server.close(() => resolve()) + }) + } +} + +const cases = [ + + // 0 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 7, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }], + expected: ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 1 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B' }, { server: 'C' }], + expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A', 'B', 'C', 'A'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.32, 0.34, 0.34] + }, + + // 2 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C' }], + expected: ['A', 'B/connectionRefused', 'C', 'A', 'C', 'A', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.32, 0.34] + }, + + // 3 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }], + expected: ['A', 'B/connectionRefused', 'C/connectionRefused', 'A', 'A', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 2, + expectedSocketErrors: 0, + expectedRatios: [0.35, 0.33, 0.32] + }, + + // 4 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 3, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 5 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0, 1, 2] }, { server: 'B', downOnRequests: [0, 1, 2] }, { server: 'C', downOnRequests: [0, 1, 2] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 9, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.33, 0.33] + }, + + // 6 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0, 1] }, { server: 'C', downOnRequests: [0] }], + expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B/connectionRefused', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'C', 'A', 'C', 'A', 'C', 'A', 'B'], + expectedConnectionRefusedErrors: 4, + expectedSocketErrors: 0, + expectedRatios: [0.36, 0.29, 0.35] + }, + + // 7 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C', downOnRequests: [1] }], + expected: ['A', 'B', 'C', 'A', 'B', 'C/connectionRefused', 'A', 'B', 'A', 'B', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 1, + expectedSocketErrors: 0, + expectedRatios: [0.34, 0.34, 0.32], + + // Skip because the behavior of Node.js has changed + skip: nodeMajor >= 19 + }, + + // 8 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', socketHangupOnRequests: [1] }, { server: 'B' }, { server: 'C' }], + expected: ['A', 'B', 'C', 'A/socketError', 'B', 'C', 'B', 'C', 'B', 'C', 'A'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 1, + expectedRatios: [0.32, 0.34, 0.34] + }, + + // 9 + + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 7, + config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }, { server: 'D' }, { server: 'E' }], + expected: ['A', 'B', 'C', 'D', 'E', 'A', 'B', 'C', 'D', 'E'], + expectedConnectionRefusedErrors: 0, + expectedSocketErrors: 0, + expectedRatios: [0.2, 0.2, 0.2, 0.2, 0.2] + }, + + // 10 + { + iterations: 100, + maxWeightPerServer: 100, + errorPenalty: 15, + config: [{ server: 'A', downOnRequests: [0, 1, 2, 3] }, { server: 'B' }, { server: 'C' }], + expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'], + expectedConnectionRefusedErrors: 4, + expectedSocketErrors: 0, + expectedRatios: [0.18, 0.41, 0.41] + } + +] + +for (const [index, { config, expected, expectedRatios, iterations = 9, expectedConnectionRefusedErrors = 0, expectedSocketErrors = 0, maxWeightPerServer, errorPenalty = 10, only = false, skip = false }] of cases.entries()) { + test(`weighted round robin - case ${index}`, { only, skip }, async (t) => { + // cerate an array to store succesfull reqeusts + const requestLog = [] + + // create instances of the test servers according to the config + const servers = config.map((serverConfig) => new TestServer({ + config: serverConfig, + onRequest: (server) => { + requestLog.push(server.name) + } + })) + t.teardown(() => servers.map(server => server.stop())) + + // start all servers to get a port so that we can build the upstream urls to supply them to undici + await Promise.all(servers.map(server => server.start())) + + // build upstream urls + const urls = servers.map(server => `http://localhost:${server.port}`) + + // add upstreams + const client = new BalancedPool(urls[0], { maxWeightPerServer, errorPenalty }) + urls.slice(1).map(url => client.addUpstream(url)) + + let connectionRefusedErrors = 0 + let socketErrors = 0 + for (let i = 0; i < iterations; i++) { + // setup test servers for the next iteration + + await Promise.all(servers.map(server => server.prepareForIteration(i))) + + // send a request using undinci + try { + await client.request({ path: '/', method: 'GET' }) + } catch (e) { + const serverWithError = + servers.find(server => server.port === e.port) || + servers.find(server => { + if (typeof AggregateError === 'function' && e instanceof AggregateError) { + return e.errors.some(e => server.port === (e.socket?.remotePort ?? e.port)) + } + + return server.port === e.socket.remotePort + }) + + serverWithError.requestsCount++ + + if (e.code === 'ECONNREFUSED') { + requestLog.push(`${serverWithError.name}/connectionRefused`) + connectionRefusedErrors++ + } + if (e.code === 'UND_ERR_SOCKET') { + requestLog.push(`${serverWithError.name}/socketError`) + + socketErrors++ + } + } + } + const totalRequests = servers.reduce((acc, server) => { + return acc + server.requestsCount + }, 0) + + t.equal(totalRequests, iterations) + + t.equal(connectionRefusedErrors, expectedConnectionRefusedErrors) + t.equal(socketErrors, expectedSocketErrors) + + if (expectedRatios) { + const ratios = servers.reduce((acc, el) => { + acc[el.name] = 0 + return acc + }, {}) + requestLog.map(el => ratios[el[0]]++) + + t.match(Object.keys(ratios).map(k => ratios[k] / iterations), expectedRatios) + } + + if (expected) { + t.match(requestLog.slice(0, expected.length), expected) + } + + await client.close() + }) +} |