From 26a029d407be480d791972afb5975cf62c9360a6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 02:47:55 +0200 Subject: Adding upstream version 124.0.1. Signed-off-by: Daniel Baumann --- devtools/shared/transport/child-transport.js | 128 ++++++ .../shared/transport/js-window-actor-transport.js | 66 +++ devtools/shared/transport/local-transport.js | 204 +++++++++ devtools/shared/transport/moz.build | 18 + devtools/shared/transport/packets.js | 440 ++++++++++++++++++ devtools/shared/transport/stream-utils.js | 254 +++++++++++ .../shared/transport/tests/xpcshell/.eslintrc.js | 6 + .../shared/transport/tests/xpcshell/head_dbg.js | 179 ++++++++ .../transport/tests/xpcshell/test_bulk_error.js | 94 ++++ .../tests/xpcshell/test_client_server_bulk.js | 312 +++++++++++++ .../transport/tests/xpcshell/test_dbgsocket.js | 163 +++++++ .../xpcshell/test_dbgsocket_connection_drop.js | 86 ++++ .../tests/xpcshell/test_delimited_read.js | 30 ++ .../shared/transport/tests/xpcshell/test_packet.js | 24 + .../shared/transport/tests/xpcshell/test_queue.js | 198 ++++++++ .../tests/xpcshell/test_transport_bulk.js | 169 +++++++ .../shared/transport/tests/xpcshell/testactors.js | 16 + .../shared/transport/tests/xpcshell/xpcshell.toml | 22 + devtools/shared/transport/transport.js | 499 +++++++++++++++++++++ devtools/shared/transport/websocket-transport.js | 86 ++++ devtools/shared/transport/worker-transport.js | 113 +++++ 21 files changed, 3107 insertions(+) create mode 100644 devtools/shared/transport/child-transport.js create mode 100644 devtools/shared/transport/js-window-actor-transport.js create mode 100644 devtools/shared/transport/local-transport.js create mode 100644 devtools/shared/transport/moz.build create mode 100644 devtools/shared/transport/packets.js create mode 100644 devtools/shared/transport/stream-utils.js create mode 100644 devtools/shared/transport/tests/xpcshell/.eslintrc.js create mode 100644 devtools/shared/transport/tests/xpcshell/head_dbg.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_bulk_error.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_dbgsocket.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_delimited_read.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_packet.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_queue.js create mode 100644 devtools/shared/transport/tests/xpcshell/test_transport_bulk.js create mode 100644 devtools/shared/transport/tests/xpcshell/testactors.js create mode 100644 devtools/shared/transport/tests/xpcshell/xpcshell.toml create mode 100644 devtools/shared/transport/transport.js create mode 100644 devtools/shared/transport/websocket-transport.js create mode 100644 devtools/shared/transport/worker-transport.js (limited to 'devtools/shared/transport') diff --git a/devtools/shared/transport/child-transport.js b/devtools/shared/transport/child-transport.js new file mode 100644 index 0000000000..52511432e5 --- /dev/null +++ b/devtools/shared/transport/child-transport.js @@ -0,0 +1,128 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const flags = require("resource://devtools/shared/flags.js"); + +/** + * A transport for the debugging protocol that uses nsIMessageManagers to + * exchange packets with servers running in child processes. + * + * In the parent process, |mm| should be the nsIMessageSender for the + * child process. In a child process, |mm| should be the child process + * message manager, which sends packets to the parent. + * + * |prefix| is a string included in the message names, to distinguish + * multiple servers running in the same child process. + * + * This transport exchanges messages named 'debug::packet', where + * is |prefix|, whose data is the protocol packet. + */ +function ChildDebuggerTransport(mm, prefix) { + this._mm = mm; + this._messageName = "debug:" + prefix + ":packet"; +} + +/* + * To avoid confusion, we use 'message' to mean something that + * nsIMessageSender conveys, and 'packet' to mean a remote debugging + * protocol packet. + */ +ChildDebuggerTransport.prototype = { + constructor: ChildDebuggerTransport, + + hooks: null, + + _addListener() { + this._mm.addMessageListener(this._messageName, this); + }, + + _removeListener() { + try { + this._mm.removeMessageListener(this._messageName, this); + } catch (e) { + if (e.result != Cr.NS_ERROR_NULL_POINTER) { + throw e; + } + // In some cases, especially when using messageManagers in non-e10s mode, we reach + // this point with a dead messageManager which only throws errors but does not + // seem to indicate in any other way that it is dead. + } + }, + + ready() { + this._addListener(); + }, + + close(options) { + this._removeListener(); + if (this.hooks.onTransportClosed) { + this.hooks.onTransportClosed(null, options); + } + }, + + receiveMessage({ data }) { + this.hooks.onPacket(data); + }, + + /** + * Helper method to ensure a given `object` can be sent across message manager + * without being serialized to JSON. + * See https://searchfox.org/mozilla-central/rev/6bfadf95b4a6aaa8bb3b2a166d6c3545983e179a/dom/base/nsFrameMessageManager.cpp#458-469 + */ + _canBeSerialized(object) { + try { + const holder = new StructuredCloneHolder( + "ChildDebuggerTransport._canBeSerialized", + null, + object + ); + holder.deserialize(this); + } catch (e) { + return false; + } + return true; + }, + + pathToUnserializable(object) { + for (const key in object) { + const value = object[key]; + if (!this._canBeSerialized(value)) { + if (typeof value == "object") { + return [key].concat(this.pathToUnserializable(value)); + } + return [key]; + } + } + return []; + }, + + send(packet) { + if (flags.testing && !this._canBeSerialized(packet)) { + const attributes = this.pathToUnserializable(packet); + let msg = + "Following packet can't be serialized: " + JSON.stringify(packet); + msg += "\nBecause of attributes: " + attributes.join(", ") + "\n"; + msg += "Did you pass a function or an XPCOM object in it?"; + throw new Error(msg); + } + try { + this._mm.sendAsyncMessage(this._messageName, packet); + } catch (e) { + if (e.result != Cr.NS_ERROR_NULL_POINTER) { + throw e; + } + // In some cases, especially when using messageManagers in non-e10s mode, we reach + // this point with a dead messageManager which only throws errors but does not + // seem to indicate in any other way that it is dead. + } + }, + + startBulkSend() { + throw new Error("Can't send bulk data to child processes."); + }, +}; + +exports.ChildDebuggerTransport = ChildDebuggerTransport; diff --git a/devtools/shared/transport/js-window-actor-transport.js b/devtools/shared/transport/js-window-actor-transport.js new file mode 100644 index 0000000000..9f0fb82497 --- /dev/null +++ b/devtools/shared/transport/js-window-actor-transport.js @@ -0,0 +1,66 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +/** + * DevTools transport relying on JS Window Actors. This is an experimental + * transport. It is only used when using the JS Window Actor based frame + * connector. In that case this transport will be used to communicate between + * the DevToolsServer living in the parent process and the DevToolsServer + * living in the process of the target frame. + * + * This is intended to be a replacement for child-transport.js which is a + * message-manager based transport. + */ +class JsWindowActorTransport { + constructor(jsWindowActor, prefix) { + this.hooks = null; + this._jsWindowActor = jsWindowActor; + this._prefix = prefix; + + this._onPacketReceived = this._onPacketReceived.bind(this); + } + + _addListener() { + this._jsWindowActor.on("packet-received", this._onPacketReceived); + } + + _removeListener() { + this._jsWindowActor.off("packet-received", this._onPacketReceived); + } + + ready() { + this._addListener(); + } + + /** + * @param {object} options + * @param {boolean} options.isModeSwitching + * true when this is called as the result of a change to the devtools.browsertoolbox.scope pref + */ + close(options) { + this._removeListener(); + if (this.hooks.onTransportClosed) { + this.hooks.onTransportClosed(null, options); + } + } + + _onPacketReceived(eventName, { data }) { + const { prefix, packet } = data; + if (prefix === this._prefix) { + this.hooks.onPacket(packet); + } + } + + send(packet) { + this._jsWindowActor.sendPacket(packet, this._prefix); + } + + startBulkSend() { + throw new Error("startBulkSend not implemented for JsWindowActorTransport"); + } +} + +exports.JsWindowActorTransport = JsWindowActorTransport; diff --git a/devtools/shared/transport/local-transport.js b/devtools/shared/transport/local-transport.js new file mode 100644 index 0000000000..bff83b7666 --- /dev/null +++ b/devtools/shared/transport/local-transport.js @@ -0,0 +1,204 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); +const { dumpn } = DevToolsUtils; +const flags = require("resource://devtools/shared/flags.js"); +const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js"); + +loader.lazyGetter(this, "Pipe", () => { + return Components.Constructor("@mozilla.org/pipe;1", "nsIPipe", "init"); +}); + +/** + * An adapter that handles data transfers between the devtools client and + * server when they both run in the same process. It presents the same API as + * DebuggerTransport, but instead of transmitting serialized messages across a + * connection it merely calls the packet dispatcher of the other side. + * + * @param other LocalDebuggerTransport + * The other endpoint for this debugger connection. + * + * @see DebuggerTransport + */ +function LocalDebuggerTransport(other) { + this.other = other; + this.hooks = null; + + // A packet number, shared between this and this.other. This isn't used by the + // protocol at all, but it makes the packet traces a lot easier to follow. + this._serial = this.other ? this.other._serial : { count: 0 }; + this.close = this.close.bind(this); +} + +LocalDebuggerTransport.prototype = { + /** + * Transmit a message by directly calling the onPacket handler of the other + * endpoint. + */ + send(packet) { + const serial = this._serial.count++; + if (flags.wantLogging) { + // Check 'from' first, as 'echo' packets have both. + if (packet.from) { + dumpn("Packet " + serial + " sent from " + JSON.stringify(packet.from)); + } else if (packet.to) { + dumpn("Packet " + serial + " sent to " + JSON.stringify(packet.to)); + } + } + this._deepFreeze(packet); + const other = this.other; + if (other) { + DevToolsUtils.executeSoon( + DevToolsUtils.makeInfallible(() => { + // Avoid the cost of JSON.stringify() when logging is disabled. + if (flags.wantLogging) { + dumpn( + "Received packet " + + serial + + ": " + + JSON.stringify(packet, null, 2) + ); + } + if (other.hooks) { + other.hooks.onPacket(packet); + } + }, "LocalDebuggerTransport instance's this.other.hooks.onPacket") + ); + } + }, + + /** + * Send a streaming bulk packet directly to the onBulkPacket handler of the + * other endpoint. + * + * This case is much simpler than the full DebuggerTransport, since there is + * no primary stream we have to worry about managing while we hand it off to + * others temporarily. Instead, we can just make a single use pipe and be + * done with it. + */ + startBulkSend({ actor, type, length }) { + const serial = this._serial.count++; + + dumpn("Sent bulk packet " + serial + " for actor " + actor); + if (!this.other) { + const error = new Error("startBulkSend: other side of transport missing"); + return Promise.reject(error); + } + + const pipe = new Pipe(true, true, 0, 0, null); + + DevToolsUtils.executeSoon( + DevToolsUtils.makeInfallible(() => { + dumpn("Received bulk packet " + serial); + if (!this.other.hooks) { + return; + } + + // Receiver + new Promise(receiverResolve => { + const packet = { + actor, + type, + length, + copyTo: output => { + const copying = StreamUtils.copyStream( + pipe.inputStream, + output, + length + ); + receiverResolve(copying); + return copying; + }, + stream: pipe.inputStream, + done: receiverResolve, + }; + + this.other.hooks.onBulkPacket(packet); + }) + // Await the result of reading from the stream + .then(() => pipe.inputStream.close(), this.close); + }, "LocalDebuggerTransport instance's this.other.hooks.onBulkPacket") + ); + + // Sender + return new Promise(senderResolve => { + // The remote transport is not capable of resolving immediately here, so we + // shouldn't be able to either. + DevToolsUtils.executeSoon(() => { + return ( + new Promise(copyResolve => { + senderResolve({ + copyFrom: input => { + const copying = StreamUtils.copyStream( + input, + pipe.outputStream, + length + ); + copyResolve(copying); + return copying; + }, + stream: pipe.outputStream, + done: copyResolve, + }); + }) + // Await the result of writing to the stream + .then(() => pipe.outputStream.close(), this.close) + ); + }); + }); + }, + + /** + * Close the transport. + */ + close() { + if (this.other) { + // Remove the reference to the other endpoint before calling close(), to + // avoid infinite recursion. + const other = this.other; + this.other = null; + other.close(); + } + if (this.hooks) { + try { + if (this.hooks.onTransportClosed) { + this.hooks.onTransportClosed(); + } + } catch (ex) { + console.error(ex); + } + this.hooks = null; + } + }, + + /** + * An empty method for emulating the DebuggerTransport API. + */ + ready() {}, + + /** + * Helper function that makes an object fully immutable. + */ + _deepFreeze(object) { + Object.freeze(object); + for (const prop in object) { + // Freeze the properties that are objects, not on the prototype, and not + // already frozen. Note that this might leave an unfrozen reference + // somewhere in the object if there is an already frozen object containing + // an unfrozen object. + if ( + object.hasOwnProperty(prop) && + typeof object === "object" && + !Object.isFrozen(object) + ) { + this._deepFreeze(object[prop]); + } + } + }, +}; + +exports.LocalDebuggerTransport = LocalDebuggerTransport; diff --git a/devtools/shared/transport/moz.build b/devtools/shared/transport/moz.build new file mode 100644 index 0000000000..8b1527d5ca --- /dev/null +++ b/devtools/shared/transport/moz.build @@ -0,0 +1,18 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +XPCSHELL_TESTS_MANIFESTS += ["tests/xpcshell/xpcshell.toml"] + +DevToolsModules( + "child-transport.js", + "js-window-actor-transport.js", + "local-transport.js", + "packets.js", + "stream-utils.js", + "transport.js", + "websocket-transport.js", + "worker-transport.js", +) diff --git a/devtools/shared/transport/packets.js b/devtools/shared/transport/packets.js new file mode 100644 index 0000000000..9f9409a123 --- /dev/null +++ b/devtools/shared/transport/packets.js @@ -0,0 +1,440 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +/** + * Packets contain read / write functionality for the different packet types + * supported by the debugging protocol, so that a transport can focus on + * delivery and queue management without worrying too much about the specific + * packet types. + * + * They are intended to be "one use only", so a new packet should be + * instantiated for each incoming or outgoing packet. + * + * A complete Packet type should expose at least the following: + * * read(stream, scriptableStream) + * Called when the input stream has data to read + * * write(stream) + * Called when the output stream is ready to write + * * get done() + * Returns true once the packet is done being read / written + * * destroy() + * Called to clean up at the end of use + */ + +const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); +const { dumpn, dumpv } = DevToolsUtils; +const flags = require("resource://devtools/shared/flags.js"); +const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js"); + +DevToolsUtils.defineLazyGetter(this, "unicodeConverter", () => { + // eslint-disable-next-line no-shadow + const unicodeConverter = Cc[ + "@mozilla.org/intl/scriptableunicodeconverter" + ].createInstance(Ci.nsIScriptableUnicodeConverter); + unicodeConverter.charset = "UTF-8"; + return unicodeConverter; +}); + +// The transport's previous check ensured the header length did not exceed 20 +// characters. Here, we opt for the somewhat smaller, but still large limit of +// 1 TiB. +const PACKET_LENGTH_MAX = Math.pow(2, 40); + +/** + * A generic Packet processing object (extended by two subtypes below). + */ +function Packet(transport) { + this._transport = transport; + this._length = 0; +} + +/** + * Attempt to initialize a new Packet based on the incoming packet header we've + * received so far. We try each of the types in succession, trying JSON packets + * first since they are much more common. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return Packet + * The parsed packet of the matching type, or null if no types matched. + */ +Packet.fromHeader = function (header, transport) { + return ( + JSONPacket.fromHeader(header, transport) || + BulkPacket.fromHeader(header, transport) + ); +}; + +Packet.prototype = { + get length() { + return this._length; + }, + + set length(length) { + if (length > PACKET_LENGTH_MAX) { + throw Error( + "Packet length " + + length + + " exceeds the max length of " + + PACKET_LENGTH_MAX + ); + } + this._length = length; + }, + + destroy() { + this._transport = null; + }, +}; + +exports.Packet = Packet; + +/** + * With a JSON packet (the typical packet type sent via the transport), data is + * transferred as a JSON packet serialized into a string, with the string length + * prepended to the packet, followed by a colon ([length]:[packet]). The + * contents of the JSON packet are specified in the Remote Debugging Protocol + * specification. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + */ +function JSONPacket(transport) { + Packet.call(this, transport); + this._data = ""; + this._done = false; +} + +/** + * Attempt to initialize a new JSONPacket based on the incoming packet header + * we've received so far. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return JSONPacket + * The parsed packet, or null if it's not a match. + */ +JSONPacket.fromHeader = function (header, transport) { + const match = this.HEADER_PATTERN.exec(header); + + if (!match) { + return null; + } + + dumpv("Header matches JSON packet"); + const packet = new JSONPacket(transport); + packet.length = +match[1]; + return packet; +}; + +JSONPacket.HEADER_PATTERN = /^(\d+):$/; + +JSONPacket.prototype = Object.create(Packet.prototype); + +Object.defineProperty(JSONPacket.prototype, "object", { + /** + * Gets the object (not the serialized string) being read or written. + */ + get() { + return this._object; + }, + + /** + * Sets the object to be sent when write() is called. + */ + set(object) { + this._object = object; + const data = JSON.stringify(object); + this._data = unicodeConverter.ConvertFromUnicode(data); + this.length = this._data.length; + }, +}); + +JSONPacket.prototype.read = function (stream, scriptableStream) { + dumpv("Reading JSON packet"); + + // Read in more packet data. + this._readData(stream, scriptableStream); + + if (!this.done) { + // Don't have a complete packet yet. + return; + } + + let json = this._data; + try { + json = unicodeConverter.ConvertToUnicode(json); + this._object = JSON.parse(json); + } catch (e) { + const msg = + "Error parsing incoming packet: " + + json + + " (" + + e + + " - " + + e.stack + + ")"; + console.error(msg); + dumpn(msg); + return; + } + + this._transport._onJSONObjectReady(this._object); +}; + +JSONPacket.prototype._readData = function (stream, scriptableStream) { + if (flags.wantVerbose) { + dumpv( + "Reading JSON data: _l: " + + this.length + + " dL: " + + this._data.length + + " sA: " + + stream.available() + ); + } + const bytesToRead = Math.min( + this.length - this._data.length, + stream.available() + ); + this._data += scriptableStream.readBytes(bytesToRead); + this._done = this._data.length === this.length; +}; + +JSONPacket.prototype.write = function (stream) { + dumpv("Writing JSON packet"); + + if (this._outgoing === undefined) { + // Format the serialized packet to a buffer + this._outgoing = this.length + ":" + this._data; + } + + const written = stream.write(this._outgoing, this._outgoing.length); + this._outgoing = this._outgoing.slice(written); + this._done = !this._outgoing.length; +}; + +Object.defineProperty(JSONPacket.prototype, "done", { + get() { + return this._done; + }, +}); + +JSONPacket.prototype.toString = function () { + return JSON.stringify(this._object, null, 2); +}; + +exports.JSONPacket = JSONPacket; + +/** + * With a bulk packet, data is transferred by temporarily handing over the + * transport's input or output stream to the application layer for writing data + * directly. This can be much faster for large data sets, and avoids various + * stages of copies and data duplication inherent in the JSON packet type. The + * bulk packet looks like: + * + * bulk [actor] [type] [length]:[data] + * + * The interpretation of the data portion depends on the kind of actor and the + * packet's type. See the Remote Debugging Protocol Stream Transport spec for + * more details. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + */ +function BulkPacket(transport) { + Packet.call(this, transport); + this._done = false; + let _resolve; + this._readyForWriting = new Promise(resolve => { + _resolve = resolve; + }); + this._readyForWriting.resolve = _resolve; +} + +/** + * Attempt to initialize a new BulkPacket based on the incoming packet header + * we've received so far. + * @param header string + * The packet header string to attempt parsing. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @return BulkPacket + * The parsed packet, or null if it's not a match. + */ +BulkPacket.fromHeader = function (header, transport) { + const match = this.HEADER_PATTERN.exec(header); + + if (!match) { + return null; + } + + dumpv("Header matches bulk packet"); + const packet = new BulkPacket(transport); + packet.header = { + actor: match[1], + type: match[2], + length: +match[3], + }; + return packet; +}; + +BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/; + +BulkPacket.prototype = Object.create(Packet.prototype); + +BulkPacket.prototype.read = function (stream) { + dumpv("Reading bulk packet, handing off input stream"); + + // Temporarily pause monitoring of the input stream + this._transport.pauseIncoming(); + + new Promise(resolve => { + this._transport._onBulkReadReady({ + actor: this.actor, + type: this.type, + length: this.length, + copyTo: output => { + dumpv("CT length: " + this.length); + const copying = StreamUtils.copyStream(stream, output, this.length); + resolve(copying); + return copying; + }, + stream, + done: resolve, + }); + // Await the result of reading from the stream + }).then(() => { + dumpv("onReadDone called, ending bulk mode"); + this._done = true; + this._transport.resumeIncoming(); + }, this._transport.close); + + // Ensure this is only done once + this.read = () => { + throw new Error("Tried to read() a BulkPacket's stream multiple times."); + }; +}; + +BulkPacket.prototype.write = function (stream) { + dumpv("Writing bulk packet"); + + if (this._outgoingHeader === undefined) { + dumpv("Serializing bulk packet header"); + // Format the serialized packet header to a buffer + this._outgoingHeader = + "bulk " + this.actor + " " + this.type + " " + this.length + ":"; + } + + // Write the header, or whatever's left of it to write. + if (this._outgoingHeader.length) { + dumpv("Writing bulk packet header"); + const written = stream.write( + this._outgoingHeader, + this._outgoingHeader.length + ); + this._outgoingHeader = this._outgoingHeader.slice(written); + return; + } + + dumpv("Handing off output stream"); + + // Temporarily pause the monitoring of the output stream + this._transport.pauseOutgoing(); + + new Promise(resolve => { + this._readyForWriting.resolve({ + copyFrom: input => { + dumpv("CF length: " + this.length); + const copying = StreamUtils.copyStream(input, stream, this.length); + resolve(copying); + return copying; + }, + stream, + done: resolve, + }); + // Await the result of writing to the stream + }).then(() => { + dumpv("onWriteDone called, ending bulk mode"); + this._done = true; + this._transport.resumeOutgoing(); + }, this._transport.close); + + // Ensure this is only done once + this.write = () => { + throw new Error("Tried to write() a BulkPacket's stream multiple times."); + }; +}; + +Object.defineProperty(BulkPacket.prototype, "streamReadyForWriting", { + get() { + return this._readyForWriting; + }, +}); + +Object.defineProperty(BulkPacket.prototype, "header", { + get() { + return { + actor: this.actor, + type: this.type, + length: this.length, + }; + }, + + set(header) { + this.actor = header.actor; + this.type = header.type; + this.length = header.length; + }, +}); + +Object.defineProperty(BulkPacket.prototype, "done", { + get() { + return this._done; + }, +}); + +BulkPacket.prototype.toString = function () { + return "Bulk: " + JSON.stringify(this.header, null, 2); +}; + +exports.BulkPacket = BulkPacket; + +/** + * RawPacket is used to test the transport's error handling of malformed + * packets, by writing data directly onto the stream. + * @param transport DebuggerTransport + * The transport instance that will own the packet. + * @param data string + * The raw string to send out onto the stream. + */ +function RawPacket(transport, data) { + Packet.call(this, transport); + this._data = data; + this.length = data.length; + this._done = false; +} + +RawPacket.prototype = Object.create(Packet.prototype); + +RawPacket.prototype.read = function (stream) { + // This hasn't yet been needed for testing. + throw Error("Not implmented."); +}; + +RawPacket.prototype.write = function (stream) { + const written = stream.write(this._data, this._data.length); + this._data = this._data.slice(written); + this._done = !this._data.length; +}; + +Object.defineProperty(RawPacket.prototype, "done", { + get() { + return this._done; + }, +}); + +exports.RawPacket = RawPacket; diff --git a/devtools/shared/transport/stream-utils.js b/devtools/shared/transport/stream-utils.js new file mode 100644 index 0000000000..a3efd1d2aa --- /dev/null +++ b/devtools/shared/transport/stream-utils.js @@ -0,0 +1,254 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); +const { dumpv } = DevToolsUtils; +const EventEmitter = require("resource://devtools/shared/event-emitter.js"); + +DevToolsUtils.defineLazyGetter(this, "IOUtil", () => { + return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil); +}); + +DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => { + return Components.Constructor( + "@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", + "init" + ); +}); + +const BUFFER_SIZE = 0x8000; + +/** + * This helper function (and its companion object) are used by bulk senders and + * receivers to read and write data in and out of other streams. Functions that + * make use of this tool are passed to callers when it is time to read or write + * bulk data. It is highly recommended to use these copier functions instead of + * the stream directly because the copier enforces the agreed upon length. + * Since bulk mode reuses an existing stream, the sender and receiver must write + * and read exactly the agreed upon amount of data, or else the entire transport + * will be left in a invalid state. Additionally, other methods of stream + * copying (such as NetUtil.asyncCopy) close the streams involved, which would + * terminate the debugging transport, and so it is avoided here. + * + * Overall, this *works*, but clearly the optimal solution would be able to just + * use the streams directly. If it were possible to fully implement + * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to + * enforce the length and avoid closing, and consumers could use familiar stream + * utilities like NetUtil.asyncCopy. + * + * The function takes two async streams and copies a precise number of bytes + * from one to the other. Copying begins immediately, but may complete at some + * future time depending on data size. Use the returned promise to know when + * it's complete. + * + * @param input nsIAsyncInputStream + * The stream to copy from. + * @param output nsIAsyncOutputStream + * The stream to copy to. + * @param length Integer + * The amount of data that needs to be copied. + * @return Promise + * The promise is resolved when copying completes or rejected if any + * (unexpected) errors occur. + */ +function copyStream(input, output, length) { + const copier = new StreamCopier(input, output, length); + return copier.copy(); +} + +function StreamCopier(input, output, length) { + EventEmitter.decorate(this); + this._id = StreamCopier._nextId++; + this.input = input; + // Save off the base output stream, since we know it's async as we've required + this.baseAsyncOutput = output; + if (IOUtil.outputStreamIsBuffered(output)) { + this.output = output; + } else { + this.output = Cc[ + "@mozilla.org/network/buffered-output-stream;1" + ].createInstance(Ci.nsIBufferedOutputStream); + this.output.init(output, BUFFER_SIZE); + } + this._length = length; + this._amountLeft = length; + let _resolve; + let _reject; + this._deferred = new Promise((resolve, reject) => { + _resolve = resolve; + _reject = reject; + }); + this._deferred.resolve = _resolve; + this._deferred.reject = _reject; + + this._copy = this._copy.bind(this); + this._flush = this._flush.bind(this); + this._destroy = this._destroy.bind(this); + + // Copy promise's then method up to this object. + // Allows the copier to offer a promise interface for the simple succeed or + // fail scenarios, but also emit events (due to the EventEmitter) for other + // states, like progress. + this.then = this._deferred.then.bind(this._deferred); + this.then(this._destroy, this._destroy); + + // Stream ready callback starts as |_copy|, but may switch to |_flush| at end + // if flushing would block the output stream. + this._streamReadyCallback = this._copy; +} +StreamCopier._nextId = 0; + +StreamCopier.prototype = { + copy() { + // Dispatch to the next tick so that it's possible to attach a progress + // event listener, even for extremely fast copies (like when testing). + Services.tm.dispatchToMainThread(() => { + try { + this._copy(); + } catch (e) { + this._deferred.reject(e); + } + }); + return this; + }, + + _copy() { + const bytesAvailable = this.input.available(); + const amountToCopy = Math.min(bytesAvailable, this._amountLeft); + this._debug("Trying to copy: " + amountToCopy); + + let bytesCopied; + try { + bytesCopied = this.output.writeFrom(this.input, amountToCopy); + } catch (e) { + if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this._debug("Base stream would block, will retry"); + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } + throw e; + } + + this._amountLeft -= bytesCopied; + this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft); + this._emitProgress(); + + if (this._amountLeft === 0) { + this._debug("Copy done!"); + this._flush(); + return; + } + + this._debug("Waiting for input stream"); + this.input.asyncWait(this, 0, 0, Services.tm.currentThread); + }, + + _emitProgress() { + this.emit("progress", { + bytesSent: this._length - this._amountLeft, + totalBytes: this._length, + }); + }, + + _flush() { + try { + this.output.flush(); + } catch (e) { + if ( + e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK || + e.result == Cr.NS_ERROR_FAILURE + ) { + this._debug("Flush would block, will retry"); + this._streamReadyCallback = this._flush; + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } + throw e; + } + this._deferred.resolve(); + }, + + _destroy() { + this._destroy = null; + this._copy = null; + this._flush = null; + this.input = null; + this.output = null; + }, + + // nsIInputStreamCallback + onInputStreamReady() { + this._streamReadyCallback(); + }, + + // nsIOutputStreamCallback + onOutputStreamReady() { + this._streamReadyCallback(); + }, + + _debug(msg) { + // Prefix logs with the copier ID, which makes logs much easier to + // understand when several copiers are running simultaneously + dumpv("Copier: " + this._id + " " + msg); + }, +}; + +/** + * Read from a stream, one byte at a time, up to the next |delimiter| + * character, but stopping if we've read |count| without finding it. Reading + * also terminates early if there are less than |count| bytes available on the + * stream. In that case, we only read as many bytes as the stream currently has + * to offer. + * TODO: This implementation could be removed if bug 984651 is fixed, which + * provides a native version of the same idea. + * @param stream nsIInputStream + * The input stream to read from. + * @param delimiter string + * The character we're trying to find. + * @param count integer + * The max number of characters to read while searching. + * @return string + * The data collected. If the delimiter was found, this string will + * end with it. + */ +function delimitedRead(stream, delimiter, count) { + dumpv( + "Starting delimited read for " + delimiter + " up to " + count + " bytes" + ); + + let scriptableStream; + if (stream instanceof Ci.nsIScriptableInputStream) { + scriptableStream = stream; + } else { + scriptableStream = new ScriptableInputStream(stream); + } + + let data = ""; + + // Don't exceed what's available on the stream + count = Math.min(count, stream.available()); + + if (count <= 0) { + return data; + } + + let char; + while (char !== delimiter && count > 0) { + char = scriptableStream.readBytes(1); + count--; + data += char; + } + + return data; +} + +module.exports = { + copyStream, + delimitedRead, +}; diff --git a/devtools/shared/transport/tests/xpcshell/.eslintrc.js b/devtools/shared/transport/tests/xpcshell/.eslintrc.js new file mode 100644 index 0000000000..8611c174f5 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/.eslintrc.js @@ -0,0 +1,6 @@ +"use strict"; + +module.exports = { + // Extend from the common devtools xpcshell eslintrc config. + extends: "../../../../.eslintrc.xpcshell.js", +}; diff --git a/devtools/shared/transport/tests/xpcshell/head_dbg.js b/devtools/shared/transport/tests/xpcshell/head_dbg.js new file mode 100644 index 0000000000..18e235f17e --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/head_dbg.js @@ -0,0 +1,179 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +/* exported Cr, CC, NetUtil, errorCount, initTestDevToolsServer, + writeTestTempFile, socket_transport, local_transport, really_long +*/ + +var CC = Components.Constructor; + +const { require } = ChromeUtils.importESModule( + "resource://devtools/shared/loader/Loader.sys.mjs" +); +const { NetUtil } = ChromeUtils.importESModule( + "resource://gre/modules/NetUtil.sys.mjs" +); + +// We do not want to log packets by default, because in some tests, +// we can be sending large amounts of data. The test harness has +// trouble dealing with logging all the data, and we end up with +// intermittent time outs (e.g. bug 775924). +// Services.prefs.setBoolPref("devtools.debugger.log", true); +// Services.prefs.setBoolPref("devtools.debugger.log.verbose", true); +// Enable remote debugging for the relevant tests. +Services.prefs.setBoolPref("devtools.debugger.remote-enabled", true); + +const { + ActorRegistry, +} = require("resource://devtools/server/actors/utils/actor-registry.js"); +const { + DevToolsServer, +} = require("resource://devtools/server/devtools-server.js"); +const { + DevToolsClient, +} = require("resource://devtools/client/devtools-client.js"); +const { + SocketListener, +} = require("resource://devtools/shared/security/socket.js"); + +// Convert an nsIScriptError 'logLevel' value into an appropriate string. +function scriptErrorLogLevel(message) { + switch (message.logLevel) { + case Ci.nsIConsoleMessage.info: + return "info"; + case Ci.nsIConsoleMessage.warn: + return "warning"; + default: + Assert.equal(message.logLevel, Ci.nsIConsoleMessage.error); + return "error"; + } +} + +// Register a console listener, so console messages don't just disappear +// into the ether. +var errorCount = 0; +var listener = { + observe(message) { + errorCount++; + let string = ""; + try { + // If we've been given an nsIScriptError, then we can print out + // something nicely formatted, for tools like Emacs to pick up. + message.QueryInterface(Ci.nsIScriptError); + dump( + message.sourceName + + ":" + + message.lineNumber + + ": " + + scriptErrorLogLevel(message) + + ": " + + message.errorMessage + + "\n" + ); + string = message.errorMessage; + } catch (x) { + // Be a little paranoid with message, as the whole goal here is to lose + // no information. + try { + string = message.message; + } catch (e) { + string = ""; + } + } + + // Make sure we exit all nested event loops so that the test can finish. + while (DevToolsServer.xpcInspector.eventLoopNestLevel > 0) { + DevToolsServer.xpcInspector.exitNestedEventLoop(); + } + + do_throw("head_dbg.js got console message: " + string + "\n"); + }, +}; + +Services.console.registerListener(listener); + +/** + * Initialize the testing devtools server. + */ +function initTestDevToolsServer() { + ActorRegistry.registerModule("devtools/server/actors/thread", { + prefix: "script", + constructor: "ScriptActor", + type: { global: true, target: true }, + }); + const { createRootActor } = require("xpcshell-test/testactors"); + DevToolsServer.setRootActor(createRootActor); + // Allow incoming connections. + DevToolsServer.init(); + // Avoid the server from being destroyed when the last connection closes + DevToolsServer.keepAlive = true; +} + +/** + * Wrapper around do_get_file to prefix files with the name of current test to + * avoid collisions when running in parallel. + */ +function getTestTempFile(fileName, allowMissing) { + let thisTest = _TEST_FILE.toString().replace(/\\/g, "/"); + thisTest = thisTest.substring(thisTest.lastIndexOf("/") + 1); + thisTest = thisTest.replace(/\..*$/, ""); + return do_get_file(fileName + "-" + thisTest, allowMissing); +} + +function writeTestTempFile(fileName, content) { + const file = getTestTempFile(fileName, true); + const stream = Cc["@mozilla.org/network/file-output-stream;1"].createInstance( + Ci.nsIFileOutputStream + ); + stream.init(file, -1, -1, 0); + try { + do { + const numWritten = stream.write(content, content.length); + content = content.slice(numWritten); + } while (content.length); + } finally { + stream.close(); + } +} + +/** * Transport Factories ***/ + +var socket_transport = async function () { + if (!DevToolsServer.listeningSockets) { + const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT"); + const authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DevToolsServer.AuthenticationResult.ALLOW; + }; + const socketOptions = { + authenticator, + portOrPath: -1, + }; + const debuggerListener = new SocketListener(DevToolsServer, socketOptions); + await debuggerListener.open(); + } + const port = DevToolsServer._listeners[0].port; + info("DevTools server port is " + port); + return DevToolsClient.socketConnect({ host: "127.0.0.1", port }); +}; + +function local_transport() { + return Promise.resolve(DevToolsServer.connectPipe()); +} + +/** * Sample Data ***/ + +var gReallyLong; +function really_long() { + if (gReallyLong) { + return gReallyLong; + } + let ret = "0123456789"; + for (let i = 0; i < 18; i++) { + ret += ret; + } + gReallyLong = ret; + return ret; +} diff --git a/devtools/shared/transport/tests/xpcshell/test_bulk_error.js b/devtools/shared/transport/tests/xpcshell/test_bulk_error.js new file mode 100644 index 0000000000..52cc826e51 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_bulk_error.js @@ -0,0 +1,94 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +function run_test() { + initTestDevToolsServer(); + add_test_bulk_actor(); + + add_task(async function () { + await test_string_error(socket_transport, json_reply); + await test_string_error(local_transport, json_reply); + DevToolsServer.destroy(); + }); + + run_next_test(); +} + +/** * Sample Bulk Actor ***/ +const { Actor } = require("resource://devtools/shared/protocol/Actor.js"); +class TestBulkActor extends Actor { + constructor(conn) { + super(conn); + + this.typeName = "testBulk"; + this.requestTypes = { + jsonReply: this.jsonReply, + }; + } + + jsonReply({ length, reader, reply, done }) { + Assert.equal(length, really_long().length); + + return { + allDone: true, + }; + } +} + +function add_test_bulk_actor() { + ActorRegistry.addGlobalActor( + { + constructorName: "TestBulkActor", + constructorFun: TestBulkActor, + }, + "testBulk" + ); +} + +/** * Tests ***/ + +var test_string_error = async function (transportFactory, onReady) { + const transport = await transportFactory(); + + const client = new DevToolsClient(transport); + await client.connect(); + const response = await client.mainRoot.rootForm; + + await onReady(client, response); + client.close(); + transport.close(); +}; + +/** * Reply Types ***/ + +function json_reply(client, response) { + const reallyLong = really_long(); + + const request = client.startBulkRequest({ + actor: response.testBulk, + type: "jsonReply", + length: reallyLong.length, + }); + + // Send bulk data to server + return new Promise(resolve => { + request.on("bulk-send-ready", ({ writer, done }) => { + const input = Cc["@mozilla.org/io/string-input-stream;1"].createInstance( + Ci.nsIStringInputStream + ); + input.setData(reallyLong, reallyLong.length); + try { + writer.copyFrom(input, () => { + input.close(); + done(); + }); + do_throw(new Error("Copying should fail, the stream is not async.")); + } catch (e) { + Assert.ok(true); + resolve(); + } + }); + }); +} diff --git a/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js b/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js new file mode 100644 index 0000000000..22efdc6eba --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js @@ -0,0 +1,312 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +var { FileUtils } = ChromeUtils.importESModule( + "resource://gre/modules/FileUtils.sys.mjs" +); +var Pipe = Components.Constructor("@mozilla.org/pipe;1", "nsIPipe", "init"); + +function run_test() { + initTestDevToolsServer(); + add_test_bulk_actor(); + + add_task(async function () { + await test_bulk_request_cs(socket_transport, "jsonReply", "json"); + await test_bulk_request_cs(local_transport, "jsonReply", "json"); + await test_bulk_request_cs(socket_transport, "bulkEcho", "bulk"); + await test_bulk_request_cs(local_transport, "bulkEcho", "bulk"); + await test_json_request_cs(socket_transport, "bulkReply", "bulk"); + await test_json_request_cs(local_transport, "bulkReply", "bulk"); + DevToolsServer.destroy(); + }); + + run_next_test(); +} + +/** * Sample Bulk Actor ***/ +const { Actor } = require("resource://devtools/shared/protocol/Actor.js"); +class TestBulkActor extends Actor { + constructor(conn) { + super(conn, { typeName: "testBulk", methods: [] }); + + this.requestTypes = { + bulkEcho: this.bulkEcho, + bulkReply: this.bulkReply, + jsonReply: this.jsonReply, + }; + } + + bulkEcho({ actor, type, length, copyTo }) { + Assert.equal(length, really_long().length); + this.conn + .startBulkSend({ + actor, + type, + length, + }) + .then(({ copyFrom }) => { + // We'll just echo back the same thing + const pipe = new Pipe(true, true, 0, 0, null); + copyTo(pipe.outputStream).then(() => { + pipe.outputStream.close(); + }); + copyFrom(pipe.inputStream).then(() => { + pipe.inputStream.close(); + }); + }); + } + + bulkReply({ to, type }) { + this.conn + .startBulkSend({ + actor: to, + type, + length: really_long().length, + }) + .then(({ copyFrom }) => { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true, + }, + input => { + copyFrom(input).then(() => { + input.close(); + }); + } + ); + }); + } + + jsonReply({ length, copyTo }) { + Assert.equal(length, really_long().length); + + const outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + const output = FileUtils.openSafeFileOutputStream(outputFile); + + return copyTo(output) + .then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify_files(); + }) + .then(() => { + return { allDone: true }; + }, do_throw); + } +} + +function add_test_bulk_actor() { + ActorRegistry.addGlobalActor( + { + constructorName: "TestBulkActor", + constructorFun: TestBulkActor, + }, + "testBulk" + ); +} + +/** * Reply Handlers ***/ + +var replyHandlers = { + json(request) { + // Receive JSON reply from server + return new Promise(resolve => { + request.on("json-reply", reply => { + Assert.ok(reply.allDone); + resolve(); + }); + }); + }, + + bulk(request) { + // Receive bulk data reply from server + return new Promise(resolve => { + request.on("bulk-reply", ({ length, copyTo }) => { + Assert.equal(length, really_long().length); + + const outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + const output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output).then(() => { + FileUtils.closeSafeFileOutputStream(output); + resolve(verify_files()); + }); + }); + }); + }, +}; + +/** * Tests ***/ + +var test_bulk_request_cs = async function ( + transportFactory, + actorType, + replyType +) { + // Ensure test files are not present from a failed run + cleanup_files(); + writeTestTempFile("bulk-input", really_long()); + + let clientResolve; + const clientDeferred = new Promise(resolve => { + clientResolve = resolve; + }); + + let serverResolve; + const serverDeferred = new Promise(resolve => { + serverResolve = resolve; + }); + + let bulkCopyResolve; + const bulkCopyDeferred = new Promise(resolve => { + bulkCopyResolve = resolve; + }); + + const transport = await transportFactory(); + + const client = new DevToolsClient(transport); + client.connect().then(() => { + client.mainRoot.rootForm.then(clientResolve); + }); + + function bulkSendReadyCallback({ copyFrom }) { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true, + }, + input => { + copyFrom(input).then(() => { + input.close(); + bulkCopyResolve(); + }); + } + ); + } + + clientDeferred + .then(response => { + const request = client.startBulkRequest({ + actor: response.testBulk, + type: actorType, + length: really_long().length, + }); + + // Send bulk data to server + request.on("bulk-send-ready", bulkSendReadyCallback); + + // Set up reply handling for this type + replyHandlers[replyType](request).then(() => { + client.close(); + transport.close(); + }); + }) + .catch(do_throw); + + DevToolsServer.on("connectionchange", type => { + if (type === "closed") { + serverResolve(); + } + }); + + return Promise.all([clientDeferred, bulkCopyDeferred, serverDeferred]); +}; + +var test_json_request_cs = async function ( + transportFactory, + actorType, + replyType +) { + // Ensure test files are not present from a failed run + cleanup_files(); + writeTestTempFile("bulk-input", really_long()); + + let clientResolve; + const clientDeferred = new Promise(resolve => { + clientResolve = resolve; + }); + + let serverResolve; + const serverDeferred = new Promise(resolve => { + serverResolve = resolve; + }); + + const transport = await transportFactory(); + + const client = new DevToolsClient(transport); + await client.connect(); + client.mainRoot.rootForm.then(clientResolve); + + clientDeferred + .then(response => { + const request = client.request({ + to: response.testBulk, + type: actorType, + }); + + // Set up reply handling for this type + replyHandlers[replyType](request).then(() => { + client.close(); + transport.close(); + }); + }) + .catch(do_throw); + + DevToolsServer.on("connectionchange", type => { + if (type === "closed") { + serverResolve(); + } + }); + + return Promise.all([clientDeferred, serverDeferred]); +}; + +/** * Test Utils ***/ + +function verify_files() { + const reallyLong = really_long(); + + const inputFile = getTestTempFile("bulk-input"); + const outputFile = getTestTempFile("bulk-output"); + + Assert.equal(inputFile.fileSize, reallyLong.length); + Assert.equal(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + return new Promise(resolve => { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true, + }, + input => { + const outputData = NetUtil.readInputStreamToString( + input, + reallyLong.length + ); + // Avoid do_check_eq here so we don't log the contents + Assert.ok(outputData === reallyLong); + input.close(); + resolve(); + } + ); + }).then(cleanup_files); +} + +function cleanup_files() { + const inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + const outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js b/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js new file mode 100644 index 0000000000..535431aa38 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js @@ -0,0 +1,163 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ +"use strict"; + +/* global structuredClone */ + +var gPort; +var gExtraListener; + +function run_test() { + info("Starting test at " + new Date().toTimeString()); + initTestDevToolsServer(); + + add_task(test_socket_conn); + add_task(test_socket_shutdown); + add_test(test_pipe_conn); + + run_next_test(); +} + +const { Actor } = require("resource://devtools/shared/protocol/Actor.js"); +class EchoTestActor extends Actor { + constructor(conn) { + super(conn, { typeName: "EchoTestActor", methods: [] }); + + this.requestTypes = { + echo: EchoTestActor.prototype.onEcho, + }; + } + + onEcho(request) { + /* + * Request packets are frozen. Copy request, so that + * DevToolsServerConnection.onPacket can attach a 'from' property. + */ + return structuredClone(request); + } +} + +async function test_socket_conn() { + Assert.equal(DevToolsServer.listeningSockets, 0); + const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT"); + const authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DevToolsServer.AuthenticationResult.ALLOW; + }; + const socketOptions = { + authenticator, + portOrPath: -1, + }; + const listener = new SocketListener(DevToolsServer, socketOptions); + Assert.ok(listener); + listener.open(); + Assert.equal(DevToolsServer.listeningSockets, 1); + gPort = DevToolsServer._listeners[0].port; + info("DevTools server port is " + gPort); + // Open a second, separate listener + gExtraListener = new SocketListener(DevToolsServer, socketOptions); + gExtraListener.open(); + Assert.equal(DevToolsServer.listeningSockets, 2); + Assert.ok(!DevToolsServer.hasConnection()); + + info("Starting long and unicode tests at " + new Date().toTimeString()); + // We can't use EventEmitter.once as this is the second argument we care about... + const onConnectionChange = new Promise(res => { + DevToolsServer.once("connectionchange", (type, conn) => res(conn)); + }); + + const transport = await DevToolsClient.socketConnect({ + host: "127.0.0.1", + port: gPort, + }); + Assert.ok(DevToolsServer.hasConnection()); + info("Wait for server connection"); + const conn = await onConnectionChange; + + // Register a custom actor to do echo requests + const actor = new EchoTestActor(conn); + actor.actorID = "echo-actor"; + conn.addActor(actor); + + // Assert that connection settings are available on transport object + const settings = transport.connectionSettings; + Assert.equal(settings.host, "127.0.0.1"); + Assert.equal(settings.port, gPort); + + const onDebuggerConnectionClosed = DevToolsServer.once("connectionchange"); + const unicodeString = "(╯°□°)╯︵ ┻━┻"; + await new Promise(resolve => { + transport.hooks = { + onPacket(packet) { + this.onPacket = function ({ unicode }) { + Assert.equal(unicode, unicodeString); + transport.close(); + }; + // Verify that things work correctly when bigger than the output + // transport buffers and when transporting unicode... + transport.send({ + to: "echo-actor", + type: "echo", + reallylong: really_long(), + unicode: unicodeString, + }); + Assert.equal(packet.from, "root"); + }, + onTransportClosed(status) { + resolve(); + }, + }; + transport.ready(); + }); + const type = await onDebuggerConnectionClosed; + Assert.equal(type, "closed"); + Assert.ok(!DevToolsServer.hasConnection()); +} + +async function test_socket_shutdown() { + Assert.equal(DevToolsServer.listeningSockets, 2); + gExtraListener.close(); + Assert.equal(DevToolsServer.listeningSockets, 1); + Assert.ok(DevToolsServer.closeAllSocketListeners()); + Assert.equal(DevToolsServer.listeningSockets, 0); + // Make sure closing the listener twice does nothing. + Assert.ok(!DevToolsServer.closeAllSocketListeners()); + Assert.equal(DevToolsServer.listeningSockets, 0); + + info("Connecting to a server socket at " + new Date().toTimeString()); + try { + await DevToolsClient.socketConnect({ + host: "127.0.0.1", + port: gPort, + }); + } catch (e) { + if ( + e.result == Cr.NS_ERROR_CONNECTION_REFUSED || + e.result == Cr.NS_ERROR_NET_TIMEOUT + ) { + // The connection should be refused here, but on slow or overloaded + // machines it may just time out. + Assert.ok(true); + return; + } + throw e; + } + + // Shouldn't reach this, should never connect. + Assert.ok(false); +} + +function test_pipe_conn() { + const transport = DevToolsServer.connectPipe(); + transport.hooks = { + onPacket(packet) { + Assert.equal(packet.from, "root"); + transport.close(); + }, + onTransportClosed(status) { + run_next_test(); + }, + }; + + transport.ready(); +} diff --git a/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js b/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js new file mode 100644 index 0000000000..e08c2380fb --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js @@ -0,0 +1,86 @@ +/** + * Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +/** + * Bug 755412 - checks if the server drops the connection on an improperly + * framed packet, i.e. when the length header is invalid. + */ +"use strict"; + +const { + RawPacket, +} = require("resource://devtools/shared/transport/packets.js"); + +function run_test() { + info("Starting test at " + new Date().toTimeString()); + initTestDevToolsServer(); + + add_task(test_socket_conn_drops_after_invalid_header); + add_task(test_socket_conn_drops_after_invalid_header_2); + add_task(test_socket_conn_drops_after_too_large_length); + add_task(test_socket_conn_drops_after_too_long_header); + run_next_test(); +} + +function test_socket_conn_drops_after_invalid_header() { + return test_helper('fluff30:27:{"to":"root","type":"echo"}'); +} + +function test_socket_conn_drops_after_invalid_header_2() { + return test_helper('27asd:{"to":"root","type":"echo"}'); +} + +function test_socket_conn_drops_after_too_large_length() { + // Packet length is limited (semi-arbitrarily) to 1 TiB (2^40) + return test_helper("4305724038957487634549823475894325:"); +} + +function test_socket_conn_drops_after_too_long_header() { + // The packet header is currently limited to no more than 200 bytes + let rawPacket = "4305724038957487634549823475894325"; + for (let i = 0; i < 8; i++) { + rawPacket += rawPacket; + } + return test_helper(rawPacket + ":"); +} + +var test_helper = async function (payload) { + const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT"); + const authenticator = new AuthenticatorType.Server(); + authenticator.allowConnection = () => { + return DevToolsServer.AuthenticationResult.ALLOW; + }; + const socketOptions = { + authenticator, + portOrPath: -1, + }; + + const listener = new SocketListener(DevToolsServer, socketOptions); + listener.open(); + + const transport = await DevToolsClient.socketConnect({ + host: "127.0.0.1", + port: listener.port, + }); + return new Promise(resolve => { + transport.hooks = { + onPacket(packet) { + this.onPacket = function () { + do_throw(new Error("This connection should be dropped.")); + transport.close(); + }; + + // Inject the payload directly into the stream. + transport._outgoing.push(new RawPacket(transport, payload)); + transport._flushOutgoing(); + }, + onTransportClosed(status) { + Assert.ok(true); + resolve(); + }, + }; + transport.ready(); + }); +}; diff --git a/devtools/shared/transport/tests/xpcshell/test_delimited_read.js b/devtools/shared/transport/tests/xpcshell/test_delimited_read.js new file mode 100644 index 0000000000..fe94f81bbf --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_delimited_read.js @@ -0,0 +1,30 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ +"use strict"; + +const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js"); + +const StringInputStream = Components.Constructor( + "@mozilla.org/io/string-input-stream;1", + "nsIStringInputStream", + "setData" +); + +function run_test() { + add_task(async function () { + await test_delimited_read("0123:", "0123:"); + await test_delimited_read("0123:4567:", "0123:"); + await test_delimited_read("012345678901:", "0123456789"); + await test_delimited_read("0123/0123", "0123/0123"); + }); + + run_next_test(); +} + +/** * Tests ***/ + +function test_delimited_read(input, expected) { + input = new StringInputStream(input, input.length); + const result = StreamUtils.delimitedRead(input, ":", 10); + Assert.equal(result, expected); +} diff --git a/devtools/shared/transport/tests/xpcshell/test_packet.js b/devtools/shared/transport/tests/xpcshell/test_packet.js new file mode 100644 index 0000000000..459a7a8211 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_packet.js @@ -0,0 +1,24 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ +"use strict"; + +const { + JSONPacket, + BulkPacket, +} = require("resource://devtools/shared/transport/packets.js"); + +function run_test() { + add_test(test_packet_done); + run_next_test(); +} + +// Ensure done can be checked without getting an error +function test_packet_done() { + const json = new JSONPacket(); + Assert.ok(!json.done); + + const bulk = new BulkPacket(); + Assert.ok(!bulk.done); + + run_next_test(); +} diff --git a/devtools/shared/transport/tests/xpcshell/test_queue.js b/devtools/shared/transport/tests/xpcshell/test_queue.js new file mode 100644 index 0000000000..603640e34d --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_queue.js @@ -0,0 +1,198 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +/** + * This test verifies that the transport's queue operates correctly when various + * packets are scheduled simultaneously. + */ + +var { FileUtils } = ChromeUtils.importESModule( + "resource://gre/modules/FileUtils.sys.mjs" +); + +function run_test() { + initTestDevToolsServer(); + + add_task(async function () { + await test_transport(socket_transport); + await test_transport(local_transport); + DevToolsServer.destroy(); + }); + + run_next_test(); +} + +/** * Tests ***/ + +var test_transport = async function (transportFactory) { + let clientResolve; + const clientDeferred = new Promise(resolve => { + clientResolve = resolve; + }); + + let serverResolve; + const serverDeferred = new Promise(resolve => { + serverResolve = resolve; + }); + + // Ensure test files are not present from a failed run + cleanup_files(); + const reallyLong = really_long(); + writeTestTempFile("bulk-input", reallyLong); + + Assert.equal(Object.keys(DevToolsServer._connections).length, 0); + + const transport = await transportFactory(); + + // Sending from client to server + function write_data({ copyFrom }) { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true, + }, + function (input, status) { + copyFrom(input).then(() => { + input.close(); + }); + } + ); + } + + // Receiving on server from client + function on_bulk_packet({ actor, type, length, copyTo }) { + Assert.equal(actor, "root"); + Assert.equal(type, "file-stream"); + Assert.equal(length, reallyLong.length); + + const outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + const output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output) + .then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify(); + }) + .then(() => { + // It's now safe to close + transport.hooks.onTransportClosed = () => { + clientResolve(); + }; + transport.close(); + }); + } + + // Client + + function send_packets() { + // Specifically, we want to ensure that multiple send()s proceed without + // causing the transport to die. + transport.send({ + actor: "root", + type: "explode", + }); + + transport + .startBulkSend({ + actor: "root", + type: "file-stream", + length: reallyLong.length, + }) + .then(write_data); + } + + transport.hooks = { + onPacket(packet) { + if (packet.error) { + transport.hooks.onError(packet); + } else if (packet.applicationType) { + transport.hooks.onServerHello(packet); + } else { + do_throw("Unexpected server reply"); + } + }, + + onServerHello(packet) { + // We've received the initial start up packet + Assert.equal(packet.from, "root"); + Assert.equal(packet.applicationType, "xpcshell-tests"); + + // Server + Assert.equal(Object.keys(DevToolsServer._connections).length, 1); + info(Object.keys(DevToolsServer._connections)); + for (const connId in DevToolsServer._connections) { + DevToolsServer._connections[connId].onBulkPacket = on_bulk_packet; + } + + DevToolsServer.on("connectionchange", type => { + if (type === "closed") { + serverResolve(); + } + }); + + send_packets(); + }, + + onError(packet) { + // The explode actor doesn't exist + Assert.equal(packet.from, "root"); + Assert.equal(packet.error, "noSuchActor"); + }, + + onTransportClosed() { + do_throw("Transport closed before we expected"); + }, + }; + + transport.ready(); + + return Promise.all([clientDeferred, serverDeferred]); +}; + +/** * Test Utils ***/ + +function verify() { + const reallyLong = really_long(); + + const inputFile = getTestTempFile("bulk-input"); + const outputFile = getTestTempFile("bulk-output"); + + Assert.equal(inputFile.fileSize, reallyLong.length); + Assert.equal(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + return new Promise(resolve => { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true, + }, + input => { + const outputData = NetUtil.readInputStreamToString( + input, + reallyLong.length + ); + // Avoid do_check_eq here so we don't log the contents + Assert.ok(outputData === reallyLong); + input.close(); + resolve(); + } + ); + }).then(cleanup_files); +} + +function cleanup_files() { + const inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + const outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js b/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js new file mode 100644 index 0000000000..137cbd2679 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js @@ -0,0 +1,169 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +var { FileUtils } = ChromeUtils.importESModule( + "resource://gre/modules/FileUtils.sys.mjs" +); + +function run_test() { + initTestDevToolsServer(); + + add_task(async function () { + await test_bulk_transfer_transport(socket_transport); + await test_bulk_transfer_transport(local_transport); + DevToolsServer.destroy(); + }); + + run_next_test(); +} + +/** * Tests ***/ + +/** + * This tests a one-way bulk transfer at the transport layer. + */ +var test_bulk_transfer_transport = async function (transportFactory) { + info("Starting bulk transfer test at " + new Date().toTimeString()); + + let clientResolve; + const clientDeferred = new Promise(resolve => { + clientResolve = resolve; + }); + + let serverResolve; + const serverDeferred = new Promise(resolve => { + serverResolve = resolve; + }); + + // Ensure test files are not present from a failed run + cleanup_files(); + const reallyLong = really_long(); + writeTestTempFile("bulk-input", reallyLong); + + Assert.equal(Object.keys(DevToolsServer._connections).length, 0); + + const transport = await transportFactory(); + + // Sending from client to server + function write_data({ copyFrom }) { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-input")), + loadUsingSystemPrincipal: true, + }, + function (input, status) { + copyFrom(input).then(() => { + input.close(); + }); + } + ); + } + + // Receiving on server from client + function on_bulk_packet({ actor, type, length, copyTo }) { + Assert.equal(actor, "root"); + Assert.equal(type, "file-stream"); + Assert.equal(length, reallyLong.length); + + const outputFile = getTestTempFile("bulk-output", true); + outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8)); + + const output = FileUtils.openSafeFileOutputStream(outputFile); + + copyTo(output) + .then(() => { + FileUtils.closeSafeFileOutputStream(output); + return verify(); + }) + .then(() => { + // It's now safe to close + transport.hooks.onTransportClosed = () => { + clientResolve(); + }; + transport.close(); + }); + } + + // Client + transport.hooks = { + onPacket(packet) { + // We've received the initial start up packet + Assert.equal(packet.from, "root"); + + // Server + Assert.equal(Object.keys(DevToolsServer._connections).length, 1); + info(Object.keys(DevToolsServer._connections)); + for (const connId in DevToolsServer._connections) { + DevToolsServer._connections[connId].onBulkPacket = on_bulk_packet; + } + + DevToolsServer.on("connectionchange", type => { + if (type === "closed") { + serverResolve(); + } + }); + + transport + .startBulkSend({ + actor: "root", + type: "file-stream", + length: reallyLong.length, + }) + .then(write_data); + }, + + onTransportClosed() { + do_throw("Transport closed before we expected"); + }, + }; + + transport.ready(); + + return Promise.all([clientDeferred, serverDeferred]); +}; + +/** * Test Utils ***/ + +function verify() { + const reallyLong = really_long(); + + const inputFile = getTestTempFile("bulk-input"); + const outputFile = getTestTempFile("bulk-output"); + + Assert.equal(inputFile.fileSize, reallyLong.length); + Assert.equal(outputFile.fileSize, reallyLong.length); + + // Ensure output file contents actually match + return new Promise(resolve => { + NetUtil.asyncFetch( + { + uri: NetUtil.newURI(getTestTempFile("bulk-output")), + loadUsingSystemPrincipal: true, + }, + input => { + const outputData = NetUtil.readInputStreamToString( + input, + reallyLong.length + ); + // Avoid do_check_eq here so we don't log the contents + Assert.ok(outputData === reallyLong); + input.close(); + resolve(); + } + ); + }).then(cleanup_files); +} + +function cleanup_files() { + const inputFile = getTestTempFile("bulk-input", true); + if (inputFile.exists()) { + inputFile.remove(false); + } + + const outputFile = getTestTempFile("bulk-output", true); + if (outputFile.exists()) { + outputFile.remove(false); + } +} diff --git a/devtools/shared/transport/tests/xpcshell/testactors.js b/devtools/shared/transport/tests/xpcshell/testactors.js new file mode 100644 index 0000000000..0a35f05287 --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/testactors.js @@ -0,0 +1,16 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ +"use strict"; + +const { RootActor } = require("resource://devtools/server/actors/root.js"); +const { + ActorRegistry, +} = require("resource://devtools/server/actors/utils/actor-registry.js"); + +exports.createRootActor = function createRootActor(connection) { + const root = new RootActor(connection, { + globalActorFactories: ActorRegistry.globalActorFactories, + }); + root.applicationType = "xpcshell-tests"; + return root; +}; diff --git a/devtools/shared/transport/tests/xpcshell/xpcshell.toml b/devtools/shared/transport/tests/xpcshell/xpcshell.toml new file mode 100644 index 0000000000..38e49ab71c --- /dev/null +++ b/devtools/shared/transport/tests/xpcshell/xpcshell.toml @@ -0,0 +1,22 @@ +[DEFAULT] +tags = "devtools" +head = "head_dbg.js" +firefox-appdir = "browser" +skip-if = ["os == 'android'"] +support-files = ["testactors.js"] + +["test_bulk_error.js"] + +["test_client_server_bulk.js"] + +["test_dbgsocket.js"] + +["test_dbgsocket_connection_drop.js"] + +["test_delimited_read.js"] + +["test_packet.js"] + +["test_queue.js"] + +["test_transport_bulk.js"] diff --git a/devtools/shared/transport/transport.js b/devtools/shared/transport/transport.js new file mode 100644 index 0000000000..460f45206f --- /dev/null +++ b/devtools/shared/transport/transport.js @@ -0,0 +1,499 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); +const { dumpn, dumpv } = DevToolsUtils; +const flags = require("resource://devtools/shared/flags.js"); +const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js"); +const { + Packet, + JSONPacket, + BulkPacket, +} = require("resource://devtools/shared/transport/packets.js"); + +loader.lazyGetter(this, "ScriptableInputStream", () => { + return Components.Constructor( + "@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", + "init" + ); +}); + +const PACKET_HEADER_MAX = 200; + +/** + * An adapter that handles data transfers between the devtools client and + * server. It can work with both nsIPipe and nsIServerSocket transports so + * long as the properly created input and output streams are specified. + * (However, for intra-process connections, LocalDebuggerTransport, below, + * is more efficient than using an nsIPipe pair with DebuggerTransport.) + * + * @param input nsIAsyncInputStream + * The input stream. + * @param output nsIAsyncOutputStream + * The output stream. + * + * Given a DebuggerTransport instance dt: + * 1) Set dt.hooks to a packet handler object (described below). + * 2) Call dt.ready() to begin watching for input packets. + * 3) Call dt.send() / dt.startBulkSend() to send packets. + * 4) Call dt.close() to close the connection, and disengage from the event + * loop. + * + * A packet handler is an object with the following methods: + * + * - onPacket(packet) - called when we have received a complete packet. + * |packet| is the parsed form of the packet --- a JavaScript value, not + * a JSON-syntax string. + * + * - onBulkPacket(packet) - called when we have switched to bulk packet + * receiving mode. |packet| is an object containing: + * * actor: Name of actor that will receive the packet + * * type: Name of actor's method that should be called on receipt + * * length: Size of the data to be read + * * stream: This input stream should only be used directly if you can ensure + * that you will read exactly |length| bytes and will not close the + * stream when reading is complete + * * done: If you use the stream directly (instead of |copyTo| below), you + * must signal completion by resolving / rejecting this deferred. + * If it's rejected, the transport will be closed. If an Error is + * supplied as a rejection value, it will be logged via |dumpn|. + * If you do use |copyTo|, resolving is taken care of for you when + * copying completes. + * * copyTo: A helper function for getting your data out of the stream that + * meets the stream handling requirements above, and has the + * following signature: + * @param output nsIAsyncOutputStream + * The stream to copy to. + * @return Promise + * The promise is resolved when copying completes or rejected if any + * (unexpected) errors occur. + * This object also emits "progress" events for each chunk that is + * copied. See stream-utils.js. + * + * - onTransportClosed(reason) - called when the connection is closed. |reason| is + * an optional nsresult or object, typically passed when the transport is + * closed due to some error in a underlying stream. + * + * See ./packets.js and the Remote Debugging Protocol specification for more + * details on the format of these packets. + */ +function DebuggerTransport(input, output) { + this._input = input; + this._scriptableInput = new ScriptableInputStream(input); + this._output = output; + + // The current incoming (possibly partial) header, which will determine which + // type of Packet |_incoming| below will become. + this._incomingHeader = ""; + // The current incoming Packet object + this._incoming = null; + // A queue of outgoing Packet objects + this._outgoing = []; + + this.hooks = null; + this.active = false; + + this._incomingEnabled = true; + this._outgoingEnabled = true; + + this.close = this.close.bind(this); +} + +DebuggerTransport.prototype = { + /** + * Transmit an object as a JSON packet. + * + * This method returns immediately, without waiting for the entire + * packet to be transmitted, registering event handlers as needed to + * transmit the entire packet. Packets are transmitted in the order + * they are passed to this method. + */ + send(object) { + const packet = new JSONPacket(this); + packet.object = object; + this._outgoing.push(packet); + this._flushOutgoing(); + }, + + /** + * Transmit streaming data via a bulk packet. + * + * This method initiates the bulk send process by queuing up the header data. + * The caller receives eventual access to a stream for writing. + * + * N.B.: Do *not* attempt to close the stream handed to you, as it will + * continue to be used by this transport afterwards. Most users should + * instead use the provided |copyFrom| function instead. + * + * @param header Object + * This is modeled after the format of JSON packets above, but does not + * actually contain the data, but is instead just a routing header: + * * actor: Name of actor that will receive the packet + * * type: Name of actor's method that should be called on receipt + * * length: Size of the data to be sent + * @return Promise + * The promise will be resolved when you are allowed to write to the + * stream with an object containing: + * * stream: This output stream should only be used directly if + * you can ensure that you will write exactly |length| + * bytes and will not close the stream when writing is + * complete + * * done: If you use the stream directly (instead of |copyFrom| + * below), you must signal completion by resolving / + * rejecting this deferred. If it's rejected, the + * transport will be closed. If an Error is supplied as + * a rejection value, it will be logged via |dumpn|. If + * you do use |copyFrom|, resolving is taken care of for + * you when copying completes. + * * copyFrom: A helper function for getting your data onto the + * stream that meets the stream handling requirements + * above, and has the following signature: + * @param input nsIAsyncInputStream + * The stream to copy from. + * @return Promise + * The promise is resolved when copying completes or + * rejected if any (unexpected) errors occur. + * This object also emits "progress" events for each chunk + * that is copied. See stream-utils.js. + */ + startBulkSend(header) { + const packet = new BulkPacket(this); + packet.header = header; + this._outgoing.push(packet); + this._flushOutgoing(); + return packet.streamReadyForWriting; + }, + + /** + * Close the transport. + * @param reason nsresult / object (optional) + * The status code or error message that corresponds to the reason for + * closing the transport (likely because a stream closed or failed). + */ + close(reason) { + this.active = false; + this._input.close(); + this._scriptableInput.close(); + this._output.close(); + this._destroyIncoming(); + this._destroyAllOutgoing(); + if (this.hooks) { + this.hooks.onTransportClosed(reason); + this.hooks = null; + } + if (reason) { + dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason)); + } else { + dumpn("Transport closed."); + } + }, + + /** + * The currently outgoing packet (at the top of the queue). + */ + get _currentOutgoing() { + return this._outgoing[0]; + }, + + /** + * Flush data to the outgoing stream. Waits until the output stream notifies + * us that it is ready to be written to (via onOutputStreamReady). + */ + _flushOutgoing() { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + // If the top of the packet queue has nothing more to send, remove it. + if (this._currentOutgoing.done) { + this._finishCurrentOutgoing(); + } + + if (this._outgoing.length) { + const threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._output.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to write to the output stream. This is + * used when we've temporarily handed off our output stream for writing bulk + * data. + */ + pauseOutgoing() { + this._outgoingEnabled = false; + }, + + /** + * Resume this transport's attempts to write to the output stream. + */ + resumeOutgoing() { + this._outgoingEnabled = true; + this._flushOutgoing(); + }, + + // nsIOutputStreamCallback + /** + * This is called when the output stream is ready for more data to be written. + * The current outgoing packet will attempt to write some amount of data, but + * may not complete. + */ + onOutputStreamReady: DevToolsUtils.makeInfallible(function (stream) { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + try { + this._currentOutgoing.write(stream); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + return; + } + throw e; + } + + this._flushOutgoing(); + }, "DebuggerTransport.prototype.onOutputStreamReady"), + + /** + * Remove the current outgoing packet from the queue upon completion. + */ + _finishCurrentOutgoing() { + if (this._currentOutgoing) { + this._currentOutgoing.destroy(); + this._outgoing.shift(); + } + }, + + /** + * Clear the entire outgoing queue. + */ + _destroyAllOutgoing() { + for (const packet of this._outgoing) { + packet.destroy(); + } + this._outgoing = []; + }, + + /** + * Initialize the input stream for reading. Once this method has been called, + * we watch for packets on the input stream, and pass them to the appropriate + * handlers via this.hooks. + */ + ready() { + this.active = true; + this._waitForIncoming(); + }, + + /** + * Asks the input stream to notify us (via onInputStreamReady) when it is + * ready for reading. + */ + _waitForIncoming() { + if (this._incomingEnabled) { + const threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._input.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to read from the input stream. This is + * used when we've temporarily handed off our input stream for reading bulk + * data. + */ + pauseIncoming() { + this._incomingEnabled = false; + }, + + /** + * Resume this transport's attempts to read from the input stream. + */ + resumeIncoming() { + this._incomingEnabled = true; + this._flushIncoming(); + this._waitForIncoming(); + }, + + // nsIInputStreamCallback + /** + * Called when the stream is either readable or closed. + */ + onInputStreamReady: DevToolsUtils.makeInfallible(function (stream) { + try { + while ( + stream.available() && + this._incomingEnabled && + this._processIncoming(stream, stream.available()) + ) { + // Loop until there is nothing more to process + } + this._waitForIncoming(); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + } else { + throw e; + } + } + }, "DebuggerTransport.prototype.onInputStreamReady"), + + /** + * Process the incoming data. Will create a new currently incoming Packet if + * needed. Tells the incoming Packet to read as much data as it can, but + * reading may not complete. The Packet signals that its data is ready for + * delivery by calling one of this transport's _on*Ready methods (see + * ./packets.js and the _on*Ready methods below). + * @return boolean + * Whether incoming stream processing should continue for any + * remaining data. + */ + _processIncoming(stream, count) { + dumpv("Data available: " + count); + + if (!count) { + dumpv("Nothing to read, skipping"); + return false; + } + + try { + if (!this._incoming) { + dumpv("Creating a new packet from incoming"); + + if (!this._readHeader(stream)) { + // Not enough data to read packet type + return false; + } + + // Attempt to create a new Packet by trying to parse each possible + // header pattern. + this._incoming = Packet.fromHeader(this._incomingHeader, this); + if (!this._incoming) { + throw new Error( + "No packet types for header: " + this._incomingHeader + ); + } + } + + if (!this._incoming.done) { + // We have an incomplete packet, keep reading it. + dumpv("Existing packet incomplete, keep reading"); + this._incoming.read(stream, this._scriptableInput); + } + } catch (e) { + const msg = + "Error reading incoming packet: (" + e + " - " + e.stack + ")"; + dumpn(msg); + + // Now in an invalid state, shut down the transport. + this.close(); + return false; + } + + if (!this._incoming.done) { + // Still not complete, we'll wait for more data. + dumpv("Packet not done, wait for more"); + return true; + } + + // Ready for next packet + this._flushIncoming(); + return true; + }, + + /** + * Read as far as we can into the incoming data, attempting to build up a + * complete packet header (which terminates with ":"). We'll only read up to + * PACKET_HEADER_MAX characters. + * @return boolean + * True if we now have a complete header. + */ + _readHeader() { + const amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length; + this._incomingHeader += StreamUtils.delimitedRead( + this._scriptableInput, + ":", + amountToRead + ); + if (flags.wantVerbose) { + dumpv("Header read: " + this._incomingHeader); + } + + if (this._incomingHeader.endsWith(":")) { + if (flags.wantVerbose) { + dumpv("Found packet header successfully: " + this._incomingHeader); + } + return true; + } + + if (this._incomingHeader.length >= PACKET_HEADER_MAX) { + throw new Error("Failed to parse packet header!"); + } + + // Not enough data yet. + return false; + }, + + /** + * If the incoming packet is done, log it as needed and clear the buffer. + */ + _flushIncoming() { + if (!this._incoming.done) { + return; + } + if (flags.wantLogging) { + dumpn("Got: " + this._incoming); + } + this._destroyIncoming(); + }, + + /** + * Handler triggered by an incoming JSONPacket completing it's |read| method. + * Delivers the packet to this.hooks.onPacket. + */ + _onJSONObjectReady(object) { + DevToolsUtils.executeSoon( + DevToolsUtils.makeInfallible(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.hooks.onPacket(object); + } + }, "DebuggerTransport instance's this.hooks.onPacket") + ); + }, + + /** + * Handler triggered by an incoming BulkPacket entering the |read| phase for + * the stream portion of the packet. Delivers info about the incoming + * streaming data to this.hooks.onBulkPacket. See the main comment on the + * transport at the top of this file for more details. + */ + _onBulkReadReady(...args) { + DevToolsUtils.executeSoon( + DevToolsUtils.makeInfallible(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.hooks.onBulkPacket(...args); + } + }, "DebuggerTransport instance's this.hooks.onBulkPacket") + ); + }, + + /** + * Remove all handlers and references related to the current incoming packet, + * either because it is now complete or because the transport is closing. + */ + _destroyIncoming() { + if (this._incoming) { + this._incoming.destroy(); + } + this._incomingHeader = ""; + this._incoming = null; + }, +}; + +exports.DebuggerTransport = DebuggerTransport; diff --git a/devtools/shared/transport/websocket-transport.js b/devtools/shared/transport/websocket-transport.js new file mode 100644 index 0000000000..b8255e0067 --- /dev/null +++ b/devtools/shared/transport/websocket-transport.js @@ -0,0 +1,86 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +const EventEmitter = require("resource://devtools/shared/event-emitter.js"); + +function WebSocketDebuggerTransport(socket) { + EventEmitter.decorate(this); + + this.active = false; + this.hooks = null; + this.socket = socket; +} + +WebSocketDebuggerTransport.prototype = { + ready() { + if (this.active) { + return; + } + + this.socket.addEventListener("message", this); + this.socket.addEventListener("close", this); + + this.active = true; + }, + + send(object) { + this.emit("send", object); + if (this.socket) { + this.socket.send(JSON.stringify(object)); + } + }, + + startBulkSend() { + throw new Error("Bulk send is not supported by WebSocket transport"); + }, + + close() { + if (!this.socket) { + return; + } + this.emit("close"); + this.active = false; + + this.socket.removeEventListener("message", this); + this.socket.removeEventListener("close", this); + this.socket.close(); + this.socket = null; + + if (this.hooks) { + if (this.hooks.onTransportClosed) { + this.hooks.onTransportClosed(); + } + this.hooks = null; + } + }, + + handleEvent(event) { + switch (event.type) { + case "message": + this.onMessage(event); + break; + case "close": + this.close(); + break; + } + }, + + onMessage({ data }) { + if (typeof data !== "string") { + throw new Error( + "Binary messages are not supported by WebSocket transport" + ); + } + + const object = JSON.parse(data); + this.emit("packet", object); + if (this.hooks) { + this.hooks.onPacket(object); + } + }, +}; + +module.exports = WebSocketDebuggerTransport; diff --git a/devtools/shared/transport/worker-transport.js b/devtools/shared/transport/worker-transport.js new file mode 100644 index 0000000000..903fd69cf4 --- /dev/null +++ b/devtools/shared/transport/worker-transport.js @@ -0,0 +1,113 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +"use strict"; + +// Each worker debugger supports only a single connection to the main thread. +// However, its theoretically possible for multiple servers to connect to the +// same worker. Consequently, each transport has a connection id, to allow +// messages from multiple connections to be multiplexed on a single channel. + +/** + * A transport that uses a WorkerDebugger to send packets from the main + * thread to a worker thread. + */ +class MainThreadWorkerDebuggerTransport { + constructor(dbg, id) { + this._dbg = dbg; + this._id = id; + + this._dbgListener = { + onMessage: this._onMessage.bind(this), + }; + } + + ready() { + this._dbg.addListener(this._dbgListener); + } + + close() { + if (this._dbgListener) { + this._dbg.removeListener(this._dbgListener); + } + this._dbgListener = null; + this.hooks?.onTransportClosed(); + } + + send(packet) { + this._dbg.postMessage( + JSON.stringify({ + type: "message", + id: this._id, + message: packet, + }) + ); + } + + startBulkSend() { + throw new Error("Can't send bulk data from worker threads!"); + } + + _onMessage(message) { + const packet = JSON.parse(message); + if (packet.type !== "message" || packet.id !== this._id || !this.hooks) { + return; + } + + this.hooks.onPacket(packet.message); + } +} + +exports.MainThreadWorkerDebuggerTransport = MainThreadWorkerDebuggerTransport; + +/** + * A transport that uses a WorkerDebuggerGlobalScope to send packets from a + * worker thread to the main thread. + */ +function WorkerThreadWorkerDebuggerTransport(scope, id) { + this._scope = scope; + this._id = id; + this._onMessage = this._onMessage.bind(this); +} + +WorkerThreadWorkerDebuggerTransport.prototype = { + constructor: WorkerThreadWorkerDebuggerTransport, + + ready() { + this._scope.addEventListener("message", this._onMessage); + }, + + close() { + this._scope.removeEventListener("message", this._onMessage); + this.hooks?.onTransportClosed(); + }, + + send(packet) { + this._scope.postMessage( + JSON.stringify({ + type: "message", + id: this._id, + message: packet, + }) + ); + }, + + startBulkSend() { + throw new Error("Can't send bulk data from worker threads!"); + }, + + _onMessage(event) { + const packet = JSON.parse(event.data); + if (packet.type !== "message" || packet.id !== this._id) { + return; + } + + if (this.hooks) { + this.hooks.onPacket(packet.message); + } + }, +}; + +exports.WorkerThreadWorkerDebuggerTransport = + WorkerThreadWorkerDebuggerTransport; -- cgit v1.2.3