summaryrefslogtreecommitdiffstats
path: root/testing/xpcshell/node-ws/lib/permessage-deflate.js
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
commit6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch)
treea68f146d7fa01f0134297619fbe7e33db084e0aa /testing/xpcshell/node-ws/lib/permessage-deflate.js
parentInitial commit. (diff)
downloadthunderbird-6bf0a5cb5034a7e684dcc3500e841785237ce2dd.tar.xz
thunderbird-6bf0a5cb5034a7e684dcc3500e841785237ce2dd.zip
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/xpcshell/node-ws/lib/permessage-deflate.js')
-rw-r--r--testing/xpcshell/node-ws/lib/permessage-deflate.js511
1 files changed, 511 insertions, 0 deletions
diff --git a/testing/xpcshell/node-ws/lib/permessage-deflate.js b/testing/xpcshell/node-ws/lib/permessage-deflate.js
new file mode 100644
index 0000000000..94603c98da
--- /dev/null
+++ b/testing/xpcshell/node-ws/lib/permessage-deflate.js
@@ -0,0 +1,511 @@
+'use strict';
+
+const zlib = require('zlib');
+
+const bufferUtil = require('./buffer-util');
+const Limiter = require('./limiter');
+const { kStatusCode } = require('./constants');
+
+const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
+const kPerMessageDeflate = Symbol('permessage-deflate');
+const kTotalLength = Symbol('total-length');
+const kCallback = Symbol('callback');
+const kBuffers = Symbol('buffers');
+const kError = Symbol('error');
+
+//
+// We limit zlib concurrency, which prevents severe memory fragmentation
+// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
+// and https://github.com/websockets/ws/issues/1202
+//
+// Intentionally global; it's the global thread pool that's an issue.
+//
+let zlibLimiter;
+
+/**
+ * permessage-deflate implementation.
+ */
+class PerMessageDeflate {
+ /**
+ * Creates a PerMessageDeflate instance.
+ *
+ * @param {Object} [options] Configuration options
+ * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
+ * for, or request, a custom client window size
+ * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
+ * acknowledge disabling of client context takeover
+ * @param {Number} [options.concurrencyLimit=10] The number of concurrent
+ * calls to zlib
+ * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
+ * use of a custom server window size
+ * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
+ * disabling of server context takeover
+ * @param {Number} [options.threshold=1024] Size (in bytes) below which
+ * messages should not be compressed if context takeover is disabled
+ * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
+ * deflate
+ * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
+ * inflate
+ * @param {Boolean} [isServer=false] Create the instance in either server or
+ * client mode
+ * @param {Number} [maxPayload=0] The maximum allowed message length
+ */
+ constructor(options, isServer, maxPayload) {
+ this._maxPayload = maxPayload | 0;
+ this._options = options || {};
+ this._threshold =
+ this._options.threshold !== undefined ? this._options.threshold : 1024;
+ this._isServer = !!isServer;
+ this._deflate = null;
+ this._inflate = null;
+
+ this.params = null;
+
+ if (!zlibLimiter) {
+ const concurrency =
+ this._options.concurrencyLimit !== undefined
+ ? this._options.concurrencyLimit
+ : 10;
+ zlibLimiter = new Limiter(concurrency);
+ }
+ }
+
+ /**
+ * @type {String}
+ */
+ static get extensionName() {
+ return 'permessage-deflate';
+ }
+
+ /**
+ * Create an extension negotiation offer.
+ *
+ * @return {Object} Extension parameters
+ * @public
+ */
+ offer() {
+ const params = {};
+
+ if (this._options.serverNoContextTakeover) {
+ params.server_no_context_takeover = true;
+ }
+ if (this._options.clientNoContextTakeover) {
+ params.client_no_context_takeover = true;
+ }
+ if (this._options.serverMaxWindowBits) {
+ params.server_max_window_bits = this._options.serverMaxWindowBits;
+ }
+ if (this._options.clientMaxWindowBits) {
+ params.client_max_window_bits = this._options.clientMaxWindowBits;
+ } else if (this._options.clientMaxWindowBits == null) {
+ params.client_max_window_bits = true;
+ }
+
+ return params;
+ }
+
+ /**
+ * Accept an extension negotiation offer/response.
+ *
+ * @param {Array} configurations The extension negotiation offers/reponse
+ * @return {Object} Accepted configuration
+ * @public
+ */
+ accept(configurations) {
+ configurations = this.normalizeParams(configurations);
+
+ this.params = this._isServer
+ ? this.acceptAsServer(configurations)
+ : this.acceptAsClient(configurations);
+
+ return this.params;
+ }
+
+ /**
+ * Releases all resources used by the extension.
+ *
+ * @public
+ */
+ cleanup() {
+ if (this._inflate) {
+ this._inflate.close();
+ this._inflate = null;
+ }
+
+ if (this._deflate) {
+ const callback = this._deflate[kCallback];
+
+ this._deflate.close();
+ this._deflate = null;
+
+ if (callback) {
+ callback(
+ new Error(
+ 'The deflate stream was closed while data was being processed'
+ )
+ );
+ }
+ }
+ }
+
+ /**
+ * Accept an extension negotiation offer.
+ *
+ * @param {Array} offers The extension negotiation offers
+ * @return {Object} Accepted configuration
+ * @private
+ */
+ acceptAsServer(offers) {
+ const opts = this._options;
+ const accepted = offers.find((params) => {
+ if (
+ (opts.serverNoContextTakeover === false &&
+ params.server_no_context_takeover) ||
+ (params.server_max_window_bits &&
+ (opts.serverMaxWindowBits === false ||
+ (typeof opts.serverMaxWindowBits === 'number' &&
+ opts.serverMaxWindowBits > params.server_max_window_bits))) ||
+ (typeof opts.clientMaxWindowBits === 'number' &&
+ !params.client_max_window_bits)
+ ) {
+ return false;
+ }
+
+ return true;
+ });
+
+ if (!accepted) {
+ throw new Error('None of the extension offers can be accepted');
+ }
+
+ if (opts.serverNoContextTakeover) {
+ accepted.server_no_context_takeover = true;
+ }
+ if (opts.clientNoContextTakeover) {
+ accepted.client_no_context_takeover = true;
+ }
+ if (typeof opts.serverMaxWindowBits === 'number') {
+ accepted.server_max_window_bits = opts.serverMaxWindowBits;
+ }
+ if (typeof opts.clientMaxWindowBits === 'number') {
+ accepted.client_max_window_bits = opts.clientMaxWindowBits;
+ } else if (
+ accepted.client_max_window_bits === true ||
+ opts.clientMaxWindowBits === false
+ ) {
+ delete accepted.client_max_window_bits;
+ }
+
+ return accepted;
+ }
+
+ /**
+ * Accept the extension negotiation response.
+ *
+ * @param {Array} response The extension negotiation response
+ * @return {Object} Accepted configuration
+ * @private
+ */
+ acceptAsClient(response) {
+ const params = response[0];
+
+ if (
+ this._options.clientNoContextTakeover === false &&
+ params.client_no_context_takeover
+ ) {
+ throw new Error('Unexpected parameter "client_no_context_takeover"');
+ }
+
+ if (!params.client_max_window_bits) {
+ if (typeof this._options.clientMaxWindowBits === 'number') {
+ params.client_max_window_bits = this._options.clientMaxWindowBits;
+ }
+ } else if (
+ this._options.clientMaxWindowBits === false ||
+ (typeof this._options.clientMaxWindowBits === 'number' &&
+ params.client_max_window_bits > this._options.clientMaxWindowBits)
+ ) {
+ throw new Error(
+ 'Unexpected or invalid parameter "client_max_window_bits"'
+ );
+ }
+
+ return params;
+ }
+
+ /**
+ * Normalize parameters.
+ *
+ * @param {Array} configurations The extension negotiation offers/reponse
+ * @return {Array} The offers/response with normalized parameters
+ * @private
+ */
+ normalizeParams(configurations) {
+ configurations.forEach((params) => {
+ Object.keys(params).forEach((key) => {
+ let value = params[key];
+
+ if (value.length > 1) {
+ throw new Error(`Parameter "${key}" must have only a single value`);
+ }
+
+ value = value[0];
+
+ if (key === 'client_max_window_bits') {
+ if (value !== true) {
+ const num = +value;
+ if (!Number.isInteger(num) || num < 8 || num > 15) {
+ throw new TypeError(
+ `Invalid value for parameter "${key}": ${value}`
+ );
+ }
+ value = num;
+ } else if (!this._isServer) {
+ throw new TypeError(
+ `Invalid value for parameter "${key}": ${value}`
+ );
+ }
+ } else if (key === 'server_max_window_bits') {
+ const num = +value;
+ if (!Number.isInteger(num) || num < 8 || num > 15) {
+ throw new TypeError(
+ `Invalid value for parameter "${key}": ${value}`
+ );
+ }
+ value = num;
+ } else if (
+ key === 'client_no_context_takeover' ||
+ key === 'server_no_context_takeover'
+ ) {
+ if (value !== true) {
+ throw new TypeError(
+ `Invalid value for parameter "${key}": ${value}`
+ );
+ }
+ } else {
+ throw new Error(`Unknown parameter "${key}"`);
+ }
+
+ params[key] = value;
+ });
+ });
+
+ return configurations;
+ }
+
+ /**
+ * Decompress data. Concurrency limited.
+ *
+ * @param {Buffer} data Compressed data
+ * @param {Boolean} fin Specifies whether or not this is the last fragment
+ * @param {Function} callback Callback
+ * @public
+ */
+ decompress(data, fin, callback) {
+ zlibLimiter.add((done) => {
+ this._decompress(data, fin, (err, result) => {
+ done();
+ callback(err, result);
+ });
+ });
+ }
+
+ /**
+ * Compress data. Concurrency limited.
+ *
+ * @param {(Buffer|String)} data Data to compress
+ * @param {Boolean} fin Specifies whether or not this is the last fragment
+ * @param {Function} callback Callback
+ * @public
+ */
+ compress(data, fin, callback) {
+ zlibLimiter.add((done) => {
+ this._compress(data, fin, (err, result) => {
+ done();
+ callback(err, result);
+ });
+ });
+ }
+
+ /**
+ * Decompress data.
+ *
+ * @param {Buffer} data Compressed data
+ * @param {Boolean} fin Specifies whether or not this is the last fragment
+ * @param {Function} callback Callback
+ * @private
+ */
+ _decompress(data, fin, callback) {
+ const endpoint = this._isServer ? 'client' : 'server';
+
+ if (!this._inflate) {
+ const key = `${endpoint}_max_window_bits`;
+ const windowBits =
+ typeof this.params[key] !== 'number'
+ ? zlib.Z_DEFAULT_WINDOWBITS
+ : this.params[key];
+
+ this._inflate = zlib.createInflateRaw({
+ ...this._options.zlibInflateOptions,
+ windowBits
+ });
+ this._inflate[kPerMessageDeflate] = this;
+ this._inflate[kTotalLength] = 0;
+ this._inflate[kBuffers] = [];
+ this._inflate.on('error', inflateOnError);
+ this._inflate.on('data', inflateOnData);
+ }
+
+ this._inflate[kCallback] = callback;
+
+ this._inflate.write(data);
+ if (fin) this._inflate.write(TRAILER);
+
+ this._inflate.flush(() => {
+ const err = this._inflate[kError];
+
+ if (err) {
+ this._inflate.close();
+ this._inflate = null;
+ callback(err);
+ return;
+ }
+
+ const data = bufferUtil.concat(
+ this._inflate[kBuffers],
+ this._inflate[kTotalLength]
+ );
+
+ if (this._inflate._readableState.endEmitted) {
+ this._inflate.close();
+ this._inflate = null;
+ } else {
+ this._inflate[kTotalLength] = 0;
+ this._inflate[kBuffers] = [];
+
+ if (fin && this.params[`${endpoint}_no_context_takeover`]) {
+ this._inflate.reset();
+ }
+ }
+
+ callback(null, data);
+ });
+ }
+
+ /**
+ * Compress data.
+ *
+ * @param {(Buffer|String)} data Data to compress
+ * @param {Boolean} fin Specifies whether or not this is the last fragment
+ * @param {Function} callback Callback
+ * @private
+ */
+ _compress(data, fin, callback) {
+ const endpoint = this._isServer ? 'server' : 'client';
+
+ if (!this._deflate) {
+ const key = `${endpoint}_max_window_bits`;
+ const windowBits =
+ typeof this.params[key] !== 'number'
+ ? zlib.Z_DEFAULT_WINDOWBITS
+ : this.params[key];
+
+ this._deflate = zlib.createDeflateRaw({
+ ...this._options.zlibDeflateOptions,
+ windowBits
+ });
+
+ this._deflate[kTotalLength] = 0;
+ this._deflate[kBuffers] = [];
+
+ this._deflate.on('data', deflateOnData);
+ }
+
+ this._deflate[kCallback] = callback;
+
+ this._deflate.write(data);
+ this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
+ if (!this._deflate) {
+ //
+ // The deflate stream was closed while data was being processed.
+ //
+ return;
+ }
+
+ let data = bufferUtil.concat(
+ this._deflate[kBuffers],
+ this._deflate[kTotalLength]
+ );
+
+ if (fin) data = data.slice(0, data.length - 4);
+
+ //
+ // Ensure that the callback will not be called again in
+ // `PerMessageDeflate#cleanup()`.
+ //
+ this._deflate[kCallback] = null;
+
+ this._deflate[kTotalLength] = 0;
+ this._deflate[kBuffers] = [];
+
+ if (fin && this.params[`${endpoint}_no_context_takeover`]) {
+ this._deflate.reset();
+ }
+
+ callback(null, data);
+ });
+ }
+}
+
+module.exports = PerMessageDeflate;
+
+/**
+ * The listener of the `zlib.DeflateRaw` stream `'data'` event.
+ *
+ * @param {Buffer} chunk A chunk of data
+ * @private
+ */
+function deflateOnData(chunk) {
+ this[kBuffers].push(chunk);
+ this[kTotalLength] += chunk.length;
+}
+
+/**
+ * The listener of the `zlib.InflateRaw` stream `'data'` event.
+ *
+ * @param {Buffer} chunk A chunk of data
+ * @private
+ */
+function inflateOnData(chunk) {
+ this[kTotalLength] += chunk.length;
+
+ if (
+ this[kPerMessageDeflate]._maxPayload < 1 ||
+ this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
+ ) {
+ this[kBuffers].push(chunk);
+ return;
+ }
+
+ this[kError] = new RangeError('Max payload size exceeded');
+ this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
+ this[kError][kStatusCode] = 1009;
+ this.removeListener('data', inflateOnData);
+ this.reset();
+}
+
+/**
+ * The listener of the `zlib.InflateRaw` stream `'error'` event.
+ *
+ * @param {Error} err The emitted error
+ * @private
+ */
+function inflateOnError(err) {
+ //
+ // There is no need to call `Zlib#close()` as the handle is automatically
+ // closed when an error is emitted.
+ //
+ this[kPerMessageDeflate]._inflate = null;
+ err[kStatusCode] = 1007;
+ this[kCallback](err);
+}