// Listing metadata: 598 lines, 16 KiB, JavaScript.
'use strict';

const assert = require('assert');
const EventEmitter = require('events');
const { createServer } = require('http');
const { Duplex } = require('stream');
const { randomBytes } = require('crypto');

const createWebSocketStream = require('../lib/stream');
const Sender = require('../lib/sender');
const WebSocket = require('..');
const { EMPTY_BUFFER } = require('../lib/constants');
|
|
|
|
describe('createWebSocketStream', () => {
|
|
// Verifies that the package entry point exposes the stream factory as the
// static `createWebSocketStream` property, and that it is the very same
// function object as the one this file references directly.
it('is exposed as a property of the `WebSocket` class', () => {
  assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
});
|
|
|
|
// Verifies that the factory returns a `Duplex` instance. A bare
// `EventEmitter` stands in for the WebSocket, so no connection is needed.
it('returns a `Duplex` stream', () => {
  const duplex = createWebSocketStream(new EventEmitter());

  assert.ok(duplex instanceof Duplex);
});
|
|
|
|
// Checks that the second argument of `createWebSocketStream()` takes effect
// on the returned stream: with `encoding: 'utf8'` the message the server
// sends as a `Buffer` must be read by the client stream as the string 'hi'.
it('passes the options object to the `Duplex` constructor', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws, {
      allowHalfOpen: false,
      encoding: 'utf8'
    });

    duplex.on('data', (chunk) => {
      assert.strictEqual(chunk, 'hi');

      // Finish only after the stream itself has been closed.
      duplex.on('close', () => {
        wss.close(done);
      });
    });
  });

  wss.on('connection', (ws) => {
    ws.send(Buffer.from('hi'));
    ws.close();
  });
});
|
|
|
|
describe('The returned stream', () => {
|
|
// A chunk written to the stream while the client WebSocket is still
// CONNECTING must not be lost: the server has to receive exactly that chunk
// as a binary message. The server requests the close itself and checks the
// close code (1005) and the empty reason once the connection is closed.
it('buffers writes if `readyState` is `CONNECTING`', (done) => {
  const chunk = randomBytes(1024);
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

    // The write below must happen before the connection is established.
    assert.strictEqual(ws.readyState, WebSocket.CONNECTING);

    const duplex = createWebSocketStream(ws);

    duplex.write(chunk);
  });

  wss.on('connection', (ws) => {
    ws.on('message', (message, isBinary) => {
      // The 'close' listener is only added once the message has arrived, so
      // the test cannot complete without the buffered chunk being delivered.
      ws.on('close', (code, reason) => {
        assert.deepStrictEqual(message, chunk);
        assert.ok(isBinary);
        assert.strictEqual(code, 1005);
        assert.strictEqual(reason, EMPTY_BUFFER);
        wss.close(done);
      });
    });

    ws.close();
  });
});
|
|
|
|
// Writing to the stream while the underlying WebSocket is closing must
// destroy the stream and surface a "not open" error. The server closes the
// connection as soon as it is established; the client writes from the
// 'conclude' event of the private `_receiver`, at which point the expected
// error message shows `readyState` to be 2 (CLOSING).
// NOTE(review): 'conclude' is presumably emitted when the peer's close frame
// has been processed — confirm against the receiver implementation.
it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('error', (err) => {
      assert.ok(duplex.destroyed);
      assert.ok(err instanceof Error);
      assert.strictEqual(
        err.message,
        'WebSocket is not open: readyState 2 (CLOSING)'
      );

      // Finish only after the stream itself has been closed.
      duplex.on('close', () => {
        wss.close(done);
      });
    });

    ws.on('open', () => {
      ws._receiver.on('conclude', () => {
        duplex.write('hi');
      });
    });
  });

  wss.on('connection', (ws) => {
    ws.close();
  });
});
|
|
|
|
// Writing to the stream after the underlying WebSocket has closed must
// destroy the stream and surface a "not open" error reporting `readyState` 3
// (CLOSED). The server closes the connection immediately and the client
// writes from the WebSocket's own 'close' event.
it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('error', (err) => {
      assert.ok(duplex.destroyed);
      assert.ok(err instanceof Error);
      assert.strictEqual(
        err.message,
        'WebSocket is not open: readyState 3 (CLOSED)'
      );

      // Finish only after the stream itself has been closed.
      duplex.on('close', () => {
        wss.close(done);
      });
    });

    ws.on('close', () => {
      duplex.write('hi');
    });
  });

  wss.on('connection', (ws) => {
    ws.close();
  });
});
|
|
|
|
// Ends the stream while the client WebSocket is still CONNECTING. There is
// no 'error' listener on the stream, so the test can only complete (via the
// 'close' event) if ending at that point does not produce an error.
it('does not error if `_final()` is called while connecting', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

    assert.strictEqual(ws.readyState, WebSocket.CONNECTING);

    const duplex = createWebSocketStream(ws);

    duplex.on('close', () => {
      wss.close(done);
    });

    // Put the readable side in flowing mode, then end the writable side.
    duplex.resume();
    duplex.end();
  });
});
|
|
|
|
// Drives the stream through a failed handshake. A plain HTTP server answers
// the upgrade request with a 101 response whose `Sec-WebSocket-Accept` value
// is the literal 'foo', and the stream is ended one tick after the client
// emits 'upgrade'. `_final()` is wrapped so the test can record the call and
// check that, when it runs, the WebSocket is CLOSING and has no socket
// assigned. The recorded sequence must be exactly ['final', 'error'], which
// means 'finish' is never emitted.
it('makes `_final()` a noop if no socket is assigned', (done) => {
  const server = createServer();

  server.on('upgrade', (request, socket) => {
    // End the server side of the socket when the client ends its side.
    socket.on('end', socket.end);

    const headers = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      'Sec-WebSocket-Accept: foo'
    ];

    // The extra '\r\n' element makes the joined string end with an empty
    // line, which terminates the header section.
    socket.write(headers.concat('\r\n').join('\r\n'));
  });

  server.listen(() => {
    const called = [];
    const ws = new WebSocket(`ws://localhost:${server.address().port}`);
    const duplex = WebSocket.createWebSocketStream(ws);
    const final = duplex._final;

    // Wrap the original implementation to observe when it is invoked.
    duplex._final = (callback) => {
      called.push('final');
      assert.strictEqual(ws.readyState, WebSocket.CLOSING);
      assert.strictEqual(ws._socket, null);

      final(callback);
    };

    duplex.on('error', (err) => {
      called.push('error');
      assert.ok(err instanceof Error);
      assert.strictEqual(
        err.message,
        'Invalid Sec-WebSocket-Accept header'
      );
    });

    // Recorded only so that the final assertion can prove it never fired.
    duplex.on('finish', () => {
      called.push('finish');
    });

    duplex.on('close', () => {
      assert.deepStrictEqual(called, ['final', 'error']);
      server.close(done);
    });

    ws.on('upgrade', () => {
      process.nextTick(() => {
        duplex.end();
      });
    });
  });
});
|
|
|
|
// The server writes the raw bytes 0x85 0x00 directly to its socket; the
// client stream must re-emit the resulting `RangeError` (code
// 'WS_ERR_INVALID_OPCODE', "invalid opcode 5"). The test completes only after
// both the client stream and the server-side WebSocket have closed, in
// whichever order that happens; the server side must see close code 1002
// with an empty reason.
it('reemits errors', (done) => {
  // Each flag is set by one side; whoever finishes last calls `done`.
  let duplexCloseEventEmitted = false;
  let serverClientCloseEventEmitted = false;

  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('error', (err) => {
      assert.ok(err instanceof RangeError);
      assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE');
      assert.strictEqual(
        err.message,
        'Invalid WebSocket frame: invalid opcode 5'
      );

      duplex.on('close', () => {
        duplexCloseEventEmitted = true;
        if (serverClientCloseEventEmitted) wss.close(done);
      });
    });
  });

  wss.on('connection', (ws) => {
    // 0x85 = FIN bit set + opcode 5, 0x00 = unmasked, zero-length payload.
    ws._socket.write(Buffer.from([0x85, 0x00]));
    ws.on('close', (code, reason) => {
      assert.strictEqual(code, 1002);
      assert.deepStrictEqual(reason, EMPTY_BUFFER);

      serverClientCloseEventEmitted = true;
      if (duplexCloseEventEmitted) wss.close(done);
    });
  });
});
|
|
|
|
// Builds an unmasked, final binary frame with RSV1 set and a 4-byte payload,
// and has the server write it straight to its socket. The client destroys
// the stream as soon as its socket has read as many bytes as the frame
// contains; the stream must still report an error with code 'Z_DATA_ERROR'
// (errno -3) rather than dropping it.
// NOTE(review): RSV1 presumably marks the payload as compressed, so these
// bytes are expected to fail inflation — confirm against the
// permessage-deflate handling.
it('does not swallow errors that may occur while destroying', (done) => {
  const frame = Buffer.concat(
    Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), {
      fin: true,
      rsv1: true,
      opcode: 0x02,
      mask: false,
      readOnly: false
    })
  );

  const wss = new WebSocket.Server(
    {
      perMessageDeflate: true,
      port: 0
    },
    () => {
      const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
      const duplex = createWebSocketStream(ws);

      duplex.on('error', (err) => {
        assert.ok(err instanceof Error);
        assert.strictEqual(err.code, 'Z_DATA_ERROR');
        assert.strictEqual(err.errno, -3);

        // Finish only after the stream itself has been closed.
        duplex.on('close', () => {
          wss.close(done);
        });
      });

      let bytesRead = 0;

      ws.on('open', () => {
        // Count the raw bytes to destroy the stream right when the whole
        // frame has been read from the socket.
        ws._socket.on('data', (chunk) => {
          bytesRead += chunk.length;
          if (bytesRead === frame.length) duplex.destroy();
        });
      });
    }
  );

  wss.on('connection', (ws) => {
    ws._socket.write(frame);
  });
});
|
|
|
|
// With no 'error' listener attached to the stream, an error must still
// surface as an uncaught exception. The single existing 'uncaughtException'
// listener (presumably installed by the test runner — confirm) is removed
// for the duration of the test and put back from inside the temporary
// handler, which also checks the error message.
it("does not suppress the throwing behavior of 'error' events", (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    // The returned stream is deliberately discarded: nothing listens on it.
    createWebSocketStream(ws);
  });

  wss.on('connection', (ws) => {
    // 0x85 = FIN bit set + opcode 5, 0x00 = unmasked, zero-length payload.
    ws._socket.write(Buffer.from([0x85, 0x00]));
  });

  assert.strictEqual(process.listenerCount('uncaughtException'), 1);

  const [listener] = process.listeners('uncaughtException');

  process.removeAllListeners('uncaughtException');
  process.once('uncaughtException', (err) => {
    assert.ok(err instanceof Error);
    assert.strictEqual(
      err.message,
      'Invalid WebSocket frame: invalid opcode 5'
    );

    // Restore the listener that was removed above.
    process.on('uncaughtException', listener);
    wss.close(done);
  });
});
|
|
|
|
// Case where 'finish' comes first: the stream is ended from the WebSocket's
// 'close' event and reading only starts inside the 'finish' handler. At
// 'finish' the stream must still be readable and not destroyed; at 'end' it
// must already be destroyed. The events must be observed in the order
// ['finish', 'end'] before 'close'.
it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const events = [];
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('end', () => {
      events.push('end');
      assert.ok(duplex.destroyed);
    });

    duplex.on('close', () => {
      assert.deepStrictEqual(events, ['finish', 'end']);
      wss.close(done);
    });

    duplex.on('finish', () => {
      events.push('finish');
      assert.ok(!duplex.destroyed);
      assert.ok(duplex.readable);

      // Start consuming so that the readable side can reach 'end'.
      duplex.resume();
    });

    ws.on('close', () => {
      duplex.end();
    });
  });

  wss.on('connection', (ws) => {
    ws.send('foo');
    ws.close();
  });
});
|
|
|
|
// Case where 'end' comes first: the stream is read from the start and the
// writable side is only ended inside the 'end' handler. At 'end' the stream
// must still be writable and not destroyed. The events must be observed in
// the order ['end', 'finish'] before 'close'.
it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const events = [];
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('end', () => {
      events.push('end');
      assert.ok(!duplex.destroyed);
      assert.ok(duplex.writable);

      duplex.end();
    });

    duplex.on('close', () => {
      assert.deepStrictEqual(events, ['end', 'finish']);
      wss.close(done);
    });

    duplex.on('finish', () => {
      events.push('finish');
    });

    duplex.resume();
  });

  wss.on('connection', (ws) => {
    ws.close();
  });
});
|
|
|
|
// Writable-side backpressure, exercised on a stream wrapping the server-side
// WebSocket: 1 KiB chunks are written until `write()` returns `false`, after
// which the stream must emit 'drain'. The stream is then ended and the test
// completes on 'close'.
it('handles backpressure (1/3)', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    // The client only needs to connect; it is never referenced again.
    // eslint-disable-next-line no-unused-vars
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
  });

  wss.on('connection', (ws) => {
    const duplex = createWebSocketStream(ws);

    duplex.resume();

    duplex.on('drain', () => {
      duplex.on('close', () => {
        wss.close(done);
      });

      duplex.end();
    });

    const chunk = randomBytes(1024);
    let ret;

    // Keep writing until the stream signals that the caller should wait.
    do {
      ret = duplex.write(chunk);
    } while (ret !== false);
  });
});
|
|
|
|
// Readable-side backpressure, case where `_read()` runs before the receiver
// drains. Two frames (16 KiB with RSV1 clear, then 1 byte with RSV1 set) are
// pushed into the client socket in one go. The stream is put in flowing mode
// only once the socket has been paused. On the first `_read()` call the
// receiver must still need a drain and the socket must be paused after the
// original `_read()` has run; when the receiver later emits 'drain' the
// socket must no longer be paused. Expected order: ['read', 'drain'].
it('handles backpressure (2/3)', (done) => {
  const wss = new WebSocket.Server(
    { port: 0, perMessageDeflate: true },
    () => {
      const called = [];
      const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
      const duplex = createWebSocketStream(ws);
      const read = duplex._read;

      // One-shot wrapper: restores the original implementation on first use.
      duplex._read = () => {
        duplex._read = read;
        called.push('read');
        assert.ok(ws._receiver._writableState.needDrain);
        read();
        assert.ok(ws._socket.isPaused());
      };

      ws.on('open', () => {
        ws._socket.on('pause', () => {
          duplex.resume();
        });

        ws._receiver.on('drain', () => {
          called.push('drain');
          assert.ok(!ws._socket.isPaused());
          duplex.end();
        });

        const opts = {
          fin: true,
          opcode: 0x02,
          mask: false,
          readOnly: false
        };

        const list = [
          ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
          ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
        ];

        // This hack is used because there is no guarantee that more than
        // 16 KiB will be sent as a single TCP packet.
        ws._socket.push(Buffer.concat(list));
      });

      duplex.on('close', () => {
        assert.deepStrictEqual(called, ['read', 'drain']);
        wss.close(done);
      });
    }
  );
});
|
|
|
|
// Readable-side backpressure, case where the receiver drains before
// `_read()` runs. Two frames (16 KiB with RSV1 clear, then 1 byte with RSV1
// set) are pushed into the client socket in one go. The stream is put in
// flowing mode only when the receiver emits 'drain', at which point the
// socket must still be paused. When `_read()` then runs, the receiver must
// not need a drain and the socket must not be paused after the original
// `_read()` has run. Expected order: ['drain', 'read'].
it('handles backpressure (3/3)', (done) => {
  const wss = new WebSocket.Server(
    { port: 0, perMessageDeflate: true },
    () => {
      const called = [];
      const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
      const duplex = createWebSocketStream(ws);
      const read = duplex._read;

      // Wrap the original implementation to observe when it is invoked.
      duplex._read = () => {
        called.push('read');
        assert.ok(!ws._receiver._writableState.needDrain);
        read();
        assert.ok(!ws._socket.isPaused());
        duplex.end();
      };

      ws.on('open', () => {
        ws._receiver.on('drain', () => {
          called.push('drain');
          assert.ok(ws._socket.isPaused());
          duplex.resume();
        });

        const opts = {
          fin: true,
          opcode: 0x02,
          mask: false,
          readOnly: false
        };

        const list = [
          ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
          ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
        ];

        // Inject both frames directly so that they arrive as one chunk.
        ws._socket.push(Buffer.concat(list));
      });

      duplex.on('close', () => {
        assert.deepStrictEqual(called, ['drain', 'read']);
        wss.close(done);
      });
    }
  );
});
|
|
|
|
// Destroying the stream with an error once the connection is open must emit
// that exact error object on the stream, followed by 'close'.
it('can be destroyed (1/2)', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const error = new Error('Oops');
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('error', (err) => {
      // Identity check: the very same object must be re-emitted.
      assert.strictEqual(err, error);

      duplex.on('close', () => {
        wss.close(done);
      });
    });

    ws.on('open', () => {
      duplex.destroy(error);
    });
  });
});
|
|
|
|
// Destroying the stream without an error once the connection is open must
// lead to 'close'. No 'error' listener is attached, so an emitted error
// would not be handled by this test.
it('can be destroyed (2/2)', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    duplex.on('close', () => {
      wss.close(done);
    });

    ws.on('open', () => {
      duplex.destroy();
    });
  });
});
|
|
|
|
// With `readableObjectMode: true`, a text message sent by the server must be
// delivered by the stream as the string 'foo'. The stream is ended from its
// own 'end' handler, and exactly one 'data' followed by one 'end' must have
// been observed by the time it closes.
it('converts text messages to strings in readable object mode', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const events = [];
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws, { readableObjectMode: true });

    duplex.on('data', (data) => {
      events.push('data');
      assert.strictEqual(data, 'foo');
    });

    duplex.on('end', () => {
      events.push('end');
      duplex.end();
    });

    duplex.on('close', () => {
      assert.deepStrictEqual(events, ['data', 'end']);
      wss.close(done);
    });
  });

  wss.on('connection', (ws) => {
    ws.send('foo');
    ws.close();
  });
});
|
|
|
|
// The server sends a 16 KiB message that the client stream does not consume.
// When the client WebSocket emits 'message' its socket must be paused. The
// stream is then ended and, one tick later, the WebSocket must be CLOSING;
// only at that point is the stream switched to flowing mode. The test can
// complete (via 'close') only if the connection still manages to close from
// that state.
it('resumes the socket if `readyState` is `CLOSING`', (done) => {
  const wss = new WebSocket.Server({ port: 0 }, () => {
    const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    const duplex = createWebSocketStream(ws);

    ws.on('message', () => {
      assert.ok(ws._socket.isPaused());

      duplex.on('close', () => {
        wss.close(done);
      });

      duplex.end();

      process.nextTick(() => {
        assert.strictEqual(ws.readyState, WebSocket.CLOSING);
        duplex.resume();
      });
    });
  });

  wss.on('connection', (ws) => {
    ws.send(randomBytes(16 * 1024));
  });
});
|
|
});
|
|
});
|