summaryrefslogtreecommitdiffstats
path: root/testing/xpcshell/node-ws/test/create-websocket-stream.test.js
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--testing/xpcshell/node-ws/test/create-websocket-stream.test.js598
1 files changed, 598 insertions, 0 deletions
diff --git a/testing/xpcshell/node-ws/test/create-websocket-stream.test.js b/testing/xpcshell/node-ws/test/create-websocket-stream.test.js
new file mode 100644
index 0000000000..4d51958cd9
--- /dev/null
+++ b/testing/xpcshell/node-ws/test/create-websocket-stream.test.js
@@ -0,0 +1,598 @@
+'use strict';
+
+const assert = require('assert');
+const EventEmitter = require('events');
+const { createServer } = require('http');
+const { Duplex } = require('stream');
+const { randomBytes } = require('crypto');
+
+const createWebSocketStream = require('../lib/stream');
+const Sender = require('../lib/sender');
+const WebSocket = require('..');
+const { EMPTY_BUFFER } = require('../lib/constants');
+
+describe('createWebSocketStream', () => {
+ it('is exposed as a property of the `WebSocket` class', () => {
+ assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
+ });
+
+ it('returns a `Duplex` stream', () => {
+ const duplex = createWebSocketStream(new EventEmitter());
+
+ assert.ok(duplex instanceof Duplex);
+ });
+
+ it('passes the options object to the `Duplex` constructor', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws, {
+ allowHalfOpen: false,
+ encoding: 'utf8'
+ });
+
+ duplex.on('data', (chunk) => {
+ assert.strictEqual(chunk, 'hi');
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.send(Buffer.from('hi'));
+ ws.close();
+ });
+ });
+
+ describe('The returned stream', () => {
+ it('buffers writes if `readyState` is `CONNECTING`', (done) => {
+ const chunk = randomBytes(1024);
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+
+ assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
+
+ const duplex = createWebSocketStream(ws);
+
+ duplex.write(chunk);
+ });
+
+ wss.on('connection', (ws) => {
+ ws.on('message', (message, isBinary) => {
+ ws.on('close', (code, reason) => {
+ assert.deepStrictEqual(message, chunk);
+ assert.ok(isBinary);
+ assert.strictEqual(code, 1005);
+ assert.strictEqual(reason, EMPTY_BUFFER);
+ wss.close(done);
+ });
+ });
+
+ ws.close();
+ });
+ });
+
+ it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('error', (err) => {
+ assert.ok(duplex.destroyed);
+ assert.ok(err instanceof Error);
+ assert.strictEqual(
+ err.message,
+ 'WebSocket is not open: readyState 2 (CLOSING)'
+ );
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+ });
+
+ ws.on('open', () => {
+ ws._receiver.on('conclude', () => {
+ duplex.write('hi');
+ });
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.close();
+ });
+ });
+
+ it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('error', (err) => {
+ assert.ok(duplex.destroyed);
+ assert.ok(err instanceof Error);
+ assert.strictEqual(
+ err.message,
+ 'WebSocket is not open: readyState 3 (CLOSED)'
+ );
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+ });
+
+ ws.on('close', () => {
+ duplex.write('hi');
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.close();
+ });
+ });
+
+ it('does not error if `_final()` is called while connecting', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+
+ assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
+
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+
+ duplex.resume();
+ duplex.end();
+ });
+ });
+
+ it('makes `_final()` a noop if no socket is assigned', (done) => {
+ const server = createServer();
+
+ server.on('upgrade', (request, socket) => {
+ socket.on('end', socket.end);
+
+ const headers = [
+ 'HTTP/1.1 101 Switching Protocols',
+ 'Upgrade: websocket',
+ 'Connection: Upgrade',
+ 'Sec-WebSocket-Accept: foo'
+ ];
+
+ socket.write(headers.concat('\r\n').join('\r\n'));
+ });
+
+ server.listen(() => {
+ const called = [];
+ const ws = new WebSocket(`ws://localhost:${server.address().port}`);
+ const duplex = WebSocket.createWebSocketStream(ws);
+ const final = duplex._final;
+
+ duplex._final = (callback) => {
+ called.push('final');
+ assert.strictEqual(ws.readyState, WebSocket.CLOSING);
+ assert.strictEqual(ws._socket, null);
+
+ final(callback);
+ };
+
+ duplex.on('error', (err) => {
+ called.push('error');
+ assert.ok(err instanceof Error);
+ assert.strictEqual(
+ err.message,
+ 'Invalid Sec-WebSocket-Accept header'
+ );
+ });
+
+ duplex.on('finish', () => {
+ called.push('finish');
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(called, ['final', 'error']);
+ server.close(done);
+ });
+
+ ws.on('upgrade', () => {
+ process.nextTick(() => {
+ duplex.end();
+ });
+ });
+ });
+ });
+
+ it('reemits errors', (done) => {
+ let duplexCloseEventEmitted = false;
+ let serverClientCloseEventEmitted = false;
+
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('error', (err) => {
+ assert.ok(err instanceof RangeError);
+ assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE');
+ assert.strictEqual(
+ err.message,
+ 'Invalid WebSocket frame: invalid opcode 5'
+ );
+
+ duplex.on('close', () => {
+ duplexCloseEventEmitted = true;
+ if (serverClientCloseEventEmitted) wss.close(done);
+ });
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws._socket.write(Buffer.from([0x85, 0x00]));
+ ws.on('close', (code, reason) => {
+ assert.strictEqual(code, 1002);
+ assert.deepStrictEqual(reason, EMPTY_BUFFER);
+
+ serverClientCloseEventEmitted = true;
+ if (duplexCloseEventEmitted) wss.close(done);
+ });
+ });
+ });
+
+ it('does not swallow errors that may occur while destroying', (done) => {
+ const frame = Buffer.concat(
+ Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), {
+ fin: true,
+ rsv1: true,
+ opcode: 0x02,
+ mask: false,
+ readOnly: false
+ })
+ );
+
+ const wss = new WebSocket.Server(
+ {
+ perMessageDeflate: true,
+ port: 0
+ },
+ () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('error', (err) => {
+ assert.ok(err instanceof Error);
+ assert.strictEqual(err.code, 'Z_DATA_ERROR');
+ assert.strictEqual(err.errno, -3);
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+ });
+
+ let bytesRead = 0;
+
+ ws.on('open', () => {
+ ws._socket.on('data', (chunk) => {
+ bytesRead += chunk.length;
+ if (bytesRead === frame.length) duplex.destroy();
+ });
+ });
+ }
+ );
+
+ wss.on('connection', (ws) => {
+ ws._socket.write(frame);
+ });
+ });
+
+ it("does not suppress the throwing behavior of 'error' events", (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ createWebSocketStream(ws);
+ });
+
+ wss.on('connection', (ws) => {
+ ws._socket.write(Buffer.from([0x85, 0x00]));
+ });
+
+ assert.strictEqual(process.listenerCount('uncaughtException'), 1);
+
+ const [listener] = process.listeners('uncaughtException');
+
+ process.removeAllListeners('uncaughtException');
+ process.once('uncaughtException', (err) => {
+ assert.ok(err instanceof Error);
+ assert.strictEqual(
+ err.message,
+ 'Invalid WebSocket frame: invalid opcode 5'
+ );
+
+ process.on('uncaughtException', listener);
+ wss.close(done);
+ });
+ });
+
+ it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const events = [];
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('end', () => {
+ events.push('end');
+ assert.ok(duplex.destroyed);
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(events, ['finish', 'end']);
+ wss.close(done);
+ });
+
+ duplex.on('finish', () => {
+ events.push('finish');
+ assert.ok(!duplex.destroyed);
+ assert.ok(duplex.readable);
+
+ duplex.resume();
+ });
+
+ ws.on('close', () => {
+ duplex.end();
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.send('foo');
+ ws.close();
+ });
+ });
+
+ it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const events = [];
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('end', () => {
+ events.push('end');
+ assert.ok(!duplex.destroyed);
+ assert.ok(duplex.writable);
+
+ duplex.end();
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(events, ['end', 'finish']);
+ wss.close(done);
+ });
+
+ duplex.on('finish', () => {
+ events.push('finish');
+ });
+
+ duplex.resume();
+ });
+
+ wss.on('connection', (ws) => {
+ ws.close();
+ });
+ });
+
+ it('handles backpressure (1/3)', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ // eslint-disable-next-line no-unused-vars
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ });
+
+ wss.on('connection', (ws) => {
+ const duplex = createWebSocketStream(ws);
+
+ duplex.resume();
+
+ duplex.on('drain', () => {
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+
+ duplex.end();
+ });
+
+ const chunk = randomBytes(1024);
+ let ret;
+
+ do {
+ ret = duplex.write(chunk);
+ } while (ret !== false);
+ });
+ });
+
+ it('handles backpressure (2/3)', (done) => {
+ const wss = new WebSocket.Server(
+ { port: 0, perMessageDeflate: true },
+ () => {
+ const called = [];
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+ const read = duplex._read;
+
+ duplex._read = () => {
+ duplex._read = read;
+ called.push('read');
+ assert.ok(ws._receiver._writableState.needDrain);
+ read();
+ assert.ok(ws._socket.isPaused());
+ };
+
+ ws.on('open', () => {
+ ws._socket.on('pause', () => {
+ duplex.resume();
+ });
+
+ ws._receiver.on('drain', () => {
+ called.push('drain');
+ assert.ok(!ws._socket.isPaused());
+ duplex.end();
+ });
+
+ const opts = {
+ fin: true,
+ opcode: 0x02,
+ mask: false,
+ readOnly: false
+ };
+
+ const list = [
+ ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
+ ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
+ ];
+
+ // This hack is used because there is no guarantee that more than
+ // 16 KiB will be sent as a single TCP packet.
+ ws._socket.push(Buffer.concat(list));
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(called, ['read', 'drain']);
+ wss.close(done);
+ });
+ }
+ );
+ });
+
+ it('handles backpressure (3/3)', (done) => {
+ const wss = new WebSocket.Server(
+ { port: 0, perMessageDeflate: true },
+ () => {
+ const called = [];
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+ const read = duplex._read;
+
+ duplex._read = () => {
+ called.push('read');
+ assert.ok(!ws._receiver._writableState.needDrain);
+ read();
+ assert.ok(!ws._socket.isPaused());
+ duplex.end();
+ };
+
+ ws.on('open', () => {
+ ws._receiver.on('drain', () => {
+ called.push('drain');
+ assert.ok(ws._socket.isPaused());
+ duplex.resume();
+ });
+
+ const opts = {
+ fin: true,
+ opcode: 0x02,
+ mask: false,
+ readOnly: false
+ };
+
+ const list = [
+ ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
+ ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
+ ];
+
+ ws._socket.push(Buffer.concat(list));
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(called, ['drain', 'read']);
+ wss.close(done);
+ });
+ }
+ );
+ });
+
+ it('can be destroyed (1/2)', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const error = new Error('Oops');
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('error', (err) => {
+ assert.strictEqual(err, error);
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+ });
+
+ ws.on('open', () => {
+ duplex.destroy(error);
+ });
+ });
+ });
+
+ it('can be destroyed (2/2)', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+
+ ws.on('open', () => {
+ duplex.destroy();
+ });
+ });
+ });
+
+ it('converts text messages to strings in readable object mode', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const events = [];
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws, { readableObjectMode: true });
+
+ duplex.on('data', (data) => {
+ events.push('data');
+ assert.strictEqual(data, 'foo');
+ });
+
+ duplex.on('end', () => {
+ events.push('end');
+ duplex.end();
+ });
+
+ duplex.on('close', () => {
+ assert.deepStrictEqual(events, ['data', 'end']);
+ wss.close(done);
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.send('foo');
+ ws.close();
+ });
+ });
+
+ it('resumes the socket if `readyState` is `CLOSING`', (done) => {
+ const wss = new WebSocket.Server({ port: 0 }, () => {
+ const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
+ const duplex = createWebSocketStream(ws);
+
+ ws.on('message', () => {
+ assert.ok(ws._socket.isPaused());
+
+ duplex.on('close', () => {
+ wss.close(done);
+ });
+
+ duplex.end();
+
+ process.nextTick(() => {
+ assert.strictEqual(ws.readyState, WebSocket.CLOSING);
+ duplex.resume();
+ });
+ });
+ });
+
+ wss.on('connection', (ws) => {
+ ws.send(randomBytes(16 * 1024));
+ });
+ });
+ });
+});