diff options
Diffstat (limited to 'testing/xpcshell/node-ws/test/create-websocket-stream.test.js')
-rw-r--r-- | testing/xpcshell/node-ws/test/create-websocket-stream.test.js | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/testing/xpcshell/node-ws/test/create-websocket-stream.test.js b/testing/xpcshell/node-ws/test/create-websocket-stream.test.js new file mode 100644 index 0000000000..4d51958cd9 --- /dev/null +++ b/testing/xpcshell/node-ws/test/create-websocket-stream.test.js @@ -0,0 +1,598 @@ +'use strict'; + +const assert = require('assert'); +const EventEmitter = require('events'); +const { createServer } = require('http'); +const { Duplex } = require('stream'); +const { randomBytes } = require('crypto'); + +const createWebSocketStream = require('../lib/stream'); +const Sender = require('../lib/sender'); +const WebSocket = require('..'); +const { EMPTY_BUFFER } = require('../lib/constants'); + +describe('createWebSocketStream', () => { + it('is exposed as a property of the `WebSocket` class', () => { + assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream); + }); + + it('returns a `Duplex` stream', () => { + const duplex = createWebSocketStream(new EventEmitter()); + + assert.ok(duplex instanceof Duplex); + }); + + it('passes the options object to the `Duplex` constructor', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws, { + allowHalfOpen: false, + encoding: 'utf8' + }); + + duplex.on('data', (chunk) => { + assert.strictEqual(chunk, 'hi'); + + duplex.on('close', () => { + wss.close(done); + }); + }); + }); + + wss.on('connection', (ws) => { + ws.send(Buffer.from('hi')); + ws.close(); + }); + }); + + describe('The returned stream', () => { + it('buffers writes if `readyState` is `CONNECTING`', (done) => { + const chunk = randomBytes(1024); + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + assert.strictEqual(ws.readyState, WebSocket.CONNECTING); + + const duplex = createWebSocketStream(ws); + + duplex.write(chunk); + }); + + wss.on('connection', (ws) => { + ws.on('message', (message, isBinary) => { + ws.on('close', (code, reason) => { + assert.deepStrictEqual(message, chunk); + assert.ok(isBinary); + assert.strictEqual(code, 1005); + assert.strictEqual(reason, EMPTY_BUFFER); + wss.close(done); + }); + }); + + ws.close(); + }); + }); + + it('errors if a write occurs when `readyState` is `CLOSING`', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('error', (err) => { + assert.ok(duplex.destroyed); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 2 (CLOSING)' + ); + + duplex.on('close', () => { + wss.close(done); + }); + }); + + ws.on('open', () => { + ws._receiver.on('conclude', () => { + duplex.write('hi'); + }); + }); + }); + + wss.on('connection', (ws) => { + ws.close(); + }); + }); + + it('errors if a write occurs when `readyState` is `CLOSED`', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('error', (err) => { + assert.ok(duplex.destroyed); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'WebSocket is not open: readyState 3 (CLOSED)' + ); + + duplex.on('close', () => { + wss.close(done); + }); + }); + + ws.on('close', () => { + duplex.write('hi'); + }); + }); + + wss.on('connection', (ws) => { + ws.close(); + }); + }); + + it('does not error if `_final()` is called while connecting', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + + assert.strictEqual(ws.readyState, WebSocket.CONNECTING); + + const duplex = createWebSocketStream(ws); + + duplex.on('close', () => { + wss.close(done); + }); + + duplex.resume(); + duplex.end(); + }); + }); + + it('makes `_final()` a noop if no socket is assigned', (done) => { + const server = createServer(); + + server.on('upgrade', (request, socket) => { + socket.on('end', socket.end); + + const headers = [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + 'Sec-WebSocket-Accept: foo' + ]; + + socket.write(headers.concat('\r\n').join('\r\n')); + }); + + server.listen(() => { + const called = []; + const ws = new WebSocket(`ws://localhost:${server.address().port}`); + const duplex = WebSocket.createWebSocketStream(ws); + const final = duplex._final; + + duplex._final = (callback) => { + called.push('final'); + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + assert.strictEqual(ws._socket, null); + + final(callback); + }; + + duplex.on('error', (err) => { + called.push('error'); + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'Invalid Sec-WebSocket-Accept header' + ); + }); + + duplex.on('finish', () => { + called.push('finish'); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(called, ['final', 'error']); + server.close(done); + }); + + ws.on('upgrade', () => { + process.nextTick(() => { + duplex.end(); + }); + }); + }); + }); + + it('reemits errors', (done) => { + let duplexCloseEventEmitted = false; + let serverClientCloseEventEmitted = false; + + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('error', (err) => { + assert.ok(err instanceof RangeError); + assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE'); + assert.strictEqual( + err.message, + 'Invalid WebSocket frame: invalid opcode 5' + ); + + duplex.on('close', () => { + duplexCloseEventEmitted = true; + if (serverClientCloseEventEmitted) wss.close(done); + }); + }); + }); + + wss.on('connection', (ws) => { + ws._socket.write(Buffer.from([0x85, 0x00])); + ws.on('close', (code, reason) => { + assert.strictEqual(code, 1002); + assert.deepStrictEqual(reason, EMPTY_BUFFER); + + serverClientCloseEventEmitted = true; + if (duplexCloseEventEmitted) wss.close(done); + }); + }); + }); + + it('does not swallow errors that may occur while destroying', (done) => { + const frame = Buffer.concat( + Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), { + fin: true, + rsv1: true, + opcode: 0x02, + mask: false, + readOnly: false + }) + ); + + const wss = new WebSocket.Server( + { + perMessageDeflate: true, + port: 0 + }, + () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('error', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual(err.code, 'Z_DATA_ERROR'); + assert.strictEqual(err.errno, -3); + + duplex.on('close', () => { + wss.close(done); + }); + }); + + let bytesRead = 0; + + ws.on('open', () => { + ws._socket.on('data', (chunk) => { + bytesRead += chunk.length; + if (bytesRead === frame.length) duplex.destroy(); + }); + }); + } + ); + + wss.on('connection', (ws) => { + ws._socket.write(frame); + }); + }); + + it("does not suppress the throwing behavior of 'error' events", (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + createWebSocketStream(ws); + }); + + wss.on('connection', (ws) => { + ws._socket.write(Buffer.from([0x85, 0x00])); + }); + + assert.strictEqual(process.listenerCount('uncaughtException'), 1); + + const [listener] = process.listeners('uncaughtException'); + + process.removeAllListeners('uncaughtException'); + process.once('uncaughtException', (err) => { + assert.ok(err instanceof Error); + assert.strictEqual( + err.message, + 'Invalid WebSocket frame: invalid opcode 5' + ); + + process.on('uncaughtException', listener); + wss.close(done); + }); + }); + + it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const events = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('end', () => { + events.push('end'); + assert.ok(duplex.destroyed); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(events, ['finish', 'end']); + wss.close(done); + }); + + duplex.on('finish', () => { + events.push('finish'); + assert.ok(!duplex.destroyed); + assert.ok(duplex.readable); + + duplex.resume(); + }); + + ws.on('close', () => { + duplex.end(); + }); + }); + + wss.on('connection', (ws) => { + ws.send('foo'); + ws.close(); + }); + }); + + it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const events = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('end', () => { + events.push('end'); + assert.ok(!duplex.destroyed); + assert.ok(duplex.writable); + + duplex.end(); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(events, ['end', 'finish']); + wss.close(done); + }); + + duplex.on('finish', () => { + events.push('finish'); + }); + + duplex.resume(); + }); + + wss.on('connection', (ws) => { + ws.close(); + }); + }); + + it('handles backpressure (1/3)', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + // eslint-disable-next-line no-unused-vars + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + }); + + wss.on('connection', (ws) => { + const duplex = createWebSocketStream(ws); + + duplex.resume(); + + duplex.on('drain', () => { + duplex.on('close', () => { + wss.close(done); + }); + + duplex.end(); + }); + + const chunk = randomBytes(1024); + let ret; + + do { + ret = duplex.write(chunk); + } while (ret !== false); + }); + }); + + it('handles backpressure (2/3)', (done) => { + const wss = new WebSocket.Server( + { port: 0, perMessageDeflate: true }, + () => { + const called = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + const read = duplex._read; + + duplex._read = () => { + duplex._read = read; + called.push('read'); + assert.ok(ws._receiver._writableState.needDrain); + read(); + assert.ok(ws._socket.isPaused()); + }; + + ws.on('open', () => { + ws._socket.on('pause', () => { + duplex.resume(); + }); + + ws._receiver.on('drain', () => { + called.push('drain'); + assert.ok(!ws._socket.isPaused()); + duplex.end(); + }); + + const opts = { + fin: true, + opcode: 0x02, + mask: false, + readOnly: false + }; + + const list = [ + ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), + ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) + ]; + + // This hack is used because there is no guarantee that more than + // 16 KiB will be sent as a single TCP packet. + ws._socket.push(Buffer.concat(list)); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(called, ['read', 'drain']); + wss.close(done); + }); + } + ); + }); + + it('handles backpressure (3/3)', (done) => { + const wss = new WebSocket.Server( + { port: 0, perMessageDeflate: true }, + () => { + const called = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + const read = duplex._read; + + duplex._read = () => { + called.push('read'); + assert.ok(!ws._receiver._writableState.needDrain); + read(); + assert.ok(!ws._socket.isPaused()); + duplex.end(); + }; + + ws.on('open', () => { + ws._receiver.on('drain', () => { + called.push('drain'); + assert.ok(ws._socket.isPaused()); + duplex.resume(); + }); + + const opts = { + fin: true, + opcode: 0x02, + mask: false, + readOnly: false + }; + + const list = [ + ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), + ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) + ]; + + ws._socket.push(Buffer.concat(list)); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(called, ['drain', 'read']); + wss.close(done); + }); + } + ); + }); + + it('can be destroyed (1/2)', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const error = new Error('Oops'); + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('error', (err) => { + assert.strictEqual(err, error); + + duplex.on('close', () => { + wss.close(done); + }); + }); + + ws.on('open', () => { + duplex.destroy(error); + }); + }); + }); + + it('can be destroyed (2/2)', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + duplex.on('close', () => { + wss.close(done); + }); + + ws.on('open', () => { + duplex.destroy(); + }); + }); + }); + + it('converts text messages to strings in readable object mode', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const events = []; + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws, { readableObjectMode: true }); + + duplex.on('data', (data) => { + events.push('data'); + assert.strictEqual(data, 'foo'); + }); + + duplex.on('end', () => { + events.push('end'); + duplex.end(); + }); + + duplex.on('close', () => { + assert.deepStrictEqual(events, ['data', 'end']); + wss.close(done); + }); + }); + + wss.on('connection', (ws) => { + ws.send('foo'); + ws.close(); + }); + }); + + it('resumes the socket if `readyState` is `CLOSING`', (done) => { + const wss = new WebSocket.Server({ port: 0 }, () => { + const ws = new WebSocket(`ws://localhost:${wss.address().port}`); + const duplex = createWebSocketStream(ws); + + ws.on('message', () => { + assert.ok(ws._socket.isPaused()); + + duplex.on('close', () => { + wss.close(done); + }); + + duplex.end(); + + process.nextTick(() => { + assert.strictEqual(ws.readyState, WebSocket.CLOSING); + duplex.resume(); + }); + }); + }); + + wss.on('connection', (ws) => { + ws.send(randomBytes(16 * 1024)); + }); + }); + }); +}); |