summaryrefslogtreecommitdiffstats
path: root/testing/xpcshell/node-http2/lib/protocol/connection.js
diff options
context:
space:
mode:
Diffstat (limited to 'testing/xpcshell/node-http2/lib/protocol/connection.js')
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/connection.js630
1 files changed, 630 insertions, 0 deletions
diff --git a/testing/xpcshell/node-http2/lib/protocol/connection.js b/testing/xpcshell/node-http2/lib/protocol/connection.js
new file mode 100644
index 0000000000..8c203675fa
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/connection.js
@@ -0,0 +1,630 @@
+var assert = require('assert');
+
+// The Connection class
+// ====================
+
+// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
+// stream (TCP stream). It operates by sending and receiving frames and is implemented as a
+// [Flow](flow.html) subclass.
+
+var Flow = require('./flow').Flow;
+
+exports.Connection = Connection;
+
+// Public API
+// ----------
+
+// * **new Connection(log, firstStreamId, settings)**: create a new Connection
+//
+// * **Event: 'error' (type)**: signals a connection level error made by the other end
+//
+// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
+// code other than NO_ERROR
+//
+// * **Event: 'stream' (stream)**: signals that there's an incoming stream
+//
+// * **createStream(): stream**: initiate a new stream
+//
+// * **set(settings, callback)**: change the value of one or more settings according to the
+// key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
+//
+// * **ping([callback])**: send a ping and call callback when the answer arrives
+//
+// * **close([error])**: close the stream with an error code
+
+// Constructor
+// -----------
+
+// The main aspects of managing the connection are:
+function Connection(log, firstStreamId, settings) {
+ // * initializing the base class
+ Flow.call(this, 0);
+
+ // * logging: every method uses the common logger object
+ this._log = log.child({ component: 'connection' });
+
+ // * stream management
+ this._initializeStreamManagement(firstStreamId);
+
+ // * lifecycle management
+ this._initializeLifecycleManagement();
+
+ // * flow control
+ this._initializeFlowControl();
+
+ // * settings management
+ this._initializeSettingsManagement(settings);
+
+ // * multiplexing
+ this._initializeMultiplexing();
+}
+Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
+
+// Overview
+// --------
+
+// | ^ | ^
+// v | v |
+// +--------------+ +--------------+
+// +---| stream1 |---| stream2 |---- .... ---+
+// | | +----------+ | | +----------+ | |
+// | | | stream1. | | | | stream2. | | |
+// | +-| upstream |-+ +-| upstream |-+ |
+// | +----------+ +----------+ |
+// | | ^ | ^ |
+// | v | v | |
+// | +-----+-------------+-----+-------- .... |
+// | ^ | | | |
+// | | v | | |
+// | +--------------+ | | |
+// | | stream0 | | | |
+// | | connection | | | |
+// | | management | multiplexing |
+// | +--------------+ flow control |
+// | | ^ |
+// | _read() | | _write() |
+// | v | |
+// | +------------+ +-----------+ |
+// | |output queue| |input queue| |
+// +----------------+------------+-+-----------+-----------------+
+// | ^
+// read() | | write()
+// v |
+
+// Stream management
+// -----------------
+
+var Stream = require('./stream').Stream;
+
+// Initialization:
+Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
+ // * streams are stored in two data structures:
+ // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames.
+ // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames.
+ this._streamIds = [];
+ this._streamPriorities = [];
+
+ // * The next outbound stream ID and the last inbound stream id
+ this._nextStreamId = firstStreamId;
+ this._lastIncomingStream = 0;
+
+ // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID
+ this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
+
+ // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
+ // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
+ this._streamSlotsFree = Infinity;
+ this._streamLimit = Infinity;
+ this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
+};
+
+// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
+// broadcasts the message by creating an event on it.
+Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
+ if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
+ (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') ||
+ (frame.type === 'ALTSVC') || (frame.type == 'ORIGIN')) {
+ this._log.debug({ frame: frame }, 'Receiving connection level frame');
+ this.emit(frame.type, frame);
+ } else {
+ this._log.error({ frame: frame }, 'Invalid connection level frame');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+};
+
+// Methods to manage the stream slot pool:
+Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
+ var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
+ this._streamSlotsFree += newStreamLimit - this._streamLimit;
+ this._streamLimit = newStreamLimit;
+ if (wakeup) {
+ this.emit('wakeup');
+ }
+};
+
+Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
+ if (change) {
+ this._log.trace({ free: this._streamSlotsFree, change: change },
+ 'Changing active stream count.');
+ var wakeup = (this._streamSlotsFree === 0) && (change < 0);
+ this._streamSlotsFree -= change;
+ if (wakeup) {
+ this.emit('wakeup');
+ }
+ }
+};
+
+// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
+// an outbound stream) consists of three steps:
+//
+// 1. var stream = new Stream(this._log, this);
+// 2. this._allocateId(stream, id);
+// 2. this._allocatePriority(stream);
+
+// Allocating an ID to a stream
+Connection.prototype._allocateId = function _allocateId(stream, id) {
+ // * initiated stream without definite ID
+ if (id === undefined) {
+ id = this._nextStreamId;
+ this._nextStreamId += 2;
+ }
+
+ // * incoming stream with a legitim ID (larger than any previous and different parity than ours)
+ else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
+ this._lastIncomingStream = id;
+ }
+
+ // * incoming stream with invalid ID
+ else {
+ this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
+ 'Invalid incoming stream ID.');
+ this.emit('error', 'PROTOCOL_ERROR');
+ return undefined;
+ }
+
+ assert(!(id in this._streamIds));
+
+ // * adding to `this._streamIds`
+ this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
+ this._streamIds[id] = stream;
+ stream.id = id;
+ this.emit('new_stream', stream, id);
+
+ // * forwarding connection errors from streams
+ stream.on('connectionError', this.emit.bind(this, 'error'));
+
+ return id;
+};
+
+// Allocating a priority to a stream, and managing priority changes
+Connection.prototype._allocatePriority = function _allocatePriority(stream) {
+ this._log.trace({ s: stream }, 'Allocating priority for stream.');
+ this._insert(stream, stream._priority);
+ stream.on('priority', this._reprioritize.bind(this, stream));
+ stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
+ this.emit('wakeup');
+};
+
+Connection.prototype._insert = function _insert(stream, priority) {
+ if (priority in this._streamPriorities) {
+ this._streamPriorities[priority].push(stream);
+ } else {
+ this._streamPriorities[priority] = [stream];
+ }
+};
+
+Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
+ var bucket = this._streamPriorities[stream._priority];
+ var index = bucket.indexOf(stream);
+ assert(index !== -1);
+ bucket.splice(index, 1);
+ if (bucket.length === 0) {
+ delete this._streamPriorities[stream._priority];
+ }
+
+ this._insert(stream, priority);
+};
+
+// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
+// a previously nonexistent stream.
+Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
+ this._log.debug({ stream_id: id }, 'New incoming stream.');
+
+ var stream = new Stream(this._log, this);
+ this._allocateId(stream, id);
+ this._allocatePriority(stream);
+ this.emit('stream', stream, id);
+
+ return stream;
+};
+
+// Creating an *outbound* stream
+Connection.prototype.createStream = function createStream() {
+ this._log.trace('Creating new outbound stream.');
+
+ // * Receiving is enabled immediately, and an ID gets assigned to the stream
+ var stream = new Stream(this._log, this);
+ this._allocatePriority(stream);
+
+ return stream;
+};
+
+// Multiplexing
+// ------------
+
+Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
+ this.on('window_update', this.emit.bind(this, 'wakeup'));
+ this._sendScheduled = false;
+ this._firstFrameReceived = false;
+};
+
+// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
+// by child classes. It reads frames from streams and pushes them to the output buffer.
+Connection.prototype._send = function _send(immediate) {
+ // * Do not do anything if the connection is already closed
+ if (this._closed) {
+ return;
+ }
+
+ // * Collapsing multiple calls in a turn into a single deferred call
+ if (immediate) {
+ this._sendScheduled = false;
+ } else {
+ if (!this._sendScheduled) {
+ this._sendScheduled = true;
+ setImmediate(this._send.bind(this, true));
+ }
+ return;
+ }
+
+ this._log.trace('Starting forwarding frames from streams.');
+
+ // * Looping through priority `bucket`s in priority order.
+priority_loop:
+ for (var priority in this._streamPriorities) {
+ var bucket = this._streamPriorities[priority];
+ var nextBucket = [];
+
+ // * Forwarding frames from buckets with round-robin scheduling.
+ // 1. pulling out frame
+ // 2. if there's no frame, skip this stream
+ // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
+ // this stream
+ // 4. adding stream to the bucket of the next round
+ // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
+ // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
+ // 7. forwarding the frame, changing `streamCount` as appropriate
+ // 8. stepping to the next stream if there's still more frame needed in the output buffer
+ // 9. switching to the bucket of the next round
+ while (bucket.length > 0) {
+ for (var index = 0; index < bucket.length; index++) {
+ var stream = bucket[index];
+ var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
+
+ if (!frame) {
+ continue;
+ } else if (frame.count_change > this._streamSlotsFree) {
+ stream.upstream.unshift(frame);
+ continue;
+ }
+
+ nextBucket.push(stream);
+
+ if (frame.stream === undefined) {
+ frame.stream = stream.id || this._allocateId(stream);
+ }
+
+ if (frame.type === 'PUSH_PROMISE') {
+ this._allocatePriority(frame.promised_stream);
+ frame.promised_stream = this._allocateId(frame.promised_stream);
+ }
+
+ this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
+ var moreNeeded = this.push(frame);
+ this._changeStreamCount(frame.count_change);
+
+ assert(moreNeeded !== null); // The frame shouldn't be unforwarded
+ if (moreNeeded === false) {
+ break priority_loop;
+ }
+ }
+
+ bucket = nextBucket;
+ nextBucket = [];
+ }
+ }
+
+ // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
+ if (moreNeeded === undefined) {
+ this.once('wakeup', this._send.bind(this));
+ }
+
+ this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
+};
+
+// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
+// implemented by child classes. It forwards the given frame to the appropriate stream:
+Connection.prototype._receive = function _receive(frame, done) {
+ this._log.trace({ frame: frame }, 'Forwarding incoming frame');
+
+ // * first frame needs to be checked by the `_onFirstFrameReceived` method
+ if (!this._firstFrameReceived) {
+ this._firstFrameReceived = true;
+ this._onFirstFrameReceived(frame);
+ }
+
+ // Do some sanity checking here before we create a stream
+ if ((frame.type == 'SETTINGS' ||
+ frame.type == 'PING' ||
+ frame.type == 'GOAWAY') &&
+ frame.stream != 0) {
+ // Got connection-level frame on a stream - EEP!
+ this.close('PROTOCOL_ERROR');
+ return;
+ } else if ((frame.type == 'DATA' ||
+ frame.type == 'HEADERS' ||
+ frame.type == 'PRIORITY' ||
+ frame.type == 'RST_STREAM' ||
+ frame.type == 'PUSH_PROMISE' ||
+ frame.type == 'CONTINUATION') &&
+ frame.stream == 0) {
+ // Got stream-level frame on connection - EEP!
+ this.close('PROTOCOL_ERROR');
+ return;
+ }
+ // WINDOW_UPDATE can be on either stream or connection
+
+ // * gets the appropriate stream from the stream registry
+ var stream = this._streamIds[frame.stream];
+
+ // * or creates one if it's not in `this.streams`
+ if (!stream) {
+ stream = this._createIncomingStream(frame.stream);
+ }
+
+ // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
+ if (frame.type === 'PUSH_PROMISE') {
+ frame.promised_stream = this._createIncomingStream(frame.promised_stream);
+ }
+
+ frame.count_change = this._changeStreamCount.bind(this);
+
+ // * and writes it to the `stream`'s `upstream`
+ stream.upstream.write(frame);
+
+ done();
+};
+
+// Settings management
+// -------------------
+
+var defaultSettings = {
+};
+
+// Settings management initialization:
+Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
+ // * Setting up the callback queue for setting acknowledgements
+ this._settingsAckCallbacks = [];
+
+ // * Sending the initial settings.
+ this._log.debug({ settings: settings },
+ 'Sending the first SETTINGS frame as part of the connection header.');
+ this.set(settings || defaultSettings);
+
+ // * Forwarding SETTINGS frames to the `_receiveSettings` method
+ this.on('SETTINGS', this._receiveSettings);
+ this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize);
+};
+
+// * Checking that the first frame the other endpoint sends is SETTINGS
+Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
+ if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
+ this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
+ } else {
+ this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+};
+
+// Handling of incoming SETTINGS frames.
+Connection.prototype._receiveSettings = function _receiveSettings(frame) {
+ // * If it's an ACK, call the appropriate callback
+ if (frame.flags.ACK) {
+ var callback = this._settingsAckCallbacks.shift();
+ if (callback) {
+ callback();
+ }
+ }
+
+ // * If it's a setting change request, then send an ACK and change the appropriate settings
+ else {
+ if (!this._closed) {
+ this.push({
+ type: 'SETTINGS',
+ flags: { ACK: true },
+ stream: 0,
+ settings: {}
+ });
+ }
+ for (var name in frame.settings) {
+ this.emit('RECEIVING_' + name, frame.settings[name]);
+ }
+ }
+};
+
+Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) {
+ if ((value < 0x4000) || (value >= 0x01000000)) {
+ this._log.fatal('Received invalid value for max frame size: ' + value);
+ this.emit('error');
+ }
+};
+
+// Changing one or more settings value and sending out a SETTINGS frame
+Connection.prototype.set = function set(settings, callback) {
+ // * Calling the callback and emitting event when the change is acknowledges
+ var self = this;
+ this._settingsAckCallbacks.push(function() {
+ for (var name in settings) {
+ self.emit('ACKNOWLEDGED_' + name, settings[name]);
+ }
+ if (callback) {
+ callback();
+ }
+ });
+
+ // * Sending out the SETTINGS frame
+ this.push({
+ type: 'SETTINGS',
+ flags: { ACK: false },
+ stream: 0,
+ settings: settings
+ });
+ for (var name in settings) {
+ this.emit('SENDING_' + name, settings[name]);
+ }
+};
+
+// Lifecycle management
+// --------------------
+
+// The main responsibilities of lifecycle management code:
+//
+// * keeping the connection alive by
+// * sending PINGs when the connection is idle
+// * answering PINGs
+// * ending the connection
+
+Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
+ this._pings = {};
+ this.on('PING', this._receivePing);
+ this.on('GOAWAY', this._receiveGoaway);
+ this._closed = false;
+};
+
+// Generating a string of length 16 with random hexadecimal digits
+Connection.prototype._generatePingId = function _generatePingId() {
+ do {
+ var id = '';
+ for (var i = 0; i < 16; i++) {
+ id += Math.floor(Math.random()*16).toString(16);
+ }
+ } while(id in this._pings);
+ return id;
+};
+
+// Sending a ping and calling `callback` when the answer arrives
+Connection.prototype.ping = function ping(callback) {
+ var id = this._generatePingId();
+ var data = Buffer.from(id, 'hex');
+ this._pings[id] = callback;
+
+ this._log.debug({ data: data }, 'Sending PING.');
+ this.push({
+ type: 'PING',
+ flags: {
+ ACK: false
+ },
+ stream: 0,
+ data: data
+ });
+};
+
+// Answering pings
+Connection.prototype._receivePing = function _receivePing(frame) {
+ if (frame.flags.ACK) {
+ var id = frame.data.toString('hex');
+ if (id in this._pings) {
+ this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
+ var callback = this._pings[id];
+ if (callback) {
+ callback();
+ }
+ delete this._pings[id];
+ } else {
+ this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
+ }
+
+ } else {
+ this._log.debug({ data: frame.data }, 'Answering PING.');
+ this.push({
+ type: 'PING',
+ flags: {
+ ACK: true
+ },
+ stream: 0,
+ data: frame.data
+ });
+ }
+};
+
+Connection.prototype.originFrame = function originFrame(originList) {
+ this._log.debug(originList, 'emitting origin frame');
+
+ this.push({
+ type: 'ORIGIN',
+ flags: {},
+ stream: 0,
+ originList : originList,
+ });
+};
+
+// Terminating the connection
+Connection.prototype.close = function close(error) {
+ if (this._closed) {
+ this._log.warn('Trying to close an already closed connection');
+ return;
+ }
+
+ this._log.debug({ error: error }, 'Closing the connection');
+ this.push({
+ type: 'GOAWAY',
+ flags: {},
+ stream: 0,
+ last_stream: this._lastIncomingStream,
+ error: error || 'NO_ERROR'
+ });
+ this.push(null);
+ this._closed = true;
+};
+
+Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
+ this._log.debug({ error: frame.error }, 'Other end closed the connection');
+ this.push(null);
+ this._closed = true;
+ if (frame.error !== 'NO_ERROR') {
+ this.emit('peerError', frame.error);
+ }
+};
+
+// Flow control
+// ------------
+
+Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
+ // Handling of initial window size of individual streams.
+ this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
+ this.on('new_stream', function(stream) {
+ stream.upstream.setInitialWindow(this._initialStreamWindowSize);
+ });
+ this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
+ this._streamIds[0].upstream.setInitialWindow = function noop() {};
+};
+
+// The initial connection flow control window is 65535 bytes.
+var INITIAL_STREAM_WINDOW_SIZE = 65535;
+
+// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
+// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
+// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
+// the difference between the new value and the old value.
+Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
+ if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
+ this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
+ this.emit('error', 'FLOW_CONTROL_ERROR');
+ } else {
+ this._log.debug({ size: size }, 'Changing stream initial window size.');
+ this._initialStreamWindowSize = size;
+ this._streamIds.forEach(function(stream) {
+ stream.upstream.setInitialWindow(size);
+ });
+ }
+};