summaryrefslogtreecommitdiffstats
path: root/testing/xpcshell/node-http2/lib/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'testing/xpcshell/node-http2/lib/protocol')
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/compressor.js1428
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/connection.js630
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/endpoint.js262
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/flow.js345
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/framer.js1166
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/index.js91
-rw-r--r--testing/xpcshell/node-http2/lib/protocol/stream.js677
7 files changed, 4599 insertions, 0 deletions
diff --git a/testing/xpcshell/node-http2/lib/protocol/compressor.js b/testing/xpcshell/node-http2/lib/protocol/compressor.js
new file mode 100644
index 0000000000..6f91f86ec9
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/compressor.js
@@ -0,0 +1,1428 @@
+// The implementation of the [HTTP/2 Header Compression][http2-compression] spec is separated from
+// the 'integration' part which handles HEADERS and PUSH_PROMISE frames. The compression itself is
+// implemented in the first part of the file, and consists of three classes: `HeaderTable`,
+// `HeaderSetDecompressor` and `HeaderSetCompressor`. The two latter classes are
+// [Transform Stream][node-transform] subclasses that operate in [object mode][node-objectmode].
+// These transform chunks of binary data into `[name, value]` pairs and vice versa, and store their
+// state in `HeaderTable` instances.
+//
+// The 'integration' part is also implemented by two [Transform Stream][node-transform] subclasses
+// that operate in [object mode][node-objectmode]: the `Compressor` and the `Decompressor`. These
+// provide a layer between the [framer](framer.html) and the
+// [connection handling component](connection.html).
+//
+// [node-transform]: https://nodejs.org/api/stream.html#stream_class_stream_transform
+// [node-objectmode]: https://nodejs.org/api/stream.html#stream_new_stream_readable_options
+// [http2-compression]: https://tools.ietf.org/html/rfc7541
+
+exports.HeaderTable = HeaderTable;
+exports.HuffmanTable = HuffmanTable;
+exports.HeaderSetCompressor = HeaderSetCompressor;
+exports.HeaderSetDecompressor = HeaderSetDecompressor;
+exports.Compressor = Compressor;
+exports.Decompressor = Decompressor;
+
+var TransformStream = require('stream').Transform;
+var assert = require('assert');
+var util = require('util');
+
+// Header compression
+// ==================
+
+// The HeaderTable class
+// ---------------------
+
+// The [Header Table] is a component used to associate headers to index values. It is basically an
+// ordered list of `[name, value]` pairs, so it's implemented as a subclass of `Array`.
+// In this implementation, the Header Table and the [Static Table] are handled as a single table.
+// [Header Table]: https://tools.ietf.org/html/rfc7541#section-2.3.2
+// [Static Table]: https://tools.ietf.org/html/rfc7541#section-2.3.1
+function HeaderTable(log, limit) {
+ var self = HeaderTable.staticTable.map(entryFromPair);
+ self._log = log;
+ self._limit = limit || DEFAULT_HEADER_TABLE_LIMIT;
+ self._staticLength = self.length;
+ self._size = 0;
+ self._enforceLimit = HeaderTable.prototype._enforceLimit;
+ self.add = HeaderTable.prototype.add;
+ self.setSizeLimit = HeaderTable.prototype.setSizeLimit;
+ return self;
+}
+
+function entryFromPair(pair) {
+ var entry = pair.slice();
+ entry._size = size(entry);
+ return entry;
+}
+
+// The encoder decides how to update the header table and as such can control how much memory is
+// used by the header table. To limit the memory requirements on the decoder side, the header table
+// size is bounded.
+//
+// * The default header table size limit is 4096 bytes.
+// * The size of an entry is defined as follows: the size of an entry is the sum of its name's
+// length in bytes, of its value's length in bytes and of 32 bytes.
+// * The size of a header table is the sum of the size of its entries.
+var DEFAULT_HEADER_TABLE_LIMIT = 4096;
+
+function size(entry) {
+ return (Buffer.from(entry[0] + entry[1], 'utf8')).length + 32;
+}
+
+// The `add(index, entry)` can be used to [manage the header table][tablemgmt]:
+// [tablemgmt]: https://tools.ietf.org/html/rfc7541#section-4
+//
+// * it pushes the new `entry` at the beggining of the table
+// * before doing such a modification, it has to be ensured that the header table size will stay
+// lower than or equal to the header table size limit. To achieve this, entries are evicted from
+// the end of the header table until the size of the header table is less than or equal to
+// `(this._limit - entry.size)`, or until the table is empty.
+//
+// <---------- Index Address Space ---------->
+// <-- Static Table --> <-- Header Table -->
+// +---+-----------+---+ +---+-----------+---+
+// | 0 | ... | k | |k+1| ... | n |
+// +---+-----------+---+ +---+-----------+---+
+// ^ |
+// | V
+// Insertion Point Drop Point
+
+HeaderTable.prototype._enforceLimit = function _enforceLimit(limit) {
+ var droppedEntries = [];
+ while ((this._size > 0) && (this._size > limit)) {
+ var dropped = this.pop();
+ this._size -= dropped._size;
+ droppedEntries.unshift(dropped);
+ }
+ return droppedEntries;
+};
+
+HeaderTable.prototype.add = function(entry) {
+ var limit = this._limit - entry._size;
+ var droppedEntries = this._enforceLimit(limit);
+
+ if (this._size <= limit) {
+ this.splice(this._staticLength, 0, entry);
+ this._size += entry._size;
+ }
+
+ return droppedEntries;
+};
+
+// The table size limit can be changed externally. In this case, the same eviction algorithm is used
+HeaderTable.prototype.setSizeLimit = function setSizeLimit(limit) {
+ this._limit = limit;
+ this._enforceLimit(this._limit);
+};
+
+// [The Static Table](https://tools.ietf.org/html/rfc7541#section-2.3.1)
+// ------------------
+
+// The table is generated with feeding the table from the spec to the following sed command:
+//
+// sed -re "s/\s*\| [0-9]+\s*\| ([^ ]*)/ [ '\1'/g" -e "s/\|\s([^ ]*)/, '\1'/g" -e 's/ \|/],/g'
+
+HeaderTable.staticTable = [
+ [ ':authority' , '' ],
+ [ ':method' , 'GET' ],
+ [ ':method' , 'POST' ],
+ [ ':path' , '/' ],
+ [ ':path' , '/index.html' ],
+ [ ':scheme' , 'http' ],
+ [ ':scheme' , 'https' ],
+ [ ':status' , '200' ],
+ [ ':status' , '204' ],
+ [ ':status' , '206' ],
+ [ ':status' , '304' ],
+ [ ':status' , '400' ],
+ [ ':status' , '404' ],
+ [ ':status' , '500' ],
+ [ 'accept-charset' , '' ],
+ [ 'accept-encoding' , 'gzip, deflate'],
+ [ 'accept-language' , '' ],
+ [ 'accept-ranges' , '' ],
+ [ 'accept' , '' ],
+ [ 'access-control-allow-origin' , '' ],
+ [ 'age' , '' ],
+ [ 'allow' , '' ],
+ [ 'authorization' , '' ],
+ [ 'cache-control' , '' ],
+ [ 'content-disposition' , '' ],
+ [ 'content-encoding' , '' ],
+ [ 'content-language' , '' ],
+ [ 'content-length' , '' ],
+ [ 'content-location' , '' ],
+ [ 'content-range' , '' ],
+ [ 'content-type' , '' ],
+ [ 'cookie' , '' ],
+ [ 'date' , '' ],
+ [ 'etag' , '' ],
+ [ 'expect' , '' ],
+ [ 'expires' , '' ],
+ [ 'from' , '' ],
+ [ 'host' , '' ],
+ [ 'if-match' , '' ],
+ [ 'if-modified-since' , '' ],
+ [ 'if-none-match' , '' ],
+ [ 'if-range' , '' ],
+ [ 'if-unmodified-since' , '' ],
+ [ 'last-modified' , '' ],
+ [ 'link' , '' ],
+ [ 'location' , '' ],
+ [ 'max-forwards' , '' ],
+ [ 'proxy-authenticate' , '' ],
+ [ 'proxy-authorization' , '' ],
+ [ 'range' , '' ],
+ [ 'referer' , '' ],
+ [ 'refresh' , '' ],
+ [ 'retry-after' , '' ],
+ [ 'server' , '' ],
+ [ 'set-cookie' , '' ],
+ [ 'strict-transport-security' , '' ],
+ [ 'transfer-encoding' , '' ],
+ [ 'user-agent' , '' ],
+ [ 'vary' , '' ],
+ [ 'via' , '' ],
+ [ 'www-authenticate' , '' ]
+];
+
+// The HeaderSetDecompressor class
+// -------------------------------
+
+// A `HeaderSetDecompressor` instance is a transform stream that can be used to *decompress a
+// single header set*. Its input is a stream of binary data chunks and its output is a stream of
+// `[name, value]` pairs.
+//
+// Currently, it is not a proper streaming decompressor implementation, since it buffer its input
+// until the end os the stream, and then processes the whole header block at once.
+
+util.inherits(HeaderSetDecompressor, TransformStream);
+function HeaderSetDecompressor(log, table) {
+ TransformStream.call(this, { objectMode: true });
+
+ this._log = log.child({ component: 'compressor' });
+ this._table = table;
+ this._chunks = [];
+}
+
+// `_transform` is the implementation of the [corresponding virtual function][_transform] of the
+// TransformStream class. It collects the data chunks for later processing.
+// [_transform]: https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
+HeaderSetDecompressor.prototype._transform = function _transform(chunk, encoding, callback) {
+ this._chunks.push(chunk);
+ callback();
+};
+
+// `execute(rep)` executes the given [header representation][representation].
+// [representation]: https://tools.ietf.org/html/rfc7541#section-6
+
+// The *JavaScript object representation* of a header representation:
+//
+// {
+// name: String || Integer, // string literal or index
+// value: String || Integer, // string literal or index
+// index: Boolean // with or without indexing
+// }
+//
+// *Important:* to ease the indexing of the header table, indexes start at 0 instead of 1.
+//
+// Examples:
+//
+// Indexed:
+// { name: 2 , value: 2 , index: false }
+// Literal:
+// { name: 2 , value: 'X', index: false } // without indexing
+// { name: 2 , value: 'Y', index: true } // with indexing
+// { name: 'A', value: 'Z', index: true } // with indexing, literal name
+HeaderSetDecompressor.prototype._execute = function _execute(rep) {
+ this._log.trace({ key: rep.name, value: rep.value, index: rep.index },
+ 'Executing header representation');
+
+ var entry, pair;
+
+ if (rep.contextUpdate) {
+ this._table.setSizeLimit(rep.newMaxSize);
+ }
+
+ // * An _indexed representation_ entails the following actions:
+ // * The header field corresponding to the referenced entry is emitted
+ else if (typeof rep.value === 'number') {
+ var index = rep.value;
+ entry = this._table[index];
+
+ pair = entry.slice();
+ this.push(pair);
+ }
+
+ // * A _literal representation_ that is _not added_ to the header table entails the following
+ // action:
+ // * The header is emitted.
+ // * A _literal representation_ that is _added_ to the header table entails the following further
+ // actions:
+ // * The header is added to the header table.
+ // * The header is emitted.
+ else {
+ if (typeof rep.name === 'number') {
+ pair = [this._table[rep.name][0], rep.value];
+ } else {
+ pair = [rep.name, rep.value];
+ }
+
+ if (rep.index) {
+ entry = entryFromPair(pair);
+ this._table.add(entry);
+ }
+
+ this.push(pair);
+ }
+};
+
+// `_flush` is the implementation of the [corresponding virtual function][_flush] of the
+// TransformStream class. The whole decompressing process is done in `_flush`. It gets called when
+// the input stream is over.
+// [_flush]: https://nodejs.org/api/stream.html#stream_transform_flush_callback
+HeaderSetDecompressor.prototype._flush = function _flush(callback) {
+ var buffer = concat(this._chunks);
+
+ // * processes the header representations
+ buffer.cursor = 0;
+ while (buffer.cursor < buffer.length) {
+ this._execute(HeaderSetDecompressor.header(buffer));
+ }
+
+ callback();
+};
+
+// The HeaderSetCompressor class
+// -----------------------------
+
+// A `HeaderSetCompressor` instance is a transform stream that can be used to *compress a single
+// header set*. Its input is a stream of `[name, value]` pairs and its output is a stream of
+// binary data chunks.
+//
+// It is a real streaming compressor, since it does not wait until the header set is complete.
+//
+// The compression algorithm is (intentionally) not specified by the spec. Therefore, the current
+// compression algorithm can probably be improved in the future.
+
+util.inherits(HeaderSetCompressor, TransformStream);
+function HeaderSetCompressor(log, table) {
+ TransformStream.call(this, { objectMode: true });
+
+ this._log = log.child({ component: 'compressor' });
+ this._table = table;
+ this.push = TransformStream.prototype.push.bind(this);
+}
+
+HeaderSetCompressor.prototype.send = function send(rep) {
+ this._log.trace({ key: rep.name, value: rep.value, index: rep.index },
+ 'Emitting header representation');
+
+ if (!rep.chunks) {
+ rep.chunks = HeaderSetCompressor.header(rep);
+ }
+ rep.chunks.forEach(this.push);
+};
+
+// `_transform` is the implementation of the [corresponding virtual function][_transform] of the
+// TransformStream class. It processes the input headers one by one:
+// [_transform]: https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
+HeaderSetCompressor.prototype._transform = function _transform(pair, encoding, callback) {
+ var name = pair[0].toLowerCase();
+ var value = pair[1];
+ var entry, rep;
+
+ // * tries to find full (name, value) or name match in the header table
+ var nameMatch = -1, fullMatch = -1;
+ for (var droppedIndex = 0; droppedIndex < this._table.length; droppedIndex++) {
+ entry = this._table[droppedIndex];
+ if (entry[0] === name) {
+ if (entry[1] === value) {
+ fullMatch = droppedIndex;
+ break;
+ } else if (nameMatch === -1) {
+ nameMatch = droppedIndex;
+ }
+ }
+ }
+
+ var mustNeverIndex = ((name === 'cookie' && value.length < 20) ||
+ (name === 'set-cookie' && value.length < 20) ||
+ name === 'authorization');
+
+ if (fullMatch !== -1 && !mustNeverIndex) {
+ this.send({ name: fullMatch, value: fullMatch, index: false });
+ }
+
+ // * otherwise, it will be a literal representation (with a name index if there's a name match)
+ else {
+ entry = entryFromPair(pair);
+
+ var indexing = (entry._size < this._table._limit / 2) && !mustNeverIndex;
+
+ if (indexing) {
+ this._table.add(entry);
+ }
+
+ this.send({ name: (nameMatch !== -1) ? nameMatch : name, value: value, index: indexing, mustNeverIndex: mustNeverIndex, contextUpdate: false });
+ }
+
+ callback();
+};
+
+// `_flush` is the implementation of the [corresponding virtual function][_flush] of the
+// TransformStream class. It gets called when there's no more header to compress. The final step:
+// [_flush]: https://nodejs.org/api/stream.html#stream_transform_flush_callback
+HeaderSetCompressor.prototype._flush = function _flush(callback) {
+ callback();
+};
+
+// [Detailed Format](https://tools.ietf.org/html/rfc7541#section-5)
+// -----------------
+
+// ### Integer representation ###
+//
+// The algorithm to represent an integer I is as follows:
+//
+// 1. If I < 2^N - 1, encode I on N bits
+// 2. Else, encode 2^N - 1 on N bits and do the following steps:
+// 1. Set I to (I - (2^N - 1)) and Q to 1
+// 2. While Q > 0
+// 1. Compute Q and R, quotient and remainder of I divided by 2^7
+// 2. If Q is strictly greater than 0, write one 1 bit; otherwise, write one 0 bit
+// 3. Encode R on the next 7 bits
+// 4. I = Q
+
+HeaderSetCompressor.integer = function writeInteger(I, N) {
+ var limit = Math.pow(2,N) - 1;
+ if (I < limit) {
+ return [Buffer.from([I])];
+ }
+
+ var bytes = [];
+ if (N !== 0) {
+ bytes.push(limit);
+ }
+ I -= limit;
+
+ var Q = 1, R;
+ while (Q > 0) {
+ Q = Math.floor(I / 128);
+ R = I % 128;
+
+ if (Q > 0) {
+ R += 128;
+ }
+ bytes.push(R);
+
+ I = Q;
+ }
+
+ return [Buffer.from(bytes)];
+};
+
+// The inverse algorithm:
+//
+// 1. Set I to the number coded on the lower N bits of the first byte
+// 2. If I is smaller than 2^N - 1 then return I
+// 2. Else the number is encoded on more than one byte, so do the following steps:
+// 1. Set M to 0
+// 2. While returning with I
+// 1. Let B be the next byte (the first byte if N is 0)
+// 2. Read out the lower 7 bits of B and multiply it with 2^M
+// 3. Increase I with this number
+// 4. Increase M by 7
+// 5. Return I if the most significant bit of B is 0
+
+HeaderSetDecompressor.integer = function readInteger(buffer, N) {
+ var limit = Math.pow(2,N) - 1;
+
+ var I = buffer[buffer.cursor] & limit;
+ if (N !== 0) {
+ buffer.cursor += 1;
+ }
+
+ if (I === limit) {
+ var M = 0;
+ do {
+ I += (buffer[buffer.cursor] & 127) << M;
+ M += 7;
+ buffer.cursor += 1;
+ } while (buffer[buffer.cursor - 1] & 128);
+ }
+
+ return I;
+};
+
+// ### Huffman Encoding ###
+
+function HuffmanTable(table) {
+ function createTree(codes, position) {
+ if (codes.length === 1) {
+ return [table.indexOf(codes[0])];
+ }
+
+ else {
+ position = position || 0;
+ var zero = [];
+ var one = [];
+ for (var i = 0; i < codes.length; i++) {
+ var string = codes[i];
+ if (string[position] === '0') {
+ zero.push(string);
+ } else {
+ one.push(string);
+ }
+ }
+ return [createTree(zero, position + 1), createTree(one, position + 1)];
+ }
+ }
+
+ this.tree = createTree(table);
+
+ this.codes = table.map(function(bits) {
+ return parseInt(bits, 2);
+ });
+ this.lengths = table.map(function(bits) {
+ return bits.length;
+ });
+}
+
+HuffmanTable.prototype.encode = function encode(buffer) {
+ var result = [];
+ var space = 8;
+
+ function add(data) {
+ if (space === 8) {
+ result.push(data);
+ } else {
+ result[result.length - 1] |= data;
+ }
+ }
+
+ for (var i = 0; i < buffer.length; i++) {
+ var byte = buffer[i];
+ var code = this.codes[byte];
+ var length = this.lengths[byte];
+
+ while (length !== 0) {
+ if (space >= length) {
+ add(code << (space - length));
+ code = 0;
+ space -= length;
+ length = 0;
+ } else {
+ var shift = length - space;
+ var msb = code >> shift;
+ add(msb);
+ code -= msb << shift;
+ length -= space;
+ space = 0;
+ }
+
+ if (space === 0) {
+ space = 8;
+ }
+ }
+ }
+
+ if (space !== 8) {
+ add(this.codes[256] >> (this.lengths[256] - space));
+ }
+
+ return Buffer.from(result);
+};
+
+HuffmanTable.prototype.decode = function decode(buffer) {
+ var result = [];
+ var subtree = this.tree;
+
+ for (var i = 0; i < buffer.length; i++) {
+ var byte = buffer[i];
+
+ for (var j = 0; j < 8; j++) {
+ var bit = (byte & 128) ? 1 : 0;
+ byte = byte << 1;
+
+ subtree = subtree[bit];
+ if (subtree.length === 1) {
+ result.push(subtree[0]);
+ subtree = this.tree;
+ }
+ }
+ }
+
+ return Buffer.from(result);
+};
+
+// The initializer arrays for the Huffman tables are generated with feeding the tables from the
+// spec to this sed command:
+//
+// sed -e "s/^.* [|]//g" -e "s/|//g" -e "s/ .*//g" -e "s/^/ '/g" -e "s/$/',/g"
+
+HuffmanTable.huffmanTable = new HuffmanTable([
+ '1111111111000',
+ '11111111111111111011000',
+ '1111111111111111111111100010',
+ '1111111111111111111111100011',
+ '1111111111111111111111100100',
+ '1111111111111111111111100101',
+ '1111111111111111111111100110',
+ '1111111111111111111111100111',
+ '1111111111111111111111101000',
+ '111111111111111111101010',
+ '111111111111111111111111111100',
+ '1111111111111111111111101001',
+ '1111111111111111111111101010',
+ '111111111111111111111111111101',
+ '1111111111111111111111101011',
+ '1111111111111111111111101100',
+ '1111111111111111111111101101',
+ '1111111111111111111111101110',
+ '1111111111111111111111101111',
+ '1111111111111111111111110000',
+ '1111111111111111111111110001',
+ '1111111111111111111111110010',
+ '111111111111111111111111111110',
+ '1111111111111111111111110011',
+ '1111111111111111111111110100',
+ '1111111111111111111111110101',
+ '1111111111111111111111110110',
+ '1111111111111111111111110111',
+ '1111111111111111111111111000',
+ '1111111111111111111111111001',
+ '1111111111111111111111111010',
+ '1111111111111111111111111011',
+ '010100',
+ '1111111000',
+ '1111111001',
+ '111111111010',
+ '1111111111001',
+ '010101',
+ '11111000',
+ '11111111010',
+ '1111111010',
+ '1111111011',
+ '11111001',
+ '11111111011',
+ '11111010',
+ '010110',
+ '010111',
+ '011000',
+ '00000',
+ '00001',
+ '00010',
+ '011001',
+ '011010',
+ '011011',
+ '011100',
+ '011101',
+ '011110',
+ '011111',
+ '1011100',
+ '11111011',
+ '111111111111100',
+ '100000',
+ '111111111011',
+ '1111111100',
+ '1111111111010',
+ '100001',
+ '1011101',
+ '1011110',
+ '1011111',
+ '1100000',
+ '1100001',
+ '1100010',
+ '1100011',
+ '1100100',
+ '1100101',
+ '1100110',
+ '1100111',
+ '1101000',
+ '1101001',
+ '1101010',
+ '1101011',
+ '1101100',
+ '1101101',
+ '1101110',
+ '1101111',
+ '1110000',
+ '1110001',
+ '1110010',
+ '11111100',
+ '1110011',
+ '11111101',
+ '1111111111011',
+ '1111111111111110000',
+ '1111111111100',
+ '11111111111100',
+ '100010',
+ '111111111111101',
+ '00011',
+ '100011',
+ '00100',
+ '100100',
+ '00101',
+ '100101',
+ '100110',
+ '100111',
+ '00110',
+ '1110100',
+ '1110101',
+ '101000',
+ '101001',
+ '101010',
+ '00111',
+ '101011',
+ '1110110',
+ '101100',
+ '01000',
+ '01001',
+ '101101',
+ '1110111',
+ '1111000',
+ '1111001',
+ '1111010',
+ '1111011',
+ '111111111111110',
+ '11111111100',
+ '11111111111101',
+ '1111111111101',
+ '1111111111111111111111111100',
+ '11111111111111100110',
+ '1111111111111111010010',
+ '11111111111111100111',
+ '11111111111111101000',
+ '1111111111111111010011',
+ '1111111111111111010100',
+ '1111111111111111010101',
+ '11111111111111111011001',
+ '1111111111111111010110',
+ '11111111111111111011010',
+ '11111111111111111011011',
+ '11111111111111111011100',
+ '11111111111111111011101',
+ '11111111111111111011110',
+ '111111111111111111101011',
+ '11111111111111111011111',
+ '111111111111111111101100',
+ '111111111111111111101101',
+ '1111111111111111010111',
+ '11111111111111111100000',
+ '111111111111111111101110',
+ '11111111111111111100001',
+ '11111111111111111100010',
+ '11111111111111111100011',
+ '11111111111111111100100',
+ '111111111111111011100',
+ '1111111111111111011000',
+ '11111111111111111100101',
+ '1111111111111111011001',
+ '11111111111111111100110',
+ '11111111111111111100111',
+ '111111111111111111101111',
+ '1111111111111111011010',
+ '111111111111111011101',
+ '11111111111111101001',
+ '1111111111111111011011',
+ '1111111111111111011100',
+ '11111111111111111101000',
+ '11111111111111111101001',
+ '111111111111111011110',
+ '11111111111111111101010',
+ '1111111111111111011101',
+ '1111111111111111011110',
+ '111111111111111111110000',
+ '111111111111111011111',
+ '1111111111111111011111',
+ '11111111111111111101011',
+ '11111111111111111101100',
+ '111111111111111100000',
+ '111111111111111100001',
+ '1111111111111111100000',
+ '111111111111111100010',
+ '11111111111111111101101',
+ '1111111111111111100001',
+ '11111111111111111101110',
+ '11111111111111111101111',
+ '11111111111111101010',
+ '1111111111111111100010',
+ '1111111111111111100011',
+ '1111111111111111100100',
+ '11111111111111111110000',
+ '1111111111111111100101',
+ '1111111111111111100110',
+ '11111111111111111110001',
+ '11111111111111111111100000',
+ '11111111111111111111100001',
+ '11111111111111101011',
+ '1111111111111110001',
+ '1111111111111111100111',
+ '11111111111111111110010',
+ '1111111111111111101000',
+ '1111111111111111111101100',
+ '11111111111111111111100010',
+ '11111111111111111111100011',
+ '11111111111111111111100100',
+ '111111111111111111111011110',
+ '111111111111111111111011111',
+ '11111111111111111111100101',
+ '111111111111111111110001',
+ '1111111111111111111101101',
+ '1111111111111110010',
+ '111111111111111100011',
+ '11111111111111111111100110',
+ '111111111111111111111100000',
+ '111111111111111111111100001',
+ '11111111111111111111100111',
+ '111111111111111111111100010',
+ '111111111111111111110010',
+ '111111111111111100100',
+ '111111111111111100101',
+ '11111111111111111111101000',
+ '11111111111111111111101001',
+ '1111111111111111111111111101',
+ '111111111111111111111100011',
+ '111111111111111111111100100',
+ '111111111111111111111100101',
+ '11111111111111101100',
+ '111111111111111111110011',
+ '11111111111111101101',
+ '111111111111111100110',
+ '1111111111111111101001',
+ '111111111111111100111',
+ '111111111111111101000',
+ '11111111111111111110011',
+ '1111111111111111101010',
+ '1111111111111111101011',
+ '1111111111111111111101110',
+ '1111111111111111111101111',
+ '111111111111111111110100',
+ '111111111111111111110101',
+ '11111111111111111111101010',
+ '11111111111111111110100',
+ '11111111111111111111101011',
+ '111111111111111111111100110',
+ '11111111111111111111101100',
+ '11111111111111111111101101',
+ '111111111111111111111100111',
+ '111111111111111111111101000',
+ '111111111111111111111101001',
+ '111111111111111111111101010',
+ '111111111111111111111101011',
+ '1111111111111111111111111110',
+ '111111111111111111111101100',
+ '111111111111111111111101101',
+ '111111111111111111111101110',
+ '111111111111111111111101111',
+ '111111111111111111111110000',
+ '11111111111111111111101110',
+ '111111111111111111111111111111'
+]);
+
+// ### String literal representation ###
+//
+// Literal **strings** can represent header names or header values. There's two variant of the
+// string encoding:
+//
+// String literal with Huffman encoding:
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 1 | Value Length Prefix (7) |
+// +---+---+---+---+---+---+---+---+
+// | Value Length (0-N bytes) |
+// +---+---+---+---+---+---+---+---+
+// ...
+// +---+---+---+---+---+---+---+---+
+// | Huffman Encoded Data |Padding|
+// +---+---+---+---+---+---+---+---+
+//
+// String literal without Huffman encoding:
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | Value Length Prefix (7) |
+// +---+---+---+---+---+---+---+---+
+// | Value Length (0-N bytes) |
+// +---+---+---+---+---+---+---+---+
+// ...
+// +---+---+---+---+---+---+---+---+
+// | Field Bytes Without Encoding |
+// +---+---+---+---+---+---+---+---+
+
+HeaderSetCompressor.string = function writeString(str) {
+ str = Buffer.from(str, 'utf8');
+
+ var huffman = HuffmanTable.huffmanTable.encode(str);
+ if (huffman.length < str.length) {
+ var length = HeaderSetCompressor.integer(huffman.length, 7);
+ length[0][0] |= 128;
+ return length.concat(huffman);
+ }
+
+ else {
+ length = HeaderSetCompressor.integer(str.length, 7);
+ return length.concat(str);
+ }
+};
+
+HeaderSetDecompressor.string = function readString(buffer) {
+ var huffman = buffer[buffer.cursor] & 128;
+ var length = HeaderSetDecompressor.integer(buffer, 7);
+ var encoded = buffer.slice(buffer.cursor, buffer.cursor + length);
+ buffer.cursor += length;
+ return (huffman ? HuffmanTable.huffmanTable.decode(encoded) : encoded).toString('utf8');
+};
+
+// ### Header represenations ###
+
+// The JavaScript object representation is described near the
+// `HeaderSetDecompressor.prototype._execute()` method definition.
+//
+// **All binary header representations** start with a prefix signaling the representation type and
+// an index represented using prefix coded integers:
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 1 | Index (7+) | Indexed Representation
+// +---+---------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 1 | Index (6+) |
+// +---+---+---+-------------------+ Literal w/ Indexing
+// | Value Length (8+) |
+// +-------------------------------+ w/ Indexed Name
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 1 | 0 |
+// +---+---+---+-------------------+
+// | Name Length (8+) |
+// +-------------------------------+ Literal w/ Indexing
+// | Name String (Length octets) |
+// +-------------------------------+ w/ New Name
+// | Value Length (8+) |
+// +-------------------------------+
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 0 | 0 | 0 | Index (4+) |
+// +---+---+---+-------------------+ Literal w/o Incremental Indexing
+// | Value Length (8+) |
+// +-------------------------------+ w/ Indexed Name
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 0 | 0 | 0 | 0 |
+// +---+---+---+-------------------+
+// | Name Length (8+) |
+// +-------------------------------+ Literal w/o Incremental Indexing
+// | Name String (Length octets) |
+// +-------------------------------+ w/ New Name
+// | Value Length (8+) |
+// +-------------------------------+
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 0 | 0 | 1 | Index (4+) |
+// +---+---+---+-------------------+ Literal never indexed
+// | Value Length (8+) |
+// +-------------------------------+ w/ Indexed Name
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// 0 1 2 3 4 5 6 7
+// +---+---+---+---+---+---+---+---+
+// | 0 | 0 | 0 | 1 | 0 |
+// +---+---+---+-------------------+
+// | Name Length (8+) |
+// +-------------------------------+ Literal never indexed
+// | Name String (Length octets) |
+// +-------------------------------+ w/ New Name
+// | Value Length (8+) |
+// +-------------------------------+
+// | Value String (Length octets) |
+// +-------------------------------+
+//
+// The **Indexed Representation** consists of the 1-bit prefix and the Index that is represented as
+// a 7-bit prefix coded integer and nothing else.
+//
+// After the first bits, **all literal representations** specify the header name, either as a
+// pointer to the Header Table (Index) or a string literal. When the string literal representation
+// is used, the Index is set to 0 and the string literal starts at the second byte.
+//
+// For **all literal representations**, the specification of the header value comes next. It is
+// always represented as a string.
+
+var representations = {
+ indexed : { prefix: 7, pattern: 0x80 },
+ literalIncremental : { prefix: 6, pattern: 0x40 },
+ contextUpdate : { prefix: 0, pattern: 0x20 },
+ literalNeverIndexed : { prefix: 4, pattern: 0x10 },
+ literal : { prefix: 4, pattern: 0x00 }
+};
+
+HeaderSetCompressor.header = function writeHeader(header) {
+ var representation, buffers = [];
+
+ if (header.contextUpdate) {
+ representation = representations.contextUpdate;
+ } else if (typeof header.value === 'number') {
+ representation = representations.indexed;
+ } else if (header.index) {
+ representation = representations.literalIncremental;
+ } else if (header.mustNeverIndex) {
+ representation = representations.literalNeverIndexed;
+ } else {
+ representation = representations.literal;
+ }
+
+ if (representation === representations.contextUpdate) {
+ buffers.push(HeaderSetCompressor.integer(header.newMaxSize, 5));
+ }
+
+ else if (representation === representations.indexed) {
+ buffers.push(HeaderSetCompressor.integer(header.value + 1, representation.prefix));
+ }
+
+ else {
+ if (typeof header.name === 'number') {
+ buffers.push(HeaderSetCompressor.integer(header.name + 1, representation.prefix));
+ } else {
+ buffers.push(HeaderSetCompressor.integer(0, representation.prefix));
+ buffers.push(HeaderSetCompressor.string(header.name));
+ }
+ buffers.push(HeaderSetCompressor.string(header.value));
+ }
+
+ buffers[0][0][0] |= representation.pattern;
+
+ return Array.prototype.concat.apply([], buffers); // array of arrays of buffers -> array of buffers
+};
+
+HeaderSetDecompressor.header = function readHeader(buffer) {
+ var representation, header = {};
+
+ var firstByte = buffer[buffer.cursor];
+ if (firstByte & 0x80) {
+ representation = representations.indexed;
+ } else if (firstByte & 0x40) {
+ representation = representations.literalIncremental;
+ } else if (firstByte & 0x20) {
+ representation = representations.contextUpdate;
+ } else if (firstByte & 0x10) {
+ representation = representations.literalNeverIndexed;
+ } else {
+ representation = representations.literal;
+ }
+
+ header.value = header.name = -1;
+ header.index = false;
+ header.contextUpdate = false;
+ header.newMaxSize = 0;
+ header.mustNeverIndex = false;
+
+ if (representation === representations.contextUpdate) {
+ header.contextUpdate = true;
+ header.newMaxSize = HeaderSetDecompressor.integer(buffer, 5);
+ }
+
+ else if (representation === representations.indexed) {
+ header.value = header.name = HeaderSetDecompressor.integer(buffer, representation.prefix) - 1;
+ }
+
+ else {
+ header.name = HeaderSetDecompressor.integer(buffer, representation.prefix) - 1;
+ if (header.name === -1) {
+ header.name = HeaderSetDecompressor.string(buffer);
+ }
+ header.value = HeaderSetDecompressor.string(buffer);
+ header.index = (representation === representations.literalIncremental);
+ header.mustNeverIndex = (representation === representations.literalNeverIndexed);
+ }
+
+ return header;
+};
+
+// Integration with HTTP/2
+// =======================
+
+// This section describes the interaction between the compressor/decompressor and the rest of the
+// HTTP/2 implementation. The `Compressor` and the `Decompressor` makes up a layer between the
+// [framer](framer.html) and the [connection handling component](connection.html). They let most
+// frames pass through, except HEADERS and PUSH_PROMISE frames. They convert the frames between
+// these two representations:
+//
+// { {
+// type: 'HEADERS', type: 'HEADERS',
+// flags: {}, flags: {},
+// stream: 1, <===> stream: 1,
+// headers: { data: Buffer
+// N1: 'V1', }
+// N2: ['V1', 'V2', ...],
+// // ...
+// }
+// }
+//
+// There are possibly several binary frame that belong to a single non-binary frame.
+
+var MAX_HTTP_PAYLOAD_SIZE = 16384;
+
+// The Compressor class
+// --------------------
+
+// The Compressor transform stream is basically stateless.
+util.inherits(Compressor, TransformStream);
+function Compressor(log, type) {
+ TransformStream.call(this, { objectMode: true });
+
+ this._log = log.child({ component: 'compressor' });
+
+ assert((type === 'REQUEST') || (type === 'RESPONSE'));
+ this._table = new HeaderTable(this._log);
+
+ this.tableSizeChangePending = false;
+ this.lowestTableSizePending = 0;
+ this.tableSizeSetting = DEFAULT_HEADER_TABLE_LIMIT;
+}
+
+// Changing the header table size
+Compressor.prototype.setTableSizeLimit = function setTableSizeLimit(size) {
+ this._table.setSizeLimit(size);
+ if (!this.tableSizeChangePending || size < this.lowestTableSizePending) {
+ this.lowestTableSizePending = size;
+ }
+ this.tableSizeSetting = size;
+ this.tableSizeChangePending = true;
+};
+
+// `compress` takes a header set, and compresses it using a new `HeaderSetCompressor` stream
+// instance. This means that from now on, the advantages of streaming header encoding are lost,
+// but the API becomes simpler.
+Compressor.prototype.compress = function compress(headers) {
+ var compressor = new HeaderSetCompressor(this._log, this._table);
+
+ if (this.tableSizeChangePending) {
+ if (this.lowestTableSizePending < this.tableSizeSetting) {
+ compressor.send({contextUpdate: true, newMaxSize: this.lowestTableSizePending,
+ name: "", value: "", index: 0});
+ }
+ compressor.send({contextUpdate: true, newMaxSize: this.tableSizeSetting,
+ name: "", value: "", index: 0});
+ this.tableSizeChangePending = false;
+ }
+ var colonHeaders = [];
+ var nonColonHeaders = [];
+
+ // To ensure we send colon headers first
+ for (var name in headers) {
+ if (name.trim()[0] === ':') {
+ colonHeaders.push(name);
+ } else {
+ nonColonHeaders.push(name);
+ }
+ }
+
+ function compressHeader(name) {
+ var value = headers[name];
+ name = String(name).toLowerCase();
+
+ // * To allow for better compression efficiency, the Cookie header field MAY be split into
+ // separate header fields, each with one or more cookie-pairs.
+ if (name == 'cookie') {
+ if (!(value instanceof Array)) {
+ value = [value];
+ }
+ value = Array.prototype.concat.apply([], value.map(function(cookie) {
+ return String(cookie).split(';').map(trim);
+ }));
+ }
+
+ if (value instanceof Array) {
+ for (var i = 0; i < value.length; i++) {
+ compressor.write([name, String(value[i])]);
+ }
+ } else {
+ compressor.write([name, String(value)]);
+ }
+ }
+
+ colonHeaders.forEach(compressHeader);
+ nonColonHeaders.forEach(compressHeader);
+
+ compressor.end();
+
+ var chunk, chunks = [];
+ while (chunk = compressor.read()) {
+ chunks.push(chunk);
+ }
+
+ function insertSoftIllegalHpack(originalCompressed) {
+ var illegalLiteral = Buffer.from([
+ 0x00, // Literal, no index
+ 0x08, // Name: not huffman encoded, 8 bytes long
+ 0x3a,
+ 0x69,
+ 0x6c,
+ 0x6c,
+ 0x65,
+ 0x67,
+ 0x61,
+ 0x6c, // :illegal
+ 0x10, // Value: not huffman encoded, 16 bytes long
+ // REALLY NOT LEGAL
+ 0x52,
+ 0x45,
+ 0x41,
+ 0x4c,
+ 0x4c,
+ 0x59,
+ 0x20,
+ 0x4e,
+ 0x4f,
+ 0x54,
+ 0x20,
+ 0x4c,
+ 0x45,
+ 0x47,
+ 0x41,
+ 0x4c,
+ ]);
+ var newBufferLength = originalCompressed.length + illegalLiteral.length;
+ var concatenated = Buffer.alloc(newBufferLength);
+ originalCompressed.copy(concatenated, 0);
+ illegalLiteral.copy(concatenated, originalCompressed.length);
+ return concatenated;
+ }
+
+ function insertHardIllegalHpack(originalCompressed) {
+ // Now we have to add an invalid header
+ var illegalIndexed = HeaderSetCompressor.integer(5000, 7);
+ // The above returns an array of buffers, but there's only one buffer, so
+ // get rid of the array.
+ illegalIndexed = illegalIndexed[0];
+ // Set the first bit to 1 to signal this is an indexed representation
+ illegalIndexed[0] |= 0x80;
+ var newBufferLength = originalCompressed.length + illegalIndexed.length;
+ var concatenated = Buffer.alloc(newBufferLength);
+ originalCompressed.copy(concatenated, 0);
+ illegalIndexed.copy(concatenated, originalCompressed.length);
+ return concatenated;
+ }
+
+ if ("x-softillegalhpack" in headers) {
+ return insertSoftIllegalHpack(concat(chunks));
+ }
+
+ if ("x-hardillegalhpack" in headers) {
+ return insertHardIllegalHpack(concat(chunks));
+ }
+
+ return concat(chunks);
+};
+
+// When a `frame` arrives
+Compressor.prototype._transform = function _transform(frame, encoding, done) {
+ // * and it is a HEADERS or PUSH_PROMISE frame
+ // * it generates a header block using the compress method
+ // * cuts the header block into `chunks` that are not larger than `MAX_HTTP_PAYLOAD_SIZE`
+ // * for each chunk, it pushes out a chunk frame that is identical to the original, except
+ // the `data` property which holds the given chunk, the type of the frame which is always
+ // CONTINUATION except for the first frame, and the END_HEADERS/END_PUSH_STREAM flag that
+ // marks the last frame and the END_STREAM flag which is always false before the end
+ if (frame.type === 'HEADERS' || frame.type === 'PUSH_PROMISE') {
+ var buffer = this.compress(frame.headers);
+
+ // This will result in CONTINUATIONs from a PUSH_PROMISE being 4 bytes shorter than they could
+ // be, but that's not the end of the world, and it prevents us from going over MAX_HTTP_PAYLOAD_SIZE
+ // on the initial PUSH_PROMISE frame.
+ var adjustment = frame.type === 'PUSH_PROMISE' ? 4 : 0;
+ var chunks = cut(buffer, MAX_HTTP_PAYLOAD_SIZE - adjustment);
+
+ for (var i = 0; i < chunks.length; i++) {
+ var chunkFrame;
+ var first = (i === 0);
+ var last = (i === chunks.length - 1);
+
+ if (first) {
+ chunkFrame = util._extend({}, frame);
+ chunkFrame.flags = util._extend({}, frame.flags);
+ chunkFrame.flags['END_' + frame.type] = last;
+ } else {
+ chunkFrame = {
+ type: 'CONTINUATION',
+ flags: { END_HEADERS: last },
+ stream: frame.stream
+ };
+ }
+ chunkFrame.data = chunks[i];
+
+ this.push(chunkFrame);
+ }
+ }
+
+ // * otherwise, the frame is forwarded without taking any action
+ else {
+ this.push(frame);
+ }
+
+ done();
+};
+
+// The Decompressor class
+// ----------------------
+
+// The Decompressor is a stateful transform stream, since it has to collect multiple frames first,
+// and the decoding comes after unifying the payload of those frames.
+//
+// If there's a frame in progress, `this._inProgress` is `true`. The frames are collected in
+// `this._frames`, and the type of the frame and the stream identifier is stored in `this._type`
+// and `this._stream` respectively.
+util.inherits(Decompressor, TransformStream);
+function Decompressor(log, type) {
+ TransformStream.call(this, { objectMode: true });
+
+ this._log = log.child({ component: 'compressor' });
+
+ assert((type === 'REQUEST') || (type === 'RESPONSE'));
+ this._table = new HeaderTable(this._log);
+
+ this._inProgress = false;
+ this._base = undefined;
+}
+
+// Changing the header table size
+Decompressor.prototype.setTableSizeLimit = function setTableSizeLimit(size) {
+ this._table.setSizeLimit(size);
+};
+
+// `decompress` takes a full header block, and decompresses it using a new `HeaderSetDecompressor`
+// stream instance. This means that from now on, the advantages of streaming header decoding are
+// lost, but the API becomes simpler.
+Decompressor.prototype.decompress = function decompress(block) {
+ var decompressor = new HeaderSetDecompressor(this._log, this._table);
+ decompressor.end(block);
+
+ var seenNonColonHeader = false;
+ var headers = {};
+ var pair;
+ while (pair = decompressor.read()) {
+ var name = pair[0];
+ var value = pair[1];
+ var isColonHeader = (name.trim()[0] === ':');
+ if (seenNonColonHeader && isColonHeader) {
+ this.emit('error', 'PROTOCOL_ERROR');
+ return headers;
+ }
+ seenNonColonHeader = !isColonHeader;
+ if (name in headers) {
+ if (headers[name] instanceof Array) {
+ headers[name].push(value);
+ } else {
+ headers[name] = [headers[name], value];
+ }
+ } else {
+ headers[name] = value;
+ }
+ }
+
+ // * If there are multiple Cookie header fields after decompression, these MUST be concatenated
+ // into a single octet string using the two octet delimiter of 0x3B, 0x20 (the ASCII
+ // string "; ").
+ if (('cookie' in headers) && (headers['cookie'] instanceof Array)) {
+ headers['cookie'] = headers['cookie'].join('; ');
+ }
+
+ return headers;
+};
+
+// When a `frame` arrives
+Decompressor.prototype._transform = function _transform(frame, encoding, done) {
+ // * and the collection process is already `_inProgress`, the frame is simply stored, except if
+ // it's an illegal frame
+ if (this._inProgress) {
+ if ((frame.type !== 'CONTINUATION') || (frame.stream !== this._base.stream)) {
+ this._log.error('A series of HEADER frames were not continuous');
+ this.emit('error', 'PROTOCOL_ERROR');
+ return;
+ }
+ this._frames.push(frame);
+ }
+
+ // * and the collection process is not `_inProgress`, but the new frame's type is HEADERS or
+ // PUSH_PROMISE, a new collection process begins
+ else if ((frame.type === 'HEADERS') || (frame.type === 'PUSH_PROMISE')) {
+ this._inProgress = true;
+ this._base = util._extend({}, frame);
+ this._frames = [frame];
+ }
+
+ // * otherwise, the frame is forwarded without taking any action
+ else {
+ this.push(frame);
+ }
+
+ // * When the frame signals that it's the last in the series, the header block chunks are
+ // concatenated, the headers are decompressed, and a new frame gets pushed out with the
+ // decompressed headers.
+ if (this._inProgress && (frame.flags.END_HEADERS || frame.flags.END_PUSH_PROMISE)) {
+ var buffer = concat(this._frames.map(function(frame) {
+ return frame.data;
+ }));
+ try {
+ var headers = this.decompress(buffer);
+ } catch(error) {
+ this._log.error({ err: error }, 'Header decompression error');
+ this.emit('error', 'COMPRESSION_ERROR');
+ return;
+ }
+ this.push(util._extend(this._base, { headers: headers }));
+ this._inProgress = false;
+ }
+
+ done();
+};
+
+// Helper functions
+// ================
+
+// Concatenate an array of buffers into a new buffer
+function concat(buffers) {
+ var size = 0;
+ for (var i = 0; i < buffers.length; i++) {
+ size += buffers[i].length;
+ }
+
+ var concatenated = Buffer.alloc(size);
+ for (var cursor = 0, j = 0; j < buffers.length; cursor += buffers[j].length, j++) {
+ buffers[j].copy(concatenated, cursor);
+ }
+
+ return concatenated;
+}
+
+// Cut `buffer` into chunks not larger than `size`
+function cut(buffer, size) {
+ var chunks = [];
+ var cursor = 0;
+ do {
+ var chunkSize = Math.min(size, buffer.length - cursor);
+ chunks.push(buffer.slice(cursor, cursor + chunkSize));
+ cursor += chunkSize;
+ } while(cursor < buffer.length);
+ return chunks;
+}
+
+function trim(string) {
+ return string.trim();
+}
diff --git a/testing/xpcshell/node-http2/lib/protocol/connection.js b/testing/xpcshell/node-http2/lib/protocol/connection.js
new file mode 100644
index 0000000000..8c203675fa
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/connection.js
@@ -0,0 +1,630 @@
+var assert = require('assert');
+
+// The Connection class
+// ====================
+
+// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
+// stream (TCP stream). It operates by sending and receiving frames and is implemented as a
+// [Flow](flow.html) subclass.
+
+var Flow = require('./flow').Flow;
+
+exports.Connection = Connection;
+
+// Public API
+// ----------
+
+// * **new Connection(log, firstStreamId, settings)**: create a new Connection
+//
+// * **Event: 'error' (type)**: signals a connection level error made by the other end
+//
+// * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
+// code other than NO_ERROR
+//
+// * **Event: 'stream' (stream)**: signals that there's an incoming stream
+//
+// * **createStream(): stream**: initiate a new stream
+//
+// * **set(settings, callback)**: change the value of one or more settings according to the
+// key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
+//
+// * **ping([callback])**: send a ping and call callback when the answer arrives
+//
+// * **close([error])**: close the stream with an error code
+
+// Constructor
+// -----------
+
+// The main aspects of managing the connection are:
+function Connection(log, firstStreamId, settings) {
+ // * initializing the base class
+ Flow.call(this, 0);
+
+ // * logging: every method uses the common logger object
+ this._log = log.child({ component: 'connection' });
+
+ // * stream management
+ this._initializeStreamManagement(firstStreamId);
+
+ // * lifecycle management
+ this._initializeLifecycleManagement();
+
+ // * flow control
+ this._initializeFlowControl();
+
+ // * settings management
+ this._initializeSettingsManagement(settings);
+
+ // * multiplexing
+ this._initializeMultiplexing();
+}
+Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
+
+// Overview
+// --------
+
+// | ^ | ^
+// v | v |
+// +--------------+ +--------------+
+// +---| stream1 |---| stream2 |---- .... ---+
+// | | +----------+ | | +----------+ | |
+// | | | stream1. | | | | stream2. | | |
+// | +-| upstream |-+ +-| upstream |-+ |
+// | +----------+ +----------+ |
+// | | ^ | ^ |
+// | v | v | |
+// | +-----+-------------+-----+-------- .... |
+// | ^ | | | |
+// | | v | | |
+// | +--------------+ | | |
+// | | stream0 | | | |
+// | | connection | | | |
+// | | management | multiplexing |
+// | +--------------+ flow control |
+// | | ^ |
+// | _read() | | _write() |
+// | v | |
+// | +------------+ +-----------+ |
+// | |output queue| |input queue| |
+// +----------------+------------+-+-----------+-----------------+
+// | ^
+// read() | | write()
+// v |
+
+// Stream management
+// -----------------
+
+var Stream = require('./stream').Stream;
+
+// Initialization:
+Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
+ // * streams are stored in two data structures:
+ // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames.
+ // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames.
+ this._streamIds = [];
+ this._streamPriorities = [];
+
+ // * The next outbound stream ID and the last inbound stream id
+ this._nextStreamId = firstStreamId;
+ this._lastIncomingStream = 0;
+
+ // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID
+ this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
+
+ // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
+ // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
+ this._streamSlotsFree = Infinity;
+ this._streamLimit = Infinity;
+ this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
+};
+
+// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
+// broadcasts the message by creating an event on it.
+Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
+ if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
+ (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') ||
+ (frame.type === 'ALTSVC') || (frame.type == 'ORIGIN')) {
+ this._log.debug({ frame: frame }, 'Receiving connection level frame');
+ this.emit(frame.type, frame);
+ } else {
+ this._log.error({ frame: frame }, 'Invalid connection level frame');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+};
+
+// Methods to manage the stream slot pool:
+Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
+ var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
+ this._streamSlotsFree += newStreamLimit - this._streamLimit;
+ this._streamLimit = newStreamLimit;
+ if (wakeup) {
+ this.emit('wakeup');
+ }
+};
+
+Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
+ if (change) {
+ this._log.trace({ free: this._streamSlotsFree, change: change },
+ 'Changing active stream count.');
+ var wakeup = (this._streamSlotsFree === 0) && (change < 0);
+ this._streamSlotsFree -= change;
+ if (wakeup) {
+ this.emit('wakeup');
+ }
+ }
+};
+
+// Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
+// an outbound stream) consists of three steps:
+//
+// 1. var stream = new Stream(this._log, this);
+// 2. this._allocateId(stream, id);
+// 2. this._allocatePriority(stream);
+
+// Allocating an ID to a stream
+Connection.prototype._allocateId = function _allocateId(stream, id) {
+ // * initiated stream without definite ID
+ if (id === undefined) {
+ id = this._nextStreamId;
+ this._nextStreamId += 2;
+ }
+
+ // * incoming stream with a legitim ID (larger than any previous and different parity than ours)
+ else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
+ this._lastIncomingStream = id;
+ }
+
+ // * incoming stream with invalid ID
+ else {
+ this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
+ 'Invalid incoming stream ID.');
+ this.emit('error', 'PROTOCOL_ERROR');
+ return undefined;
+ }
+
+ assert(!(id in this._streamIds));
+
+ // * adding to `this._streamIds`
+ this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
+ this._streamIds[id] = stream;
+ stream.id = id;
+ this.emit('new_stream', stream, id);
+
+ // * forwarding connection errors from streams
+ stream.on('connectionError', this.emit.bind(this, 'error'));
+
+ return id;
+};
+
+// Allocating a priority to a stream, and managing priority changes
+Connection.prototype._allocatePriority = function _allocatePriority(stream) {
+ this._log.trace({ s: stream }, 'Allocating priority for stream.');
+ this._insert(stream, stream._priority);
+ stream.on('priority', this._reprioritize.bind(this, stream));
+ stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
+ this.emit('wakeup');
+};
+
+Connection.prototype._insert = function _insert(stream, priority) {
+ if (priority in this._streamPriorities) {
+ this._streamPriorities[priority].push(stream);
+ } else {
+ this._streamPriorities[priority] = [stream];
+ }
+};
+
+Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
+ var bucket = this._streamPriorities[stream._priority];
+ var index = bucket.indexOf(stream);
+ assert(index !== -1);
+ bucket.splice(index, 1);
+ if (bucket.length === 0) {
+ delete this._streamPriorities[stream._priority];
+ }
+
+ this._insert(stream, priority);
+};
+
+// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
+// a previously nonexistent stream.
+Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
+ this._log.debug({ stream_id: id }, 'New incoming stream.');
+
+ var stream = new Stream(this._log, this);
+ this._allocateId(stream, id);
+ this._allocatePriority(stream);
+ this.emit('stream', stream, id);
+
+ return stream;
+};
+
+// Creating an *outbound* stream
+Connection.prototype.createStream = function createStream() {
+ this._log.trace('Creating new outbound stream.');
+
+ // * Receiving is enabled immediately, and an ID gets assigned to the stream
+ var stream = new Stream(this._log, this);
+ this._allocatePriority(stream);
+
+ return stream;
+};
+
+// Multiplexing
+// ------------
+
+Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
+ this.on('window_update', this.emit.bind(this, 'wakeup'));
+ this._sendScheduled = false;
+ this._firstFrameReceived = false;
+};
+
+// The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
+// by child classes. It reads frames from streams and pushes them to the output buffer.
+Connection.prototype._send = function _send(immediate) {
+ // * Do not do anything if the connection is already closed
+ if (this._closed) {
+ return;
+ }
+
+ // * Collapsing multiple calls in a turn into a single deferred call
+ if (immediate) {
+ this._sendScheduled = false;
+ } else {
+ if (!this._sendScheduled) {
+ this._sendScheduled = true;
+ setImmediate(this._send.bind(this, true));
+ }
+ return;
+ }
+
+ this._log.trace('Starting forwarding frames from streams.');
+
+ // * Looping through priority `bucket`s in priority order.
+priority_loop:
+ for (var priority in this._streamPriorities) {
+ var bucket = this._streamPriorities[priority];
+ var nextBucket = [];
+
+ // * Forwarding frames from buckets with round-robin scheduling.
+ // 1. pulling out frame
+ // 2. if there's no frame, skip this stream
+ // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
+ // this stream
+ // 4. adding stream to the bucket of the next round
+ // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
+ // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
+ // 7. forwarding the frame, changing `streamCount` as appropriate
+ // 8. stepping to the next stream if there's still more frame needed in the output buffer
+ // 9. switching to the bucket of the next round
+ while (bucket.length > 0) {
+ for (var index = 0; index < bucket.length; index++) {
+ var stream = bucket[index];
+ var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
+
+ if (!frame) {
+ continue;
+ } else if (frame.count_change > this._streamSlotsFree) {
+ stream.upstream.unshift(frame);
+ continue;
+ }
+
+ nextBucket.push(stream);
+
+ if (frame.stream === undefined) {
+ frame.stream = stream.id || this._allocateId(stream);
+ }
+
+ if (frame.type === 'PUSH_PROMISE') {
+ this._allocatePriority(frame.promised_stream);
+ frame.promised_stream = this._allocateId(frame.promised_stream);
+ }
+
+ this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
+ var moreNeeded = this.push(frame);
+ this._changeStreamCount(frame.count_change);
+
+ assert(moreNeeded !== null); // The frame shouldn't be unforwarded
+ if (moreNeeded === false) {
+ break priority_loop;
+ }
+ }
+
+ bucket = nextBucket;
+ nextBucket = [];
+ }
+ }
+
+ // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
+ if (moreNeeded === undefined) {
+ this.once('wakeup', this._send.bind(this));
+ }
+
+ this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
+};
+
+// The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
+// implemented by child classes. It forwards the given frame to the appropriate stream:
+Connection.prototype._receive = function _receive(frame, done) {
+ this._log.trace({ frame: frame }, 'Forwarding incoming frame');
+
+ // * first frame needs to be checked by the `_onFirstFrameReceived` method
+ if (!this._firstFrameReceived) {
+ this._firstFrameReceived = true;
+ this._onFirstFrameReceived(frame);
+ }
+
+ // Do some sanity checking here before we create a stream
+ if ((frame.type == 'SETTINGS' ||
+ frame.type == 'PING' ||
+ frame.type == 'GOAWAY') &&
+ frame.stream != 0) {
+ // Got connection-level frame on a stream - EEP!
+ this.close('PROTOCOL_ERROR');
+ return;
+ } else if ((frame.type == 'DATA' ||
+ frame.type == 'HEADERS' ||
+ frame.type == 'PRIORITY' ||
+ frame.type == 'RST_STREAM' ||
+ frame.type == 'PUSH_PROMISE' ||
+ frame.type == 'CONTINUATION') &&
+ frame.stream == 0) {
+ // Got stream-level frame on connection - EEP!
+ this.close('PROTOCOL_ERROR');
+ return;
+ }
+ // WINDOW_UPDATE can be on either stream or connection
+
+ // * gets the appropriate stream from the stream registry
+ var stream = this._streamIds[frame.stream];
+
+ // * or creates one if it's not in `this.streams`
+ if (!stream) {
+ stream = this._createIncomingStream(frame.stream);
+ }
+
+ // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
+ if (frame.type === 'PUSH_PROMISE') {
+ frame.promised_stream = this._createIncomingStream(frame.promised_stream);
+ }
+
+ frame.count_change = this._changeStreamCount.bind(this);
+
+ // * and writes it to the `stream`'s `upstream`
+ stream.upstream.write(frame);
+
+ done();
+};
+
+// Settings management
+// -------------------
+
+var defaultSettings = {
+};
+
+// Settings management initialization:
+Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
+ // * Setting up the callback queue for setting acknowledgements
+ this._settingsAckCallbacks = [];
+
+ // * Sending the initial settings.
+ this._log.debug({ settings: settings },
+ 'Sending the first SETTINGS frame as part of the connection header.');
+ this.set(settings || defaultSettings);
+
+ // * Forwarding SETTINGS frames to the `_receiveSettings` method
+ this.on('SETTINGS', this._receiveSettings);
+ this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize);
+};
+
+// * Checking that the first frame the other endpoint sends is SETTINGS
+Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
+ if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
+ this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
+ } else {
+ this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+};
+
+// Handling of incoming SETTINGS frames.
+Connection.prototype._receiveSettings = function _receiveSettings(frame) {
+ // * If it's an ACK, call the appropriate callback
+ if (frame.flags.ACK) {
+ var callback = this._settingsAckCallbacks.shift();
+ if (callback) {
+ callback();
+ }
+ }
+
+ // * If it's a setting change request, then send an ACK and change the appropriate settings
+ else {
+ if (!this._closed) {
+ this.push({
+ type: 'SETTINGS',
+ flags: { ACK: true },
+ stream: 0,
+ settings: {}
+ });
+ }
+ for (var name in frame.settings) {
+ this.emit('RECEIVING_' + name, frame.settings[name]);
+ }
+ }
+};
+
+Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) {
+ if ((value < 0x4000) || (value >= 0x01000000)) {
+ this._log.fatal('Received invalid value for max frame size: ' + value);
+ this.emit('error');
+ }
+};
+
+// Changing one or more settings value and sending out a SETTINGS frame
+Connection.prototype.set = function set(settings, callback) {
+ // * Calling the callback and emitting event when the change is acknowledges
+ var self = this;
+ this._settingsAckCallbacks.push(function() {
+ for (var name in settings) {
+ self.emit('ACKNOWLEDGED_' + name, settings[name]);
+ }
+ if (callback) {
+ callback();
+ }
+ });
+
+ // * Sending out the SETTINGS frame
+ this.push({
+ type: 'SETTINGS',
+ flags: { ACK: false },
+ stream: 0,
+ settings: settings
+ });
+ for (var name in settings) {
+ this.emit('SENDING_' + name, settings[name]);
+ }
+};
+
+// Lifecycle management
+// --------------------
+
+// The main responsibilities of lifecycle management code:
+//
+// * keeping the connection alive by
+// * sending PINGs when the connection is idle
+// * answering PINGs
+// * ending the connection
+
+Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
+ this._pings = {};
+ this.on('PING', this._receivePing);
+ this.on('GOAWAY', this._receiveGoaway);
+ this._closed = false;
+};
+
+// Generating a string of length 16 with random hexadecimal digits
+Connection.prototype._generatePingId = function _generatePingId() {
+ do {
+ var id = '';
+ for (var i = 0; i < 16; i++) {
+ id += Math.floor(Math.random()*16).toString(16);
+ }
+ } while(id in this._pings);
+ return id;
+};
+
+// Sending a ping and calling `callback` when the answer arrives
+Connection.prototype.ping = function ping(callback) {
+ var id = this._generatePingId();
+ var data = Buffer.from(id, 'hex');
+ this._pings[id] = callback;
+
+ this._log.debug({ data: data }, 'Sending PING.');
+ this.push({
+ type: 'PING',
+ flags: {
+ ACK: false
+ },
+ stream: 0,
+ data: data
+ });
+};
+
+// Answering pings
+Connection.prototype._receivePing = function _receivePing(frame) {
+ if (frame.flags.ACK) {
+ var id = frame.data.toString('hex');
+ if (id in this._pings) {
+ this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
+ var callback = this._pings[id];
+ if (callback) {
+ callback();
+ }
+ delete this._pings[id];
+ } else {
+ this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
+ }
+
+ } else {
+ this._log.debug({ data: frame.data }, 'Answering PING.');
+ this.push({
+ type: 'PING',
+ flags: {
+ ACK: true
+ },
+ stream: 0,
+ data: frame.data
+ });
+ }
+};
+
+Connection.prototype.originFrame = function originFrame(originList) {
+ this._log.debug(originList, 'emitting origin frame');
+
+ this.push({
+ type: 'ORIGIN',
+ flags: {},
+ stream: 0,
+ originList : originList,
+ });
+};
+
+// Terminating the connection
+Connection.prototype.close = function close(error) {
+ if (this._closed) {
+ this._log.warn('Trying to close an already closed connection');
+ return;
+ }
+
+ this._log.debug({ error: error }, 'Closing the connection');
+ this.push({
+ type: 'GOAWAY',
+ flags: {},
+ stream: 0,
+ last_stream: this._lastIncomingStream,
+ error: error || 'NO_ERROR'
+ });
+ this.push(null);
+ this._closed = true;
+};
+
+Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
+ this._log.debug({ error: frame.error }, 'Other end closed the connection');
+ this.push(null);
+ this._closed = true;
+ if (frame.error !== 'NO_ERROR') {
+ this.emit('peerError', frame.error);
+ }
+};
+
+// Flow control
+// ------------
+
+Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
+ // Handling of initial window size of individual streams.
+ this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
+ this.on('new_stream', function(stream) {
+ stream.upstream.setInitialWindow(this._initialStreamWindowSize);
+ });
+ this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
+ this._streamIds[0].upstream.setInitialWindow = function noop() {};
+};
+
+// The initial connection flow control window is 65535 bytes.
+var INITIAL_STREAM_WINDOW_SIZE = 65535;
+
+// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
+// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
+// stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
+// the difference between the new value and the old value.
+Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
+ if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
+ this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
+ this.emit('error', 'FLOW_CONTROL_ERROR');
+ } else {
+ this._log.debug({ size: size }, 'Changing stream initial window size.');
+ this._initialStreamWindowSize = size;
+ this._streamIds.forEach(function(stream) {
+ stream.upstream.setInitialWindow(size);
+ });
+ }
+};
diff --git a/testing/xpcshell/node-http2/lib/protocol/endpoint.js b/testing/xpcshell/node-http2/lib/protocol/endpoint.js
new file mode 100644
index 0000000000..127f4c4c5c
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/endpoint.js
@@ -0,0 +1,262 @@
+var assert = require('assert');
+
+var Serializer = require('./framer').Serializer;
+var Deserializer = require('./framer').Deserializer;
+var Compressor = require('./compressor').Compressor;
+var Decompressor = require('./compressor').Decompressor;
+var Connection = require('./connection').Connection;
+var Duplex = require('stream').Duplex;
+var Transform = require('stream').Transform;
+
+exports.Endpoint = Endpoint;
+
+// The Endpoint class
+// ==================
+
+// Public API
+// ----------
+
+// - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
+//
+// - `log`: bunyan logger of the parent
+// - `role`: 'CLIENT' or 'SERVER'
+// - `settings`: initial HTTP/2 settings
+// - `filters`: a map of functions that filter the traffic between components (for debugging or
+// intentional failure injection).
+//
+// Filter functions get three arguments:
+// 1. `frame`: the current frame
+// 2. `forward(frame)`: function that can be used to forward a frame to the next component
+// 3. `done()`: callback to signal the end of the filter process
+//
+// Valid filter names and their position in the stack:
+// - `beforeSerialization`: after compression, before serialization
+// - `beforeCompression`: after multiplexing, before compression
+// - `afterDeserialization`: after deserialization, before decompression
+// - `afterDecompression`: after decompression, before multiplexing
+//
+// * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
+//
+// * **Event: 'error' (type)**: signals an error
+//
+// * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
+//
+// * **close([error])**: close the connection with an error code
+
+// Constructor
+// -----------
+
+// The process of initialization:
+function Endpoint(log, role, settings, filters) {
+ Duplex.call(this);
+
+ // * Initializing logging infrastructure
+ this._log = log.child({ component: 'endpoint', e: this });
+
+ // * First part of the handshake process: sending and receiving the client connection header
+ // prelude.
+ assert((role === 'CLIENT') || role === 'SERVER');
+ if (role === 'CLIENT') {
+ this._writePrelude();
+ } else {
+ this._readPrelude();
+ }
+
+ // * Initialization of component. This includes the second part of the handshake process:
+ // sending the first SETTINGS frame. This is done by the connection class right after
+ // initialization.
+ this._initializeDataFlow(role, settings, filters || {});
+
+ // * Initialization of management code.
+ this._initializeManagement();
+
+ // * Initializing error handling.
+ this._initializeErrorHandling();
+}
+Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
+
+// Handshake
+// ---------
+
+var CLIENT_PRELUDE = Buffer.from('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');
+
+// Writing the client header is simple and synchronous.
+Endpoint.prototype._writePrelude = function _writePrelude() {
+ this._log.debug('Sending the client connection header prelude.');
+ this.push(CLIENT_PRELUDE);
+};
+
+// The asynchronous process of reading the client header:
+Endpoint.prototype._readPrelude = function _readPrelude() {
+ // * progress in the header is tracker using a `cursor`
+ var cursor = 0;
+
+ // * `_write` is temporarily replaced by the comparator function
+ this._write = function _temporalWrite(chunk, encoding, done) {
+ // * which compares the stored header with the current `chunk` byte by byte and emits the
+ // 'error' event if there's a byte that doesn't match
+ var offset = cursor;
+ while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
+ if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
+ this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
+ 'Client connection header prelude does not match.');
+ this._error('handshake', 'PROTOCOL_ERROR');
+ return;
+ }
+ cursor += 1;
+ }
+
+ // * if the whole header is over, and there were no error then restore the original `_write`
+ // and call it with the remaining part of the current chunk
+ if (cursor === CLIENT_PRELUDE.length) {
+ this._log.debug('Successfully received the client connection header prelude.');
+ delete this._write;
+ chunk = chunk.slice(cursor - offset);
+ this._write(chunk, encoding, done);
+ }
+ };
+};
+
+// Data flow
+// ---------
+
+// +---------------------------------------------+
+// | |
+// | +-------------------------------------+ |
+// | | +---------+ +---------+ +---------+ | |
+// | | | stream1 | | stream2 | | ... | | |
+// | | +---------+ +---------+ +---------+ | |
+// | | connection | |
+// | +-------------------------------------+ |
+// | | ^ |
+// | pipe | | pipe |
+// | v | |
+// | +------------------+------------------+ |
+// | | compressor | decompressor | |
+// | +------------------+------------------+ |
+// | | ^ |
+// | pipe | | pipe |
+// | v | |
+// | +------------------+------------------+ |
+// | | serializer | deserializer | |
+// | +------------------+------------------+ |
+// | | ^ |
+// | _read() | | _write() |
+// | v | |
+// | +------------+ +-----------+ |
+// | |output queue| |input queue| |
+// +------+------------+-----+-----------+-------+
+// | ^
+// read() | | write()
+// v |
+
+function createTransformStream(filter) {
+ var transform = new Transform({ objectMode: true });
+ var push = transform.push.bind(transform);
+ transform._transform = function(frame, encoding, done) {
+ filter(frame, push, done);
+ };
+ return transform;
+}
+
+function pipeAndFilter(stream1, stream2, filter) {
+ if (filter) {
+ stream1.pipe(createTransformStream(filter)).pipe(stream2);
+ } else {
+ stream1.pipe(stream2);
+ }
+}
+
+Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
+ var firstStreamId, compressorRole, decompressorRole;
+ if (role === 'CLIENT') {
+ firstStreamId = 1;
+ compressorRole = 'REQUEST';
+ decompressorRole = 'RESPONSE';
+ } else {
+ firstStreamId = 2;
+ compressorRole = 'RESPONSE';
+ decompressorRole = 'REQUEST';
+ }
+
+ this._serializer = new Serializer(this._log);
+ this._deserializer = new Deserializer(this._log);
+ this._compressor = new Compressor(this._log, compressorRole);
+ this._decompressor = new Decompressor(this._log, decompressorRole);
+ this._connection = new Connection(this._log, firstStreamId, settings);
+
+ pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
+ pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
+ pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
+ pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
+
+ this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
+ this._decompressor.setTableSizeLimit.bind(this._decompressor));
+ this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
+ this._compressor.setTableSizeLimit.bind(this._compressor));
+};
+
+var noread = {};
+Endpoint.prototype._read = function _read() {
+ this._readableState.sync = true;
+ var moreNeeded = noread, chunk;
+ while (moreNeeded && (chunk = this._serializer.read())) {
+ moreNeeded = this.push(chunk);
+ }
+ if (moreNeeded === noread) {
+ this._serializer.once('readable', this._read.bind(this));
+ }
+ this._readableState.sync = false;
+};
+
+Endpoint.prototype._write = function _write(chunk, encoding, done) {
+ this._deserializer.write(chunk, encoding, done);
+};
+
+// Management
+// --------------
+
+Endpoint.prototype._initializeManagement = function _initializeManagement() {
+ this._connection.on('stream', this.emit.bind(this, 'stream'));
+};
+
+Endpoint.prototype.createStream = function createStream() {
+ return this._connection.createStream();
+};
+
+// Error handling
+// --------------
+
+Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
+ this._serializer.on('error', this._error.bind(this, 'serializer'));
+ this._deserializer.on('error', this._error.bind(this, 'deserializer'));
+ this._compressor.on('error', this._error.bind(this, 'compressor'));
+ this._decompressor.on('error', this._error.bind(this, 'decompressor'));
+ this._connection.on('error', this._error.bind(this, 'connection'));
+
+ this._connection.on('peerError', this.emit.bind(this, 'peerError'));
+};
+
+Endpoint.prototype._error = function _error(component, error) {
+ this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
+ this.close(error);
+ setImmediate(this.emit.bind(this, 'error', error));
+};
+
+Endpoint.prototype.close = function close(error) {
+ this._connection.close(error);
+};
+
+// Bunyan serializers
+// ------------------
+
+exports.serializers = {};
+
+var nextId = 0;
+exports.serializers.e = function(endpoint) {
+ if (!('id' in endpoint)) {
+ endpoint.id = nextId;
+ nextId += 1;
+ }
+ return endpoint.id;
+};
diff --git a/testing/xpcshell/node-http2/lib/protocol/flow.js b/testing/xpcshell/node-http2/lib/protocol/flow.js
new file mode 100644
index 0000000000..6bec857551
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/flow.js
@@ -0,0 +1,345 @@
+var assert = require('assert');
+
+// The Flow class
+// ==============
+
+// Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be
+// subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html).
+// [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex
+
+var Duplex = require('stream').Duplex;
+
+exports.Flow = Flow;
+
+// Public API
+// ----------
+
+// * **Event: 'error' (type)**: signals an error
+//
+// * **setInitialWindow(size)**: the initial flow control window size can be changed *any time*
+// ([as described in the standard][1]) using this method
+//
+// [1]: https://tools.ietf.org/html/rfc7540#section-6.9.2
+
+// API for child classes
+// ---------------------
+
+// * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames
+// with the given `flowControlId` (or every update frame if not given)
+//
+// * **_send()**: called when more frames should be pushed. The child class is expected to override
+// this (instead of the `_read` method of the Duplex class).
+//
+// * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is
+// expected to override this (instead of the `_write` method of the Duplex class).
+//
+// * **push(frame): bool**: schedules `frame` for sending.
+//
+// Returns `true` if it needs more frames in the output queue, `false` if the output queue is
+// full, and `null` if did not push the frame into the output queue (instead, it pushed it into
+// the flow control queue).
+//
+// * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA
+// frames, length of the payload for DATA frames) of the returned frame will be under `limit`.
+// Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the
+// same thing as [in the original API](https://nodejs.org/api/stream.html#stream_stream_read_0).
+//
+// * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
+//
+// * **_log**: the Flow class uses the `_log` object of the parent
+
+// Constructor
+// -----------
+
+// When a HTTP/2.0 connection is first established, new streams are created with an initial flow
+// control window size of 65535 bytes.
+var INITIAL_WINDOW_SIZE = 65535;
+
+// `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched.
+function Flow(flowControlId) {
+ Duplex.call(this, { objectMode: true });
+
+ this._window = this._initialWindow = INITIAL_WINDOW_SIZE;
+ this._flowControlId = flowControlId;
+ this._queue = [];
+ this._ended = false;
+ this._received = 0;
+}
+Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } });
+
+// Incoming frames
+// ---------------
+
+// `_receive` is called when there's an incoming frame.
+Flow.prototype._receive = function _receive(frame, callback) {
+ throw new Error('The _receive(frame, callback) method has to be overridden by the child class!');
+};
+
+// `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s
+// to the flow. It emits the 'receiving' event and notifies the window size tracking code if the
+// incoming frame is a WINDOW_UPDATE.
+// [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
+Flow.prototype._write = function _write(frame, encoding, callback) {
+ var sentToUs = (this._flowControlId === undefined) || (frame.stream === this._flowControlId);
+
+ if (sentToUs && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
+ this._ended = true;
+ }
+
+ if ((frame.type === 'DATA') && (frame.data.length > 0)) {
+ this._receive(frame, function() {
+ this._received += frame.data.length;
+ if (!this._restoreWindowTimer) {
+ this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this));
+ }
+ callback();
+ }.bind(this));
+ }
+
+ else {
+ this._receive(frame, callback);
+ }
+
+ if (sentToUs && (frame.type === 'WINDOW_UPDATE')) {
+ this._updateWindow(frame);
+ }
+};
+
+// `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends
+// a WINDOW_UPDATE that restores the flow control window of the remote end.
+// TODO: push this directly into the output queue. No need to wait for DATA frames in the queue.
+Flow.prototype._restoreWindow = function _restoreWindow() {
+ delete this._restoreWindowTimer;
+ if (!this._ended && (this._received > 0)) {
+ this.push({
+ type: 'WINDOW_UPDATE',
+ flags: {},
+ stream: this._flowControlId,
+ window_size: this._received
+ });
+ this._received = 0;
+ }
+};
+
+// Outgoing frames - sending procedure
+// -----------------------------------
+
+// flow
+// +-------------------------------------------------+
+// | |
+// +--------+ +---------+ |
+// read() | output | _read() | flow | _send() |
+// <----------| |<----------| control |<------------- |
+// | buffer | | buffer | |
+// +--------+ +---------+ |
+// | input | |
+// ---------->| |-----------------------------------> |
+// write() | buffer | _write() _receive() |
+// +--------+ |
+// | |
+// +-------------------------------------------------+
+
+// `_send` is called when more frames should be pushed to the output buffer.
+Flow.prototype._send = function _send() {
+ throw new Error('The _send() method has to be overridden by the child class!');
+};
+
+// `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more
+// items in the output queue.
+// [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
+Flow.prototype._read = function _read() {
+ // * if the flow control queue is empty, then let the user push more frames
+ if (this._queue.length === 0) {
+ this._send();
+ }
+
+ // * if there are items in the flow control queue, then let's put them into the output queue (to
+ // the extent it is possible with respect to the window size and output queue feedback)
+ else if (this._window > 0) {
+ this._readableState.sync = true; // to avoid reentrant calls
+ do {
+ var moreNeeded = this._push(this._queue[0]);
+ if (moreNeeded !== null) {
+ this._queue.shift();
+ }
+ } while (moreNeeded && (this._queue.length > 0));
+ this._readableState.sync = false;
+
+ assert((!moreNeeded) || // * output queue is full
+ (this._queue.length === 0) || // * flow control queue is empty
+ (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update
+ }
+
+ // * otherwise, come back when the flow control window is positive
+ else {
+ this.once('window_update', this._read);
+ }
+};
+
+var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383
+
+// `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control
+// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will
+// be under `limit`.
+Flow.prototype.read = function read(limit) {
+ if (limit === 0) {
+ return Duplex.prototype.read.call(this, 0);
+ } else if (limit === -1) {
+ limit = 0;
+ } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
+ limit = MAX_PAYLOAD_SIZE;
+ }
+
+ // * Looking at the first frame in the queue without pulling it out if possible.
+ var frame = this._readableState.buffer[0];
+ if (!frame && !this._readableState.ended) {
+ this._read();
+ frame = this._readableState.buffer[0];
+ }
+
+ if (frame && (frame.type === 'DATA')) {
+ // * If the frame is DATA, then there's two special cases:
+ // * if the limit is 0, we shouldn't return anything
+ // * if the size of the frame is larger than limit, then the frame should be split
+ if (limit === 0) {
+ return Duplex.prototype.read.call(this, 0);
+ }
+
+ else if (frame.data.length > limit) {
+ this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit },
+ 'Splitting out forwardable part of a DATA frame.');
+ this.unshift({
+ type: 'DATA',
+ flags: {},
+ stream: frame.stream,
+ data: frame.data.slice(0, limit)
+ });
+ frame.data = frame.data.slice(limit);
+ }
+ }
+
+ return Duplex.prototype.read.call(this);
+};
+
+// `_parentPush` pushes the given `frame` into the output queue
+Flow.prototype._parentPush = function _parentPush(frame) {
+ this._log.trace({ frame: frame }, 'Pushing frame into the output queue');
+
+ if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) {
+ this._log.trace({ window: this._window, by: frame.data.length },
+ 'Decreasing flow control window size.');
+ this._window -= frame.data.length;
+ assert(this._window >= 0);
+ }
+
+ return Duplex.prototype.push.call(this, frame);
+};
+
+// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size.
+// It is capable of splitting DATA frames into smaller parts, if the window size is not enough to
+// push the whole frame. The return value is similar to `push` except that it returns `null` if it
+// did not push the whole frame to the output queue (but maybe it did push part of the frame).
+Flow.prototype._push = function _push(frame) {
+ var data = frame && (frame.type === 'DATA') && frame.data;
+ var maxFrameLength = (this._window < 16384) ? this._window : 16384;
+
+ if (!data || (data.length <= maxFrameLength)) {
+ return this._parentPush(frame);
+ }
+
+ else if (this._window <= 0) {
+ return null;
+ }
+
+ else {
+ this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window },
+ 'Splitting out forwardable part of a DATA frame.');
+ frame.data = data.slice(maxFrameLength);
+ this._parentPush({
+ type: 'DATA',
+ flags: {},
+ stream: frame.stream,
+ data: data.slice(0, maxFrameLength)
+ });
+ return null;
+ }
+};
+
+// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue
+Flow.prototype.push = function push(frame) {
+ if (frame === null) {
+ this._log.debug('Enqueueing outgoing End Of Stream');
+ } else {
+ this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
+ }
+
+ var moreNeeded = null;
+ if (this._queue.length === 0) {
+ moreNeeded = this._push(frame);
+ }
+
+ if (moreNeeded === null) {
+ this._queue.push(frame);
+ }
+
+ return moreNeeded;
+};
+
+// `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the
+// [Stream](stream.html) class to mark the last frame with END_STREAM flag.
+Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
+ var readableQueue = this._readableState.buffer;
+ return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1];
+};
+
+// Outgoing frames - managing the window size
+// ------------------------------------------
+
+// Flow control window size is manipulated using the `_increaseWindow` method.
+//
+// * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled
+// again once disabled. Any attempt to re-enable flow control MUST be rejected with a
+// FLOW_CONTROL_ERROR error code.
+// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken
+// depends on it being a stream or the connection itself.
+
+var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
+
+Flow.prototype._increaseWindow = function _increaseWindow(size) {
+ if ((this._window === Infinity) && (size !== Infinity)) {
+ this._log.error('Trying to increase flow control window after flow control was turned off.');
+ this.emit('error', 'FLOW_CONTROL_ERROR');
+ } else {
+ this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
+ this._window += size;
+ if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) {
+ this._log.error('Flow control window grew too large.');
+ this.emit('error', 'FLOW_CONTROL_ERROR');
+ } else {
+ if (size != 0) {
+ this.emit('window_update');
+ }
+ }
+ }
+};
+
+// The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It
+// modifies the flow control window:
+//
+// * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the
+// END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL
+// flag set is ignored.
+// * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount
+// specified in the frame.
+Flow.prototype._updateWindow = function _updateWindow(frame) {
+ this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size);
+};
+
+// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
+// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by
+// calling the `setInitialWindow` method. The window size has to be modified by the difference
+// between the new value and the old value.
+Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
+ this._increaseWindow(initialWindow - this._initialWindow);
+ this._initialWindow = initialWindow;
+};
diff --git a/testing/xpcshell/node-http2/lib/protocol/framer.js b/testing/xpcshell/node-http2/lib/protocol/framer.js
new file mode 100644
index 0000000000..055402f8d8
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/framer.js
@@ -0,0 +1,1166 @@
+// The framer consists of two [Transform Stream][1] subclasses that operate in [object mode][2]:
+// the Serializer and the Deserializer
+// [1]: https://nodejs.org/api/stream.html#stream_class_stream_transform
+// [2]: https://nodejs.org/api/stream.html#stream_new_stream_readable_options
+var assert = require('assert');
+
+var Transform = require('stream').Transform;
+
+exports.Serializer = Serializer;
+exports.Deserializer = Deserializer;
+
+var logData = Boolean(process.env.HTTP2_LOG_DATA);
+
+var MAX_PAYLOAD_SIZE = 16384;
+var WINDOW_UPDATE_PAYLOAD_SIZE = 4;
+
+// Serializer
+// ----------
+//
+// Frame Objects
+// * * * * * * * --+---------------------------
+// | |
+// v v Buffers
+// [] -----> Payload Ser. --[buffers]--> Header Ser. --> * * * *
+// empty adds payload adds header
+// array buffers buffer
+
+function Serializer(log) {
+ this._log = log.child({ component: 'serializer' });
+ Transform.call(this, { objectMode: true });
+}
+Serializer.prototype = Object.create(Transform.prototype, { constructor: { value: Serializer } });
+
+// When there's an incoming frame object, it first generates the frame type specific part of the
+// frame (payload), and then then adds the header part which holds fields that are common to all
+// frame types (like the length of the payload).
+Serializer.prototype._transform = function _transform(frame, encoding, done) {
+ this._log.trace({ frame: frame }, 'Outgoing frame');
+
+ assert(frame.type in Serializer, 'Unknown frame type: ' + frame.type);
+
+ var buffers = [];
+ Serializer[frame.type](frame, buffers);
+ var length = Serializer.commonHeader(frame, buffers);
+
+ assert(length <= MAX_PAYLOAD_SIZE, 'Frame too large!');
+
+ for (var i = 0; i < buffers.length; i++) {
+ if (logData) {
+ this._log.trace({ data: buffers[i] }, 'Outgoing data');
+ }
+ this.push(buffers[i]);
+ }
+
+ done();
+};
+
+// Deserializer
+// ------------
+//
+// Buffers
+// * * * * --------+-------------------------
+// | |
+// v v Frame Objects
+// {} -----> Header Des. --{frame}--> Payload Des. --> * * * * * * *
+// empty adds parsed adds parsed
+// object header properties payload properties
+
+function Deserializer(log, role) {
+ this._role = role;
+ this._log = log.child({ component: 'deserializer' });
+ Transform.call(this, { objectMode: true });
+ this._next(COMMON_HEADER_SIZE);
+}
+Deserializer.prototype = Object.create(Transform.prototype, { constructor: { value: Deserializer } });
+
+// The Deserializer is stateful, and it's two main alternating states are: *waiting for header* and
+// *waiting for payload*. The state is stored in the boolean property `_waitingForHeader`.
+//
+// When entering a new state, a `_buffer` is created that will hold the accumulated data (header or
+// payload). The `_cursor` is used to track the progress.
+Deserializer.prototype._next = function(size) {
+ this._cursor = 0;
+ this._buffer = Buffer.alloc(size);
+ this._waitingForHeader = !this._waitingForHeader;
+ if (this._waitingForHeader) {
+ this._frame = {};
+ }
+};
+
+// Parsing an incoming buffer is an iterative process because it can hold multiple frames if it's
+// large enough. A `cursor` is used to track the progress in parsing the incoming `chunk`.
+Deserializer.prototype._transform = function _transform(chunk, encoding, done) {
+ var cursor = 0;
+
+ if (logData) {
+ this._log.trace({ data: chunk }, 'Incoming data');
+ }
+
+ while(cursor < chunk.length) {
+ // The content of an incoming buffer is first copied to `_buffer`. If it can't hold the full
+ // chunk, then only a part of it is copied.
+ var toCopy = Math.min(chunk.length - cursor, this._buffer.length - this._cursor);
+ chunk.copy(this._buffer, this._cursor, cursor, cursor + toCopy);
+ this._cursor += toCopy;
+ cursor += toCopy;
+
+ // When `_buffer` is full, it's content gets parsed either as header or payload depending on
+ // the actual state.
+
+ // If it's header then the parsed data is stored in a temporary variable and then the
+ // deserializer waits for the specified length payload.
+ if ((this._cursor === this._buffer.length) && this._waitingForHeader) {
+ var payloadSize = Deserializer.commonHeader(this._buffer, this._frame);
+ if (payloadSize <= MAX_PAYLOAD_SIZE) {
+ this._next(payloadSize);
+ } else {
+ this.emit('error', 'FRAME_SIZE_ERROR');
+ return;
+ }
+ }
+
+ // If it's payload then the the frame object is finalized and then gets pushed out.
+ // Unknown frame types are ignored.
+ //
+ // Note: If we just finished the parsing of a header and the payload length is 0, this branch
+ // will also run.
+ if ((this._cursor === this._buffer.length) && !this._waitingForHeader) {
+ if (this._frame.type) {
+ var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role);
+ if (error) {
+ this._log.error('Incoming frame parsing error: ' + error);
+ this.emit('error', error);
+ } else {
+ this._log.trace({ frame: this._frame }, 'Incoming frame');
+ this.push(this._frame);
+ }
+ } else {
+ this._log.error('Unknown type incoming frame');
+ // Ignore it other than logging
+ }
+ this._next(COMMON_HEADER_SIZE);
+ }
+ }
+
+ done();
+};
+
+// [Frame Header](https://tools.ietf.org/html/rfc7540#section-4.1)
+// --------------------------------------------------------------
+//
+// HTTP/2 frames share a common base format consisting of a 9-byte header followed by 0 to 2^24 - 1
+// bytes of data.
+//
+// Additional size limits can be set by specific application uses. HTTP limits the frame size to
+// 16,384 octets by default, though this can be increased by a receiver.
+//
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Length (24) |
+// +---------------+---------------+---------------+
+// | Type (8) | Flags (8) |
+// +-+-----------------------------+---------------+---------------+
+// |R| Stream Identifier (31) |
+// +-+-------------------------------------------------------------+
+// | Frame Data (0...) ...
+// +---------------------------------------------------------------+
+//
+// The fields of the frame header are defined as:
+//
+// * Length:
+// The length of the frame data expressed as an unsigned 24-bit integer. The 9 bytes of the frame
+// header are not included in this value.
+//
+// * Type:
+// The 8-bit type of the frame. The frame type determines how the remainder of the frame header
+// and data are interpreted. Implementations MUST ignore unsupported and unrecognized frame types.
+//
+// * Flags:
+// An 8-bit field reserved for frame-type specific boolean flags.
+//
+// Flags are assigned semantics specific to the indicated frame type. Flags that have no defined
+// semantics for a particular frame type MUST be ignored, and MUST be left unset (0) when sending.
+//
+// * R:
+// A reserved 1-bit field. The semantics of this bit are undefined and the bit MUST remain unset
+// (0) when sending and MUST be ignored when receiving.
+//
+// * Stream Identifier:
+// A 31-bit stream identifier. The value 0 is reserved for frames that are associated with the
+// connection as a whole as opposed to an individual stream.
+//
+// The structure and content of the remaining frame data is dependent entirely on the frame type.
+
+var COMMON_HEADER_SIZE = 9;
+
+var frameTypes = [];
+
+var frameFlags = {};
+
+var genericAttributes = ['type', 'flags', 'stream'];
+
+var typeSpecificAttributes = {};
+
+Serializer.commonHeader = function writeCommonHeader(frame, buffers) {
+ var headerBuffer = Buffer.alloc(COMMON_HEADER_SIZE);
+
+ var size = 0;
+ for (var i = 0; i < buffers.length; i++) {
+ size += buffers[i].length;
+ }
+ headerBuffer.writeUInt8(0, 0);
+ headerBuffer.writeUInt16BE(size, 1);
+
+ var typeId = frameTypes.indexOf(frame.type); // If we are here then the type is valid for sure
+ headerBuffer.writeUInt8(typeId, 3);
+
+ var flagByte = 0;
+ for (var flag in frame.flags) {
+ var position = frameFlags[frame.type].indexOf(flag);
+ assert(position !== -1, 'Unknown flag for frame type ' + frame.type + ': ' + flag);
+ if (frame.flags[flag]) {
+ flagByte |= (1 << position);
+ }
+ }
+ headerBuffer.writeUInt8(flagByte, 4);
+
+ assert((0 <= frame.stream) && (frame.stream < 0x7fffffff), frame.stream);
+ headerBuffer.writeUInt32BE(frame.stream || 0, 5);
+
+ buffers.unshift(headerBuffer);
+
+ return size;
+};
+
+Deserializer.commonHeader = function readCommonHeader(buffer, frame) {
+ if (buffer.length < 9) {
+ return 'FRAME_SIZE_ERROR';
+ }
+
+ var totallyWastedByte = buffer.readUInt8(0);
+ var length = buffer.readUInt16BE(1);
+ // We do this just for sanity checking later on, to make sure no one sent us a
+ // frame that's super large.
+ length += totallyWastedByte << 16;
+
+ frame.type = frameTypes[buffer.readUInt8(3)];
+ if (!frame.type) {
+ // We are required to ignore unknown frame types
+ return length;
+ }
+
+ frame.flags = {};
+ var flagByte = buffer.readUInt8(4);
+ var definedFlags = frameFlags[frame.type];
+ for (var i = 0; i < definedFlags.length; i++) {
+ frame.flags[definedFlags[i]] = Boolean(flagByte & (1 << i));
+ }
+
+ frame.stream = buffer.readUInt32BE(5) & 0x7fffffff;
+
+ return length;
+};
+
+// Frame types
+// ===========
+
+// Every frame type is registered in the following places:
+//
+// * `frameTypes`: a register of frame type codes (used by `commonHeader()`)
+// * `frameFlags`: a register of valid flags for frame types (used by `commonHeader()`)
+// * `typeSpecificAttributes`: a register of frame specific frame object attributes (used by
+// logging code and also serves as documentation for frame objects)
+
+// [DATA Frames](https://tools.ietf.org/html/rfc7540#section-6.1)
+// ------------------------------------------------------------
+//
+// DATA frames (type=0x0) convey arbitrary, variable-length sequences of octets associated with a
+// stream.
+//
+// The DATA frame defines the following flags:
+//
+// * END_STREAM (0x1):
+// Bit 1 being set indicates that this frame is the last that the endpoint will send for the
+// identified stream.
+// * PADDED (0x08):
+// Bit 4 being set indicates that the Pad Length field is present.
+
+frameTypes[0x0] = 'DATA';
+
+frameFlags.DATA = ['END_STREAM', 'RESERVED2', 'RESERVED4', 'PADDED'];
+
+typeSpecificAttributes.DATA = ['data'];
+
+Serializer.DATA = function writeData(frame, buffers) {
+ buffers.push(frame.data);
+};
+
+Deserializer.DATA = function readData(buffer, frame) {
+ var dataOffset = 0;
+ var paddingLength = 0;
+ if (frame.flags.PADDED) {
+ if (buffer.length < 1) {
+ // We must have at least one byte for padding control, but we don't. Bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+ paddingLength = (buffer.readUInt8(dataOffset) & 0xff);
+ dataOffset = 1;
+ }
+
+ if (paddingLength) {
+ if (paddingLength >= (buffer.length - 1)) {
+ // We don't have enough room for the padding advertised - bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.data = buffer.slice(dataOffset, -1 * paddingLength);
+ } else {
+ frame.data = buffer.slice(dataOffset);
+ }
+};
+
+// [HEADERS](https://tools.ietf.org/html/rfc7540#section-6.2)
+// --------------------------------------------------------------
+//
+// The HEADERS frame (type=0x1) allows the sender to create a stream.
+//
+// The HEADERS frame defines the following flags:
+//
+// * END_STREAM (0x1):
+// Bit 1 being set indicates that this frame is the last that the endpoint will send for the
+// identified stream.
+// * END_HEADERS (0x4):
+// The END_HEADERS bit indicates that this frame contains the entire payload necessary to provide
+// a complete set of headers.
+// * PADDED (0x08):
+// Bit 4 being set indicates that the Pad Length field is present.
+// * PRIORITY (0x20):
+// Bit 6 being set indicates that the Exlusive Flag (E), Stream Dependency, and Weight fields are
+// present.
+
+frameTypes[0x1] = 'HEADERS';
+
+frameFlags.HEADERS = ['END_STREAM', 'RESERVED2', 'END_HEADERS', 'PADDED', 'RESERVED5', 'PRIORITY'];
+
+typeSpecificAttributes.HEADERS = ['priorityDependency', 'priorityWeight', 'exclusiveDependency', 'headers', 'data'];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |Pad Length? (8)|
+// +-+-------------+---------------+-------------------------------+
+// |E| Stream Dependency? (31) |
+// +-+-------------+-----------------------------------------------+
+// | Weight? (8) |
+// +-+-------------+-----------------------------------------------+
+// | Header Block Fragment (*) ...
+// +---------------------------------------------------------------+
+// | Padding (*) ...
+// +---------------------------------------------------------------+
+//
+// The payload of a HEADERS frame contains a Headers Block
+
+Serializer.HEADERS = function writeHeadersPriority(frame, buffers) {
+ if (frame.flags.PRIORITY) {
+ var buffer = Buffer.alloc(5);
+ assert((0 <= frame.priorityDependency) && (frame.priorityDependency <= 0x7fffffff), frame.priorityDependency);
+ buffer.writeUInt32BE(frame.priorityDependency, 0);
+ if (frame.exclusiveDependency) {
+ buffer[0] |= 0x80;
+ }
+ assert((0 <= frame.priorityWeight) && (frame.priorityWeight <= 0xff), frame.priorityWeight);
+ buffer.writeUInt8(frame.priorityWeight, 4);
+ buffers.push(buffer);
+ }
+ buffers.push(frame.data);
+};
+
+Deserializer.HEADERS = function readHeadersPriority(buffer, frame) {
+ var minFrameLength = 0;
+ if (frame.flags.PADDED) {
+ minFrameLength += 1;
+ }
+ if (frame.flags.PRIORITY) {
+ minFrameLength += 5;
+ }
+ if (buffer.length < minFrameLength) {
+ // Peer didn't send enough data - bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+
+ var dataOffset = 0;
+ var paddingLength = 0;
+ if (frame.flags.PADDED) {
+ paddingLength = (buffer.readUInt8(dataOffset) & 0xff);
+ dataOffset = 1;
+ }
+
+ if (frame.flags.PRIORITY) {
+ var dependencyData = Buffer.alloc(4);
+ buffer.copy(dependencyData, 0, dataOffset, dataOffset + 4);
+ dataOffset += 4;
+ frame.exclusiveDependency = !!(dependencyData[0] & 0x80);
+ dependencyData[0] &= 0x7f;
+ frame.priorityDependency = dependencyData.readUInt32BE(0);
+ frame.priorityWeight = buffer.readUInt8(dataOffset);
+ dataOffset += 1;
+ }
+
+ if (paddingLength) {
+ if ((buffer.length - dataOffset) < paddingLength) {
+ // Not enough data left to satisfy the advertised padding - bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.data = buffer.slice(dataOffset, -1 * paddingLength);
+ } else {
+ frame.data = buffer.slice(dataOffset);
+ }
+};
+
+// [PRIORITY](https://tools.ietf.org/html/rfc7540#section-6.3)
+// -------------------------------------------------------
+//
+// The PRIORITY frame (type=0x2) specifies the sender-advised priority of a stream.
+//
+// The PRIORITY frame does not define any flags.
+
+frameTypes[0x2] = 'PRIORITY';
+
+frameFlags.PRIORITY = [];
+
+typeSpecificAttributes.PRIORITY = ['priorityDependency', 'priorityWeight', 'exclusiveDependency'];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |E| Stream Dependency? (31) |
+// +-+-------------+-----------------------------------------------+
+// | Weight? (8) |
+// +-+-------------+
+//
+// The payload of a PRIORITY frame contains an exclusive bit, a 31-bit dependency, and an 8-bit weight
+
+Serializer.PRIORITY = function writePriority(frame, buffers) {
+ var buffer = Buffer.alloc(5);
+ assert((0 <= frame.priorityDependency) && (frame.priorityDependency <= 0x7fffffff), frame.priorityDependency);
+ buffer.writeUInt32BE(frame.priorityDependency, 0);
+ if (frame.exclusiveDependency) {
+ buffer[0] |= 0x80;
+ }
+ assert((0 <= frame.priorityWeight) && (frame.priorityWeight <= 0xff), frame.priorityWeight);
+ buffer.writeUInt8(frame.priorityWeight, 4);
+
+ buffers.push(buffer);
+};
+
+Deserializer.PRIORITY = function readPriority(buffer, frame) {
+ if (buffer.length < 5) {
+ // PRIORITY frames are 5 bytes long. Bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+ var dependencyData = Buffer.alloc(4);
+ buffer.copy(dependencyData, 0, 0, 4);
+ frame.exclusiveDependency = !!(dependencyData[0] & 0x80);
+ dependencyData[0] &= 0x7f;
+ frame.priorityDependency = dependencyData.readUInt32BE(0);
+ frame.priorityWeight = buffer.readUInt8(4);
+};
+
+// [RST_STREAM](https://tools.ietf.org/html/rfc7540#section-6.4)
+// -----------------------------------------------------------
+//
+// The RST_STREAM frame (type=0x3) allows for abnormal termination of a stream.
+//
+// No type-flags are defined.
+
+frameTypes[0x3] = 'RST_STREAM';
+
+frameFlags.RST_STREAM = [];
+
+typeSpecificAttributes.RST_STREAM = ['error'];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Error Code (32) |
+// +---------------------------------------------------------------+
+//
+// The RST_STREAM frame contains a single unsigned, 32-bit integer identifying the error
+// code (see Error Codes). The error code indicates why the stream is being terminated.
+
+Serializer.RST_STREAM = function writeRstStream(frame, buffers) {
+ var buffer = Buffer.alloc(4);
+ var code = errorCodes.indexOf(frame.error);
+ assert((0 <= code) && (code <= 0xffffffff), code);
+ buffer.writeUInt32BE(code, 0);
+ buffers.push(buffer);
+};
+
+Deserializer.RST_STREAM = function readRstStream(buffer, frame) {
+ if (buffer.length < 4) {
+ // RST_STREAM is 4 bytes long. Bad peer!
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.error = errorCodes[buffer.readUInt32BE(0)];
+ if (!frame.error) {
+ // Unknown error codes are considered equivalent to INTERNAL_ERROR
+ frame.error = 'INTERNAL_ERROR';
+ }
+};
+
+// [SETTINGS](https://tools.ietf.org/html/rfc7540#section-6.5)
+// -------------------------------------------------------
+//
+// The SETTINGS frame (type=0x4) conveys configuration parameters that affect how endpoints
+// communicate.
+//
+// The SETTINGS frame defines the following flag:
+
+// * ACK (0x1):
+// Bit 1 being set indicates that this frame acknowledges receipt and application of the peer's
+// SETTINGS frame.
+frameTypes[0x4] = 'SETTINGS';
+
+frameFlags.SETTINGS = ['ACK'];
+
+typeSpecificAttributes.SETTINGS = ['settings'];
+
+// The payload of a SETTINGS frame consists of zero or more settings. Each setting consists of a
+// 16-bit identifier, and an unsigned 32-bit value.
+//
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Identifier(16) | Value (32) |
+// +-----------------+---------------------------------------------+
+// ...Value |
+// +---------------------------------+
+//
+// Each setting in a SETTINGS frame replaces the existing value for that setting. Settings are
+// processed in the order in which they appear, and a receiver of a SETTINGS frame does not need to
+// maintain any state other than the current value of settings. Therefore, the value of a setting
+// is the last value that is seen by a receiver. This permits the inclusion of the same settings
+// multiple times in the same SETTINGS frame, though doing so does nothing other than waste
+// connection capacity.
+
+Serializer.SETTINGS = function writeSettings(frame, buffers) {
+ var settings = [], settingsLeft = Object.keys(frame.settings);
+ definedSettings.forEach(function(setting, id) {
+ if (setting.name in frame.settings) {
+ settingsLeft.splice(settingsLeft.indexOf(setting.name), 1);
+ var value = frame.settings[setting.name];
+ settings.push({ id: id, value: setting.flag ? Boolean(value) : value });
+ }
+ });
+ assert(settingsLeft.length === 0, 'Unknown settings: ' + settingsLeft.join(', '));
+
+ var buffer = Buffer.alloc(settings.length * 6);
+ for (var i = 0; i < settings.length; i++) {
+ buffer.writeUInt16BE(settings[i].id & 0xffff, i*6);
+ buffer.writeUInt32BE(settings[i].value, i*6 + 2);
+ }
+
+ buffers.push(buffer);
+};
+
+Deserializer.SETTINGS = function readSettings(buffer, frame, role) {
+ frame.settings = {};
+
+ // Receipt of a SETTINGS frame with the ACK flag set and a length
+ // field value other than 0 MUST be treated as a connection error
+ // (Section 5.4.1) of type FRAME_SIZE_ERROR.
+ if(frame.flags.ACK && buffer.length != 0) {
+ return 'FRAME_SIZE_ERROR';
+ }
+
+ if (buffer.length % 6 !== 0) {
+ return 'PROTOCOL_ERROR';
+ }
+ for (var i = 0; i < buffer.length / 6; i++) {
+ var id = buffer.readUInt16BE(i*6) & 0xffff;
+ var setting = definedSettings[id];
+ if (setting) {
+ if (role == 'CLIENT' && setting.name == 'SETTINGS_ENABLE_PUSH') {
+ return 'SETTINGS frame on client got SETTINGS_ENABLE_PUSH';
+ }
+ var value = buffer.readUInt32BE(i*6 + 2);
+ frame.settings[setting.name] = setting.flag ? Boolean(value & 0x1) : value;
+ }
+ }
+};
+
+// The following settings are defined:
+var definedSettings = [];
+
+// * SETTINGS_HEADER_TABLE_SIZE (1):
+// Allows the sender to inform the remote endpoint of the size of the header compression table
+// used to decode header blocks.
+definedSettings[1] = { name: 'SETTINGS_HEADER_TABLE_SIZE', flag: false };
+
+// * SETTINGS_ENABLE_PUSH (2):
+// This setting can be use to disable server push. An endpoint MUST NOT send a PUSH_PROMISE frame
+// if it receives this setting set to a value of 0. The default value is 1, which indicates that
+// push is permitted.
+definedSettings[2] = { name: 'SETTINGS_ENABLE_PUSH', flag: true };
+
+// * SETTINGS_MAX_CONCURRENT_STREAMS (3):
+// indicates the maximum number of concurrent streams that the sender will allow.
+definedSettings[3] = { name: 'SETTINGS_MAX_CONCURRENT_STREAMS', flag: false };
+
+// * SETTINGS_INITIAL_WINDOW_SIZE (4):
+// indicates the sender's initial stream window size (in bytes) for new streams.
+definedSettings[4] = { name: 'SETTINGS_INITIAL_WINDOW_SIZE', flag: false };
+
+// * SETTINGS_MAX_FRAME_SIZE (5):
+// indicates the maximum size of a frame the receiver will allow.
+definedSettings[5] = { name: 'SETTINGS_MAX_FRAME_SIZE', flag: false };
+
+// [PUSH_PROMISE](https://tools.ietf.org/html/rfc7540#section-6.6)
+// ---------------------------------------------------------------
+//
+// The PUSH_PROMISE frame (type=0x5) is used to notify the peer endpoint in advance of streams the
+// sender intends to initiate.
+//
+// The PUSH_PROMISE frame defines the following flags:
+//
+// * END_PUSH_PROMISE (0x4):
+// The END_PUSH_PROMISE bit indicates that this frame contains the entire payload necessary to
+// provide a complete set of headers.
+
+frameTypes[0x5] = 'PUSH_PROMISE';
+
+frameFlags.PUSH_PROMISE = ['RESERVED1', 'RESERVED2', 'END_PUSH_PROMISE', 'PADDED'];
+
+typeSpecificAttributes.PUSH_PROMISE = ['promised_stream', 'headers', 'data'];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |Pad Length? (8)|
+// +-+-------------+-----------------------------------------------+
+// |X| Promised-Stream-ID (31) |
+// +-+-------------------------------------------------------------+
+// | Header Block Fragment (*) ...
+// +---------------------------------------------------------------+
+// | Padding (*) ...
+// +---------------------------------------------------------------+
+//
+// The PUSH_PROMISE frame includes the unsigned 31-bit identifier of
+// the stream the endpoint plans to create along with a minimal set of headers that provide
+// additional context for the stream.
+
+Serializer.PUSH_PROMISE = function writePushPromise(frame, buffers) {
+ var buffer = Buffer.alloc(4);
+
+ var promised_stream = frame.promised_stream;
+ assert((0 <= promised_stream) && (promised_stream <= 0x7fffffff), promised_stream);
+ buffer.writeUInt32BE(promised_stream, 0);
+
+ buffers.push(buffer);
+ buffers.push(frame.data);
+};
+
+Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) {
+ if (buffer.length < 4) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ var dataOffset = 0;
+ var paddingLength = 0;
+ if (frame.flags.PADDED) {
+ if (buffer.length < 5) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ paddingLength = (buffer.readUInt8(dataOffset) & 0xff);
+ dataOffset = 1;
+ }
+ frame.promised_stream = buffer.readUInt32BE(dataOffset) & 0x7fffffff;
+ dataOffset += 4;
+ if (paddingLength) {
+ if ((buffer.length - dataOffset) < paddingLength) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.data = buffer.slice(dataOffset, -1 * paddingLength);
+ } else {
+ frame.data = buffer.slice(dataOffset);
+ }
+};
+
+// [PING](https://tools.ietf.org/html/rfc7540#section-6.7)
+// -----------------------------------------------
+//
+// The PING frame (type=0x6) is a mechanism for measuring a minimal round-trip time from the
+// sender, as well as determining whether an idle connection is still functional.
+//
+// The PING frame defines one type-specific flag:
+//
+// * ACK (0x1):
+// Bit 1 being set indicates that this PING frame is a PING response.
+
+frameTypes[0x6] = 'PING';
+
+frameFlags.PING = ['ACK'];
+
+typeSpecificAttributes.PING = ['data'];
+
+// In addition to the frame header, PING frames MUST contain 8 additional octets of opaque data.
+
+Serializer.PING = function writePing(frame, buffers) {
+ buffers.push(frame.data);
+};
+
+Deserializer.PING = function readPing(buffer, frame) {
+ if (buffer.length !== 8) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.data = buffer;
+};
+
+// [GOAWAY](https://tools.ietf.org/html/rfc7540#section-6.8)
+// ---------------------------------------------------
+//
+// The GOAWAY frame (type=0x7) informs the remote peer to stop creating streams on this connection.
+//
+// The GOAWAY frame does not define any flags.
+
+frameTypes[0x7] = 'GOAWAY';
+
+frameFlags.GOAWAY = [];
+
+typeSpecificAttributes.GOAWAY = ['last_stream', 'error'];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// |X| Last-Stream-ID (31) |
+// +-+-------------------------------------------------------------+
+// | Error Code (32) |
+// +---------------------------------------------------------------+
+//
+// The last stream identifier in the GOAWAY frame contains the highest numbered stream identifier
+// for which the sender of the GOAWAY frame has received frames on and might have taken some action
+// on.
+//
+// The GOAWAY frame also contains a 32-bit error code (see Error Codes) that contains the reason for
+// closing the connection.
+
+Serializer.GOAWAY = function writeGoaway(frame, buffers) {
+ var buffer = Buffer.alloc(8);
+
+ var last_stream = frame.last_stream;
+ assert((0 <= last_stream) && (last_stream <= 0x7fffffff), last_stream);
+ buffer.writeUInt32BE(last_stream, 0);
+
+ var code = errorCodes.indexOf(frame.error);
+ assert((0 <= code) && (code <= 0xffffffff), code);
+ buffer.writeUInt32BE(code, 4);
+
+ buffers.push(buffer);
+};
+
+Deserializer.GOAWAY = function readGoaway(buffer, frame) {
+ if (buffer.length !== 8) {
+ // GOAWAY must have 8 bytes
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.last_stream = buffer.readUInt32BE(0) & 0x7fffffff;
+ frame.error = errorCodes[buffer.readUInt32BE(4)];
+ if (!frame.error) {
+ // Unknown error types are to be considered equivalent to INTERNAL ERROR
+ frame.error = 'INTERNAL_ERROR';
+ }
+};
+
+// [WINDOW_UPDATE](https://tools.ietf.org/html/rfc7540#section-6.9)
+// -----------------------------------------------------------------
+//
+// The WINDOW_UPDATE frame (type=0x8) is used to implement flow control.
+//
+// The WINDOW_UPDATE frame does not define any flags.
+
+frameTypes[0x8] = 'WINDOW_UPDATE';
+
+frameFlags.WINDOW_UPDATE = [];
+
+typeSpecificAttributes.WINDOW_UPDATE = ['window_size'];
+
+// The payload of a WINDOW_UPDATE frame is a 32-bit value indicating the additional number of bytes
+// that the sender can transmit in addition to the existing flow control window. The legal range
+// for this field is 1 to 2^31 - 1 (0x7fffffff) bytes; the most significant bit of this value is
+// reserved.
+
+Serializer.WINDOW_UPDATE = function writeWindowUpdate(frame, buffers) {
+ var buffer = Buffer.alloc(4);
+
+ var window_size = frame.window_size;
+ assert((0 < window_size) && (window_size <= 0x7fffffff), window_size);
+ buffer.writeUInt32BE(window_size, 0);
+
+ buffers.push(buffer);
+};
+
+Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame) {
+ if (buffer.length !== WINDOW_UPDATE_PAYLOAD_SIZE) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.window_size = buffer.readUInt32BE(0) & 0x7fffffff;
+ if (frame.window_size === 0) {
+ return 'PROTOCOL_ERROR';
+ }
+};
+
+// [CONTINUATION](https://tools.ietf.org/html/rfc7540#section-6.10)
+// ------------------------------------------------------------
+//
+// The CONTINUATION frame (type=0x9) is used to continue a sequence of header block fragments.
+//
+// The CONTINUATION frame defines the following flag:
+//
+// * END_HEADERS (0x4):
+// The END_HEADERS bit indicates that this frame ends the sequence of header block fragments
+// necessary to provide a complete set of headers.
+
+frameTypes[0x9] = 'CONTINUATION';
+
+frameFlags.CONTINUATION = ['RESERVED1', 'RESERVED2', 'END_HEADERS'];
+
+typeSpecificAttributes.CONTINUATION = ['headers', 'data'];
+
+Serializer.CONTINUATION = function writeContinuation(frame, buffers) {
+ buffers.push(frame.data);
+};
+
+Deserializer.CONTINUATION = function readContinuation(buffer, frame) {
+ frame.data = buffer;
+};
+
+// [ALTSVC](https://tools.ietf.org/html/rfc7838#section-4)
+// ------------------------------------------------------------
+//
+// The ALTSVC frame (type=0xA) advertises the availability of an alternative service to the client.
+//
+// The ALTSVC frame does not define any flags.
+
+frameTypes[0xA] = 'ALTSVC';
+
+frameFlags.ALTSVC = [];
+
+// 0 1 2 3
+// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+// | Origin-Len (16) | Origin? (*) ...
+// +-------------------------------+----------------+--------------+
+// | Alt-Svc-Field-Value (*) ...
+// +---------------------------------------------------------------+
+//
+// The ALTSVC frame contains the following fields:
+//
+// Origin-Len: An unsigned, 16-bit integer indicating the length, in
+// octets, of the Origin field.
+//
+// Origin: An OPTIONAL sequence of characters containing ASCII
+// serialisation of an origin ([RFC6454](https://tools.ietf.org/html/rfc6454),
+// Section 6.2) that the alternate service is applicable to.
+//
+// Alt-Svc-Field-Value: A sequence of octets (length determined by
+// subtracting the length of all preceding fields from the frame
+// length) containing a value identical to the Alt-Svc field value
+// defined in (Section 3)[https://tools.ietf.org/html/rfc7838#section-3]
+// (ABNF production "Alt-Svc").
+
+typeSpecificAttributes.ALTSVC = ['maxAge', 'port', 'protocolID', 'host',
+ 'origin'];
+
+function istchar(c) {
+ return ('!#$&\'*+-.^_`|~1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.indexOf(c) > -1);
+}
+
+function hexencode(s) {
+ var t = '';
+ for (var i = 0; i < s.length; i++) {
+ if (!istchar(s[i])) {
+ t += '%';
+ t += Buffer.from(s[i]).toString('hex');
+ } else {
+ t += s[i];
+ }
+ }
+ return t;
+}
+
+Serializer.ALTSVC = function writeAltSvc(frame, buffers) {
+ var buffer = Buffer.alloc(2);
+ buffer.writeUInt16BE(frame.origin.length, 0);
+ buffers.push(buffer);
+ buffers.push(Buffer.from(frame.origin, 'ascii'));
+
+ var fieldValue = hexencode(frame.protocolID) + '="' + frame.host + ':' + frame.port + '"';
+ if (frame.maxAge !== 86400) { // 86400 is the default
+ fieldValue += "; ma=" + frame.maxAge;
+ }
+
+ buffers.push(Buffer.from(fieldValue, 'ascii'));
+};
+
+function stripquotes(s) {
+ var start = 0;
+ var end = s.length;
+ while ((start < end) && (s[start] === '"')) {
+ start++;
+ }
+ while ((end > start) && (s[end - 1] === '"')) {
+ end--;
+ }
+ if (start >= end) {
+ return "";
+ }
+ return s.substring(start, end);
+}
+
+function splitNameValue(nvpair) {
+ var eq = -1;
+ var inQuotes = false;
+
+ for (var i = 0; i < nvpair.length; i++) {
+ if (nvpair[i] === '"') {
+ inQuotes = !inQuotes;
+ continue;
+ }
+ if (inQuotes) {
+ continue;
+ }
+ if (nvpair[i] === '=') {
+ eq = i;
+ break;
+ }
+ }
+
+ if (eq === -1) {
+ return {'name': nvpair, 'value': null};
+ }
+
+ var name = stripquotes(nvpair.substring(0, eq).trim());
+ var value = stripquotes(nvpair.substring(eq + 1).trim());
+ return {'name': name, 'value': value};
+}
+
+function splitHeaderParameters(hv) {
+ return parseHeaderValue(hv, ';', splitNameValue);
+}
+
+function parseHeaderValue(hv, separator, callback) {
+ var start = 0;
+ var inQuotes = false;
+ var values = [];
+
+ for (var i = 0; i < hv.length; i++) {
+ if (hv[i] === '"') {
+ inQuotes = !inQuotes;
+ continue;
+ }
+ if (inQuotes) {
+ // Just skip this
+ continue;
+ }
+ if (hv[i] === separator) {
+ var newValue = hv.substring(start, i).trim();
+ if (newValue.length > 0) {
+ newValue = callback(newValue);
+ values.push(newValue);
+ }
+ start = i + 1;
+ }
+ }
+
+ var newValue = hv.substring(start).trim();
+ if (newValue.length > 0) {
+ newValue = callback(newValue);
+ values.push(newValue);
+ }
+
+ return values;
+}
+
+function rsplit(s, delim, count) {
+ var nsplits = 0;
+ var end = s.length;
+ var rval = [];
+ for (var i = s.length - 1; i >= 0; i--) {
+ if (s[i] === delim) {
+ var t = s.substring(i + 1, end);
+ end = i;
+ rval.unshift(t);
+ nsplits++;
+ if (nsplits === count) {
+ break;
+ }
+ }
+ }
+ if (end !== 0) {
+ rval.unshift(s.substring(0, end));
+ }
+ return rval;
+}
+
+function ishex(c) {
+ return ('0123456789ABCDEFabcdef'.indexOf(c) > -1);
+}
+
+function unescape(s) {
+ var i = 0;
+ var t = '';
+ while (i < s.length) {
+ if (s[i] != '%' || !ishex(s[i + 1]) || !ishex(s[i + 2])) {
+ t += s[i];
+ } else {
+ ++i;
+ var hexvalue = '';
+ if (i < s.length) {
+ hexvalue += s[i];
+ ++i;
+ }
+ if (i < s.length) {
+ hexvalue += s[i];
+ }
+ if (hexvalue.length > 0) {
+ t += Buffer.from(hexvalue, 'hex').toString();
+ } else {
+ t += '%';
+ }
+ }
+
+ ++i;
+ }
+ return t;
+}
+
+Deserializer.ALTSVC = function readAltSvc(buffer, frame) {
+ if (buffer.length < 2) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ var originLength = buffer.readUInt16BE(0);
+ if ((buffer.length - 2) < originLength) {
+ return 'FRAME_SIZE_ERROR';
+ }
+ frame.origin = buffer.toString('ascii', 2, 2 + originLength);
+ var fieldValue = buffer.toString('ascii', 2 + originLength);
+ var values = parseHeaderValue(fieldValue, ',', splitHeaderParameters);
+ if (values.length > 1) {
+ // TODO - warn that we only use one here
+ }
+ if (values.length === 0) {
+ // Well that's a malformed frame. Just ignore it.
+ return;
+ }
+
+ var chosenAltSvc = values[0];
+ frame.maxAge = 86400; // Default
+ for (var i = 0; i < chosenAltSvc.length; i++) {
+ if (i === 0) {
+ // This corresponds to the protocolID="<host>:<port>" item
+ frame.protocolID = unescape(chosenAltSvc[i].name);
+ var hostport = rsplit(chosenAltSvc[i].value, ':', 1);
+ frame.host = hostport[0];
+ frame.port = parseInt(hostport[1], 10);
+ } else if (chosenAltSvc[i].name == 'ma') {
+ frame.maxAge = parseInt(chosenAltSvc[i].value, 10);
+ }
+ // Otherwise, we just ignore this
+ }
+};
+
+// frame 0xB was BLOCKED and some versions of chrome will
+// throw PROTOCOL_ERROR upon seeing it with non 0 payload
+
+frameTypes[0xC] = 'ORIGIN';
+frameFlags.ORIGIN = [];
+typeSpecificAttributes.ORIGIN = ['originList'];
+
+Serializer.ORIGIN = function writeOrigin(frame, buffers) {
+ for (var i = 0; i < frame.originList.length; i++) {
+ var buffer = Buffer.alloc(2);
+ buffer.writeUInt16BE(frame.originList[i].length, 0);
+ buffers.push(buffer);
+ buffers.push(Buffer.from(frame.originList[i], 'ascii'));
+ }
+};
+
+Deserializer.ORIGIN = function readOrigin(buffer, frame) {
+ // ignored
+};
+
+
+// [Error Codes](https://tools.ietf.org/html/rfc7540#section-7)
+// ------------------------------------------------------------
+
+var errorCodes = [
+ 'NO_ERROR',
+ 'PROTOCOL_ERROR',
+ 'INTERNAL_ERROR',
+ 'FLOW_CONTROL_ERROR',
+ 'SETTINGS_TIMEOUT',
+ 'STREAM_CLOSED',
+ 'FRAME_SIZE_ERROR',
+ 'REFUSED_STREAM',
+ 'CANCEL',
+ 'COMPRESSION_ERROR',
+ 'CONNECT_ERROR',
+ 'ENHANCE_YOUR_CALM',
+ 'INADEQUATE_SECURITY',
+ 'HTTP_1_1_REQUIRED'
+];
+
+// Logging
+// -------
+
+// [Bunyan serializers](https://github.com/trentm/node-bunyan#serializers) to improve logging output
+// for debug messages emitted in this component.
+exports.serializers = {};
+
+// * `frame` serializer: it transforms data attributes from Buffers to hex strings and filters out
+// flags that are not present.
+var frameCounter = 0;
+exports.serializers.frame = function(frame) {
+ if (!frame) {
+ return null;
+ }
+
+ if ('id' in frame) {
+ return frame.id;
+ }
+
+ frame.id = frameCounter;
+ frameCounter += 1;
+
+ var logEntry = { id: frame.id };
+ genericAttributes.concat(typeSpecificAttributes[frame.type]).forEach(function(name) {
+ logEntry[name] = frame[name];
+ });
+
+ if (frame.data instanceof Buffer) {
+ if (logEntry.data.length > 50) {
+ logEntry.data = frame.data.slice(0, 47).toString('hex') + '...';
+ } else {
+ logEntry.data = frame.data.toString('hex');
+ }
+
+ if (!('length' in logEntry)) {
+ logEntry.length = frame.data.length;
+ }
+ }
+
+ if (frame.promised_stream instanceof Object) {
+ logEntry.promised_stream = 'stream-' + frame.promised_stream.id;
+ }
+
+ logEntry.flags = Object.keys(frame.flags || {}).filter(function(name) {
+ return frame.flags[name] === true;
+ });
+
+ return logEntry;
+};
+
+// * `data` serializer: it simply transforms a buffer to a hex string.
+exports.serializers.data = function(data) {
+ return data.toString('hex');
+};
diff --git a/testing/xpcshell/node-http2/lib/protocol/index.js b/testing/xpcshell/node-http2/lib/protocol/index.js
new file mode 100644
index 0000000000..0f3720e2ce
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/index.js
@@ -0,0 +1,91 @@
+// This is an implementation of the [HTTP/2][http2]
+// framing layer for [node.js][node].
+//
+// The main building blocks are [node.js streams][node-stream] that are connected through pipes.
+//
+// The main components are:
+//
+// * [Endpoint](endpoint.html): represents an HTTP/2 endpoint (client or server). It's
+// responsible for the the first part of the handshake process (sending/receiving the
+// [connection header][http2-connheader]) and manages other components (framer, compressor,
+// connection, streams) that make up a client or server.
+//
+// * [Connection](connection.html): multiplexes the active HTTP/2 streams, manages connection
+// lifecycle and settings, and responsible for enforcing the connection level limits (flow
+// control, initiated stream limit)
+//
+// * [Stream](stream.html): implementation of the [HTTP/2 stream concept][http2-stream].
+// Implements the [stream state machine][http2-streamstate] defined by the standard, provides
+// management methods and events for using the stream (sending/receiving headers, data, etc.),
+// and enforces stream level constraints (flow control, sending only legal frames).
+//
+// * [Flow](flow.html): implements flow control for Connection and Stream as parent class.
+//
+// * [Compressor and Decompressor](compressor.html): compression and decompression of HEADER and
+// PUSH_PROMISE frames
+//
+// * [Serializer and Deserializer](framer.html): the lowest layer in the stack that transforms
+// between the binary and the JavaScript object representation of HTTP/2 frames
+//
+// [http2]: https://tools.ietf.org/html/rfc7540
+// [http2-connheader]: https://tools.ietf.org/html/rfc7540#section-3.5
+// [http2-stream]: https://tools.ietf.org/html/rfc7540#section-5
+// [http2-streamstate]: https://tools.ietf.org/html/rfc7540#section-5.1
+// [node]: https://nodejs.org/
+// [node-stream]: https://nodejs.org/api/stream.html
+// [node-https]: https://nodejs.org/api/https.html
+// [node-http]: https://nodejs.org/api/http.html
+
+exports.VERSION = 'h2';
+
+exports.Endpoint = require('./endpoint').Endpoint;
+
+/* Bunyan serializers exported by submodules that are worth adding when creating a logger. */
+exports.serializers = {};
+var modules = ['./framer', './compressor', './flow', './connection', './stream', './endpoint'];
+modules.map(require).forEach(function(module) {
+ for (var name in module.serializers) {
+ exports.serializers[name] = module.serializers[name];
+ }
+});
+
+/*
+ Stream API Endpoint API
+ Stream data
+
+ | ^ | ^
+ | | | |
+ | | | |
+ +-----------|------------|---------------------------------------+
+ | | | Endpoint |
+ | | | |
+ | +-------|------------|-----------------------------------+ |
+ | | | | Connection | |
+ | | v | | |
+ | | +-----------------------+ +-------------------- | |
+ | | | Stream | | Stream ... | |
+ | | +-----------------------+ +-------------------- | |
+ | | | ^ | ^ | |
+ | | v | v | | |
+ | | +------------+--+--------+--+------------+- ... | |
+ | | | ^ | |
+ | | | | | |
+ | +-----------------------|--------|-----------------------+ |
+ | | | |
+ | v | |
+ | +--------------------------+ +--------------------------+ |
+ | | Compressor | | Decompressor | |
+ | +--------------------------+ +--------------------------+ |
+ | | ^ |
+ | v | |
+ | +--------------------------+ +--------------------------+ |
+ | | Serializer | | Deserializer | |
+ | +--------------------------+ +--------------------------+ |
+ | | ^ |
+ +---------------------------|--------|---------------------------+
+ | |
+ v |
+
+ Raw data
+
+*/
diff --git a/testing/xpcshell/node-http2/lib/protocol/stream.js b/testing/xpcshell/node-http2/lib/protocol/stream.js
new file mode 100644
index 0000000000..b80dff0098
--- /dev/null
+++ b/testing/xpcshell/node-http2/lib/protocol/stream.js
@@ -0,0 +1,677 @@
+var assert = require('assert');
+
+// The Stream class
+// ================
+
+// Stream is a [Duplex stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex)
+// subclass that implements the [HTTP/2 Stream](https://tools.ietf.org/html/rfc7540#section-5)
+// concept. It has two 'sides': one that is used by the user to send/receive data (the `stream`
+// object itself) and one that is used by a Connection to read/write frames to/from the other peer
+// (`stream.upstream`).
+
+var Duplex = require('stream').Duplex;
+
+exports.Stream = Stream;
+
+// Public API
+// ----------
+
+// * **new Stream(log, connection)**: create a new Stream
+//
+// * **Event: 'headers' (headers)**: signals incoming headers
+//
+// * **Event: 'promise' (stream, headers)**: signals an incoming push promise
+//
+// * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0
+// (highest priority) and 2^31-1 (lowest priority). Default value is 2^30.
+//
+// * **Event: 'error' (type)**: signals an error
+//
+// * **headers(headers)**: send headers
+//
+// * **promise(headers): Stream**: promise a stream
+//
+// * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer
+// too, but once it is set locally, it can not be changed remotely.
+//
+// * **reset(error)**: reset the stream with an error code
+//
+// * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames
+// that are to be sent/arrived to/from the peer and are related to this stream.
+//
+// Headers are always in the [regular node.js header format][1].
+// [1]: https://nodejs.org/api/http.html#http_message_headers
+
+// Constructor
+// -----------
+
+// The main aspects of managing the stream are:
+function Stream(log, connection) {
+ Duplex.call(this);
+
+ // * logging
+ this._log = log.child({ component: 'stream', s: this });
+
+ // * receiving and sending stream management commands
+ this._initializeManagement();
+
+ // * sending and receiving frames to/from the upstream connection
+ this._initializeDataFlow();
+
+ // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
+ this._initializeState();
+
+ this.connection = connection;
+ this.sentEndStream = false;
+}
+
+Stream.prototype = Object.create(Duplex.prototype, { constructor: { value: Stream } });
+
+// Managing the stream
+// -------------------
+
+// the default stream priority is 2^30
+var DEFAULT_PRIORITY = Math.pow(2, 30);
+var MAX_PRIORITY = Math.pow(2, 31) - 1;
+
+// PUSH_PROMISE and HEADERS are forwarded to the user through events.
+Stream.prototype._initializeManagement = function _initializeManagement() {
+ this._resetSent = false;
+ this._priority = DEFAULT_PRIORITY;
+ this._letPeerPrioritize = true;
+};
+
+Stream.prototype.promise = function promise(headers) {
+ var stream = new Stream(this._log, this.connection);
+ stream._priority = Math.min(this._priority + 1, MAX_PRIORITY);
+ this._pushUpstream({
+ type: 'PUSH_PROMISE',
+ flags: {},
+ stream: this.id,
+ promised_stream: stream,
+ headers: headers
+ });
+ return stream;
+};
+
+Stream.prototype._onPromise = function _onPromise(frame) {
+ this.emit('promise', frame.promised_stream, frame.headers);
+};
+
+Stream.prototype.headers = function headers(headers) {
+ this._pushUpstream({
+ type: 'HEADERS',
+ flags: {},
+ stream: this.id,
+ headers: headers
+ });
+};
+
+Stream.prototype.trailers = function trailers(trailers) {
+ this.sentEndStream = true;
+ this._pushUpstream({
+ type: 'HEADERS',
+ flags: {'END_STREAM': true},
+ stream: this.id,
+ headers: trailers
+ });
+};
+
+Stream.prototype._onHeaders = function _onHeaders(frame) {
+ if (frame.priority !== undefined) {
+ this.priority(frame.priority, true);
+ }
+ this.emit('headers', frame.headers);
+};
+
+Stream.prototype.priority = function priority(priority, peer) {
+ if ((peer && this._letPeerPrioritize) || !peer) {
+ if (!peer) {
+ this._letPeerPrioritize = false;
+
+ var lastFrame = this.upstream.getLastQueuedFrame();
+ if (lastFrame && ((lastFrame.type === 'HEADERS') || (lastFrame.type === 'PRIORITY'))) {
+ lastFrame.priority = priority;
+ } else {
+ this._pushUpstream({
+ type: 'PRIORITY',
+ flags: {},
+ stream: this.id,
+ priority: priority
+ });
+ }
+ }
+
+ this._log.debug({ priority: priority }, 'Changing priority');
+ this.emit('priority', priority);
+ this._priority = priority;
+ }
+};
+
+Stream.prototype._onPriority = function _onPriority(frame) {
+ this.priority(frame.priority, true);
+};
+
+// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for
+// any stream.
+Stream.prototype.reset = function reset(error) {
+ if (!this._resetSent) {
+ this._resetSent = true;
+ this._pushUpstream({
+ type: 'RST_STREAM',
+ flags: {},
+ stream: this.id,
+ error: error
+ });
+ }
+};
+
+// Specify an alternate service for the origin of this stream
+Stream.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) {
+ var stream;
+ if (origin) {
+ stream = 0;
+ } else {
+ stream = this.id;
+ }
+ this._pushUpstream({
+ type: 'ALTSVC',
+ flags: {},
+ stream: stream,
+ host: host,
+ port: port,
+ protocolID: protocolID,
+ origin: origin,
+ maxAge: maxAge
+ });
+};
+
+// Data flow
+// ---------
+
+// The incoming and the generated outgoing frames are received/transmitted on the `this.upstream`
+// [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read
+// and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by
+// the user to write or read the body of the request.
+// [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex
+
+// upstream side stream user side
+//
+// +------------------------------------+
+// | |
+// +------------------+ |
+// | upstream | |
+// | | |
+// +--+ | +--|
+// read() | | _send() | _write() | | write(buf)
+// <--------------|B |<--------------|--------------| B|<------------
+// | | | | |
+// frames +--+ | +--| buffers
+// | | | | |
+// -------------->|B |---------------|------------->| B|------------>
+// write(frame) | | _receive() | _read() | | read()
+// +--+ | +--|
+// | | |
+// | | |
+// +------------------+ |
+// | |
+// +------------------------------------+
+//
+// B: input or output buffer
+
+var Flow = require('./flow').Flow;
+
+Stream.prototype._initializeDataFlow = function _initializeDataFlow() {
+ this.id = undefined;
+
+ this._ended = false;
+
+ this.upstream = new Flow();
+ this.upstream._log = this._log;
+ this.upstream._send = this._send.bind(this);
+ this.upstream._receive = this._receive.bind(this);
+ this.upstream.write = this._writeUpstream.bind(this);
+ this.upstream.on('error', this.emit.bind(this, 'error'));
+
+ this.on('finish', this._finishing);
+};
+
+Stream.prototype._pushUpstream = function _pushUpstream(frame) {
+ this.upstream.push(frame);
+ this._transition(true, frame);
+};
+
+// Overriding the upstream's `write` allows us to act immediately instead of waiting for the input
+// queue to empty. This is important in case of control frames.
+Stream.prototype._writeUpstream = function _writeUpstream(frame) {
+ this._log.debug({ frame: frame }, 'Receiving frame');
+
+ var moreNeeded = Flow.prototype.write.call(this.upstream, frame);
+
+ // * Transition to a new state if that's the effect of receiving the frame
+ this._transition(false, frame);
+
+ // * If it's a control frame. Call the appropriate handler method.
+ if (frame.type === 'HEADERS') {
+ if (this._processedHeaders && !frame.flags['END_STREAM']) {
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+ this._processedHeaders = true;
+ this._onHeaders(frame);
+ } else if (frame.type === 'PUSH_PROMISE') {
+ this._onPromise(frame);
+ } else if (frame.type === 'PRIORITY') {
+ this._onPriority(frame);
+ } else if (frame.type === 'ALTSVC') {
+ // TODO
+ } else if (frame.type === 'ORIGIN') {
+ // TODO
+ }
+
+ // * If it's an invalid stream level frame, emit error
+ else if ((frame.type !== 'DATA') &&
+ (frame.type !== 'WINDOW_UPDATE') &&
+ (frame.type !== 'RST_STREAM')) {
+ this._log.error({ frame: frame }, 'Invalid stream level frame');
+ this.emit('error', 'PROTOCOL_ERROR');
+ }
+
+ return moreNeeded;
+};
+
+// The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame.
+Stream.prototype._receive = function _receive(frame, ready) {
+ // * If it's a DATA frame, then push the payload into the output buffer on the other side.
+ // Call ready when the other side is ready to receive more.
+ if (!this._ended && (frame.type === 'DATA')) {
+ var moreNeeded = this.push(frame.data);
+ if (!moreNeeded) {
+ this._receiveMore = ready;
+ }
+ }
+
+ // * Any frame may signal the end of the stream with the END_STREAM flag
+ if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
+ this.push(null);
+ this._ended = true;
+ }
+
+ // * Postpone calling `ready` if `push()` returned a falsy value
+ if (this._receiveMore !== ready) {
+ ready();
+ }
+};
+
+// The `_read` method is called when the user side is ready to receive more data. If there's a
+// pending write on the upstream, then call its pending ready callback to receive more frames.
+Stream.prototype._read = function _read() {
+ if (this._receiveMore) {
+ var receiveMore = this._receiveMore;
+ delete this._receiveMore;
+ receiveMore();
+ }
+};
+
+// The `write` method gets called when there's a write request from the user.
+Stream.prototype._write = function _write(buffer, encoding, ready) {
+ // * Chunking is done by the upstream Flow.
+ var moreNeeded = this._pushUpstream({
+ type: 'DATA',
+ flags: {},
+ stream: this.id,
+ data: buffer
+ });
+
+ // * Call ready when upstream is ready to receive more frames.
+ if (moreNeeded) {
+ ready();
+ } else {
+ this._sendMore = ready;
+ }
+};
+
+// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames.
+// If there's a pending write on the user side, then call its pending ready callback to receive more
+// writes.
+Stream.prototype._send = function _send() {
+ if (this._sendMore) {
+ var sendMore = this._sendMore;
+ delete this._sendMore;
+ sendMore();
+ }
+};
+
+// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM`
+// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag,
+// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
+// existing frame is a nice optimization.
+var emptyBuffer = Buffer.alloc(0);
+Stream.prototype._finishing = function _finishing() {
+ var endFrame = {
+ type: 'DATA',
+ flags: { END_STREAM: true },
+ stream: this.id,
+ data: emptyBuffer
+ };
+
+ if (this.sentEndStream) {
+ this._log.debug('Already sent END_STREAM, not sending again.');
+ return;
+ }
+
+ this.sentEndStream = true;
+ var lastFrame = this.upstream.getLastQueuedFrame();
+ if (lastFrame && ((lastFrame.type === 'DATA') || (lastFrame.type === 'HEADERS'))) {
+ this._log.debug({ frame: lastFrame }, 'Marking last frame with END_STREAM flag.');
+ lastFrame.flags.END_STREAM = true;
+ this._transition(true, endFrame);
+ } else {
+ this._pushUpstream(endFrame);
+ }
+};
+
+// [Stream States](https://tools.ietf.org/html/rfc7540#section-5.1)
+// ----------------
+//
+// +--------+
+// PP | | PP
+// ,--------| idle |--------.
+// / | | \
+// v +--------+ v
+// +----------+ | +----------+
+// | | | H | |
+// ,---| reserved | | | reserved |---.
+// | | (local) | v | (remote) | |
+// | +----------+ +--------+ +----------+ |
+// | | ES | | ES | |
+// | | H ,-------| open |-------. | H |
+// | | / | | \ | |
+// | v v +--------+ v v |
+// | +----------+ | +----------+ |
+// | | half | | | half | |
+// | | closed | | R | closed | |
+// | | (remote) | | | (local) | |
+// | +----------+ | +----------+ |
+// | | v | |
+// | | ES / R +--------+ ES / R | |
+// | `----------->| |<-----------' |
+// | R | closed | R |
+// `-------------------->| |<--------------------'
+// +--------+
+
+// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame
+Stream.prototype._initializeState = function _initializeState() {
+ this.state = 'IDLE';
+ this._initiated = undefined;
+ this._closedByUs = undefined;
+ this._closedWithRst = undefined;
+ this._processedHeaders = false;
+};
+
+// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
+// interested parties using the 'state' event.
+Stream.prototype._setState = function transition(state) {
+ assert(this.state !== state);
+ this._log.debug({ from: this.state, to: state }, 'State transition');
+ this.state = state;
+ this.emit('state', state);
+};
+
+// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams
+// that are in the "open" state, or either of the "half closed" states count toward this limit.
+function activeState(state) {
+ return ((state === 'HALF_CLOSED_LOCAL') || (state === 'HALF_CLOSED_REMOTE') || (state === 'OPEN'));
+}
+
+// `_transition` is called every time there's an incoming or outgoing frame. It manages state
+// transitions, and detects stream errors. A stream error is always caused by a frame that is not
+// allowed in the current state.
+Stream.prototype._transition = function transition(sending, frame) {
+ var receiving = !sending;
+ var connectionError;
+ var streamError;
+
+ var DATA = false, HEADERS = false, PRIORITY = false, ALTSVC = false, ORIGIN = false;
+ var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false;
+ switch(frame.type) {
+ case 'DATA' : DATA = true; break;
+ case 'HEADERS' : HEADERS = true; break;
+ case 'PRIORITY' : PRIORITY = true; break;
+ case 'RST_STREAM' : RST_STREAM = true; break;
+ case 'PUSH_PROMISE' : PUSH_PROMISE = true; break;
+ case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break;
+ case 'ALTSVC' : ALTSVC = true; break;
+ case 'ORIGIN' : ORIGIN = true; break;
+ }
+
+ var previousState = this.state;
+
+ switch (this.state) {
+ // All streams start in the **idle** state. In this state, no frames have been exchanged.
+ //
+ // * Sending or receiving a HEADERS frame causes the stream to become "open".
+ //
+ // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen.
+ case 'IDLE':
+ if (HEADERS) {
+ this._setState('OPEN');
+ if (frame.flags.END_STREAM) {
+ this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
+ }
+ this._initiated = sending;
+ } else if (sending && RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (PRIORITY) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream in the **reserved (local)** state is one that has been promised by sending a
+ // PUSH_PROMISE frame.
+ //
+ // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed
+ // (remote)" state.
+ // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
+ // releases the stream reservation.
+ // * An endpoint may receive PRIORITY frame in this state.
+ // * An endpoint MUST NOT send any other type of frame in this state.
+ case 'RESERVED_LOCAL':
+ if (sending && HEADERS) {
+ this._setState('HALF_CLOSED_REMOTE');
+ } else if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (PRIORITY) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream in the **reserved (remote)** state has been reserved by a remote peer.
+ //
+ // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This
+ // releases the stream reservation.
+ // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)".
+ // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream.
+ // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR.
+ case 'RESERVED_REMOTE':
+ if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else if (receiving && HEADERS) {
+ this._setState('HALF_CLOSED_LOCAL');
+ } else if (PRIORITY || ORIGIN) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // The **open** state is where both peers can send frames. In this state, sending peers observe
+ // advertised stream level flow control limits.
+ //
+ // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes
+ // the stream to transition into one of the "half closed" states: an endpoint sending a
+ // END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint
+ // receiving a END_STREAM flag causes the stream state to become "half closed (remote)".
+ // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition
+ // immediately to "closed".
+ case 'OPEN':
+ if (frame.flags.END_STREAM) {
+ this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE');
+ } else if (RST_STREAM) {
+ this._setState('CLOSED');
+ } else {
+ /* No state change */
+ }
+ break;
+
+ // A stream that is **half closed (local)** cannot be used for sending frames.
+ //
+ // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM
+ // flag is received, or when either peer sends a RST_STREAM frame.
+ // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
+ // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag.
+ case 'HALF_CLOSED_LOCAL':
+ if (RST_STREAM || (receiving && frame.flags.END_STREAM)) {
+ this._setState('CLOSED');
+ } else if (ORIGIN || ALTSVC || receiving || PRIORITY || (sending && WINDOW_UPDATE)) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // A stream that is **half closed (remote)** is no longer being used by the peer to send frames.
+ // In this state, an endpoint is no longer obligated to maintain a receiver flow control window
+ // if it performs flow control.
+ //
+ // * If an endpoint receives additional frames for a stream that is in this state it MUST
+ // respond with a stream error of type STREAM_CLOSED.
+ // * A stream can transition from this state to "closed" by sending a frame that contains a
+ // END_STREAM flag, or when either peer sends a RST_STREAM frame.
+ // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream.
+ // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream.
+ case 'HALF_CLOSED_REMOTE':
+ if (RST_STREAM || (sending && frame.flags.END_STREAM)) {
+ this._setState('CLOSED');
+ } else if (ORIGIN || ALTSVC || sending || PRIORITY || (receiving && WINDOW_UPDATE)) {
+ /* No state change */
+ } else {
+ connectionError = 'PROTOCOL_ERROR';
+ }
+ break;
+
+ // The **closed** state is the terminal state.
+ //
+ // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame
+ // after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST
+ // treat that as a stream error of type STREAM_CLOSED.
+ // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short
+ // period after a frame containing an END_STREAM flag is sent. Until the remote peer receives
+ // and processes the frame bearing the END_STREAM flag, it might send either frame type.
+ // Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY
+ // choose to treat WINDOW_UPDATE frames that arrive a significant time after sending
+ // END_STREAM as a connection error of type PROTOCOL_ERROR.
+ // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives
+ // the RST_STREAM might have already sent - or enqueued for sending - frames on the stream
+ // that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that
+ // it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose
+ // to limit the period over which it ignores frames and treat frames that arrive after this
+ // time as being in error.
+ // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE
+ // causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM
+ // can be used to close any of those streams.
+ case 'CLOSED':
+ if (PRIORITY || (sending && RST_STREAM) ||
+ (receiving && WINDOW_UPDATE) ||
+ (receiving && this._closedByUs &&
+ (this._closedWithRst || RST_STREAM || ALTSVC || ORIGIN))) {
+ /* No state change */
+ } else {
+ streamError = 'STREAM_CLOSED';
+ }
+ break;
+ }
+
+ // Noting that the connection was closed by the other endpoint. It may be important in edge cases.
+ // For example, when the peer tries to cancel a promised stream, but we already sent every data
+ // on it, then the stream is in CLOSED state, yet we want to ignore the incoming RST_STREAM.
+ if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) {
+ this._closedByUs = sending;
+ this._closedWithRst = RST_STREAM;
+ }
+
+ // Sending/receiving a PUSH_PROMISE
+ //
+ // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
+ // for the reserved stream transitions to "reserved (local)".
+ // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
+ // The state of the stream becomes "reserved (remote)".
+ if (PUSH_PROMISE && !connectionError && !streamError) {
+ /* This assertion must hold, because _transition is called immediately when a frame is written
+ to the stream. If it would be called when a frame gets out of the input queue, the state
+ of the reserved could have been changed by then. */
+ assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state);
+ frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
+ frame.promised_stream._initiated = sending;
+ }
+
+ // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1)
+ if (this._initiated) {
+ var change = (activeState(this.state) - activeState(previousState));
+ if (sending) {
+ frame.count_change = change;
+ } else {
+ frame.count_change(change);
+ }
+ } else if (sending) {
+ frame.count_change = 0;
+ }
+
+ // Common error handling.
+ if (connectionError || streamError) {
+ var info = {
+ error: connectionError,
+ frame: frame,
+ state: this.state,
+ closedByUs: this._closedByUs,
+ closedWithRst: this._closedWithRst
+ };
+
+ // * When sending something invalid, throwing an exception, since it is probably a bug.
+ if (sending) {
+ this._log.error(info, 'Sending illegal frame.');
+ return this.emit('error', new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.'));
+ }
+
+ // * In case of a serious problem, emitting and error and letting someone else handle it
+ // (e.g. closing the connection)
+ // * When receiving something invalid, sending an RST_STREAM using the `reset` method.
+ // This will automatically cause a transition to the CLOSED state.
+ else {
+ this._log.error(info, 'Received illegal frame.');
+ if (connectionError) {
+ this.emit('connectionError', connectionError);
+ } else {
+ this.reset(streamError);
+ this.emit('error', streamError);
+ }
+ }
+ }
+};
+
+// Bunyan serializers
+// ------------------
+
+exports.serializers = {};
+
+var nextId = 0;
+exports.serializers.s = function(stream) {
+ if (!('_id' in stream)) {
+ stream._id = nextId;
+ nextId += 1;
+ }
+ return stream._id;
+};