summaryrefslogtreecommitdiffstats
path: root/devtools/shared/transport
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /devtools/shared/transport
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'devtools/shared/transport')
-rw-r--r--devtools/shared/transport/child-transport.js128
-rw-r--r--devtools/shared/transport/js-window-actor-transport.js66
-rw-r--r--devtools/shared/transport/local-transport.js204
-rw-r--r--devtools/shared/transport/moz.build18
-rw-r--r--devtools/shared/transport/packets.js440
-rw-r--r--devtools/shared/transport/stream-utils.js254
-rw-r--r--devtools/shared/transport/tests/xpcshell/.eslintrc.js6
-rw-r--r--devtools/shared/transport/tests/xpcshell/head_dbg.js177
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_bulk_error.js94
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js312
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_dbgsocket.js163
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js86
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_delimited_read.js30
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_packet.js24
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_queue.js198
-rw-r--r--devtools/shared/transport/tests/xpcshell/test_transport_bulk.js169
-rw-r--r--devtools/shared/transport/tests/xpcshell/testactors.js16
-rw-r--r--devtools/shared/transport/tests/xpcshell/xpcshell.ini17
-rw-r--r--devtools/shared/transport/transport.js499
-rw-r--r--devtools/shared/transport/websocket-transport.js86
-rw-r--r--devtools/shared/transport/worker-transport.js113
21 files changed, 3100 insertions, 0 deletions
diff --git a/devtools/shared/transport/child-transport.js b/devtools/shared/transport/child-transport.js
new file mode 100644
index 0000000000..52511432e5
--- /dev/null
+++ b/devtools/shared/transport/child-transport.js
@@ -0,0 +1,128 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const flags = require("resource://devtools/shared/flags.js");
+
+/**
+ * A transport for the debugging protocol that uses nsIMessageManagers to
+ * exchange packets with servers running in child processes.
+ *
+ * In the parent process, |mm| should be the nsIMessageSender for the
+ * child process. In a child process, |mm| should be the child process
+ * message manager, which sends packets to the parent.
+ *
+ * |prefix| is a string included in the message names, to distinguish
+ * multiple servers running in the same child process.
+ *
+ * This transport exchanges messages named 'debug:<prefix>:packet', where
+ * <prefix> is |prefix|, whose data is the protocol packet.
+ */
+function ChildDebuggerTransport(mm, prefix) {
+ this._mm = mm;
+ this._messageName = "debug:" + prefix + ":packet";
+}
+
+/*
+ * To avoid confusion, we use 'message' to mean something that
+ * nsIMessageSender conveys, and 'packet' to mean a remote debugging
+ * protocol packet.
+ */
+ChildDebuggerTransport.prototype = {
+ constructor: ChildDebuggerTransport,
+
+ hooks: null,
+
+ _addListener() {
+ this._mm.addMessageListener(this._messageName, this);
+ },
+
+ _removeListener() {
+ try {
+ this._mm.removeMessageListener(this._messageName, this);
+ } catch (e) {
+ if (e.result != Cr.NS_ERROR_NULL_POINTER) {
+ throw e;
+ }
+ // In some cases, especially when using messageManagers in non-e10s mode, we reach
+ // this point with a dead messageManager which only throws errors but does not
+ // seem to indicate in any other way that it is dead.
+ }
+ },
+
+ ready() {
+ this._addListener();
+ },
+
+ close(options) {
+ this._removeListener();
+ if (this.hooks.onTransportClosed) {
+ this.hooks.onTransportClosed(null, options);
+ }
+ },
+
+ receiveMessage({ data }) {
+ this.hooks.onPacket(data);
+ },
+
+ /**
+ * Helper method to ensure a given `object` can be sent across message manager
+ * without being serialized to JSON.
+ * See https://searchfox.org/mozilla-central/rev/6bfadf95b4a6aaa8bb3b2a166d6c3545983e179a/dom/base/nsFrameMessageManager.cpp#458-469
+ */
+ _canBeSerialized(object) {
+ try {
+ const holder = new StructuredCloneHolder(
+ "ChildDebuggerTransport._canBeSerialized",
+ null,
+ object
+ );
+ holder.deserialize(this);
+ } catch (e) {
+ return false;
+ }
+ return true;
+ },
+
+ pathToUnserializable(object) {
+ for (const key in object) {
+ const value = object[key];
+ if (!this._canBeSerialized(value)) {
+ if (typeof value == "object") {
+ return [key].concat(this.pathToUnserializable(value));
+ }
+ return [key];
+ }
+ }
+ return [];
+ },
+
+ send(packet) {
+ if (flags.testing && !this._canBeSerialized(packet)) {
+ const attributes = this.pathToUnserializable(packet);
+ let msg =
+ "Following packet can't be serialized: " + JSON.stringify(packet);
+ msg += "\nBecause of attributes: " + attributes.join(", ") + "\n";
+ msg += "Did you pass a function or an XPCOM object in it?";
+ throw new Error(msg);
+ }
+ try {
+ this._mm.sendAsyncMessage(this._messageName, packet);
+ } catch (e) {
+ if (e.result != Cr.NS_ERROR_NULL_POINTER) {
+ throw e;
+ }
+ // In some cases, especially when using messageManagers in non-e10s mode, we reach
+ // this point with a dead messageManager which only throws errors but does not
+ // seem to indicate in any other way that it is dead.
+ }
+ },
+
+ startBulkSend() {
+ throw new Error("Can't send bulk data to child processes.");
+ },
+};
+
+exports.ChildDebuggerTransport = ChildDebuggerTransport;
diff --git a/devtools/shared/transport/js-window-actor-transport.js b/devtools/shared/transport/js-window-actor-transport.js
new file mode 100644
index 0000000000..9f0fb82497
--- /dev/null
+++ b/devtools/shared/transport/js-window-actor-transport.js
@@ -0,0 +1,66 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+/**
+ * DevTools transport relying on JS Window Actors. This is an experimental
+ * transport. It is only used when using the JS Window Actor based frame
+ * connector. In that case this transport will be used to communicate between
+ * the DevToolsServer living in the parent process and the DevToolsServer
+ * living in the process of the target frame.
+ *
+ * This is intended to be a replacement for child-transport.js which is a
+ * message-manager based transport.
+ */
+class JsWindowActorTransport {
+ constructor(jsWindowActor, prefix) {
+ this.hooks = null;
+ this._jsWindowActor = jsWindowActor;
+ this._prefix = prefix;
+
+ this._onPacketReceived = this._onPacketReceived.bind(this);
+ }
+
+ _addListener() {
+ this._jsWindowActor.on("packet-received", this._onPacketReceived);
+ }
+
+ _removeListener() {
+ this._jsWindowActor.off("packet-received", this._onPacketReceived);
+ }
+
+ ready() {
+ this._addListener();
+ }
+
+ /**
+ * @param {object} options
+ * @param {boolean} options.isModeSwitching
+ * true when this is called as the result of a change to the devtools.browsertoolbox.scope pref
+ */
+ close(options) {
+ this._removeListener();
+ if (this.hooks.onTransportClosed) {
+ this.hooks.onTransportClosed(null, options);
+ }
+ }
+
+ _onPacketReceived(eventName, { data }) {
+ const { prefix, packet } = data;
+ if (prefix === this._prefix) {
+ this.hooks.onPacket(packet);
+ }
+ }
+
+ send(packet) {
+ this._jsWindowActor.sendPacket(packet, this._prefix);
+ }
+
+ startBulkSend() {
+ throw new Error("startBulkSend not implemented for JsWindowActorTransport");
+ }
+}
+
+exports.JsWindowActorTransport = JsWindowActorTransport;
diff --git a/devtools/shared/transport/local-transport.js b/devtools/shared/transport/local-transport.js
new file mode 100644
index 0000000000..bff83b7666
--- /dev/null
+++ b/devtools/shared/transport/local-transport.js
@@ -0,0 +1,204 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
+const { dumpn } = DevToolsUtils;
+const flags = require("resource://devtools/shared/flags.js");
+const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
+
+loader.lazyGetter(this, "Pipe", () => {
+ return Components.Constructor("@mozilla.org/pipe;1", "nsIPipe", "init");
+});
+
+/**
+ * An adapter that handles data transfers between the devtools client and
+ * server when they both run in the same process. It presents the same API as
+ * DebuggerTransport, but instead of transmitting serialized messages across a
+ * connection it merely calls the packet dispatcher of the other side.
+ *
+ * @param other LocalDebuggerTransport
+ * The other endpoint for this debugger connection.
+ *
+ * @see DebuggerTransport
+ */
+function LocalDebuggerTransport(other) {
+ this.other = other;
+ this.hooks = null;
+
+ // A packet number, shared between this and this.other. This isn't used by the
+ // protocol at all, but it makes the packet traces a lot easier to follow.
+ this._serial = this.other ? this.other._serial : { count: 0 };
+ this.close = this.close.bind(this);
+}
+
+LocalDebuggerTransport.prototype = {
+ /**
+ * Transmit a message by directly calling the onPacket handler of the other
+ * endpoint.
+ */
+ send(packet) {
+ const serial = this._serial.count++;
+ if (flags.wantLogging) {
+ // Check 'from' first, as 'echo' packets have both.
+ if (packet.from) {
+ dumpn("Packet " + serial + " sent from " + JSON.stringify(packet.from));
+ } else if (packet.to) {
+ dumpn("Packet " + serial + " sent to " + JSON.stringify(packet.to));
+ }
+ }
+ this._deepFreeze(packet);
+ const other = this.other;
+ if (other) {
+ DevToolsUtils.executeSoon(
+ DevToolsUtils.makeInfallible(() => {
+ // Avoid the cost of JSON.stringify() when logging is disabled.
+ if (flags.wantLogging) {
+ dumpn(
+ "Received packet " +
+ serial +
+ ": " +
+ JSON.stringify(packet, null, 2)
+ );
+ }
+ if (other.hooks) {
+ other.hooks.onPacket(packet);
+ }
+ }, "LocalDebuggerTransport instance's this.other.hooks.onPacket")
+ );
+ }
+ },
+
+ /**
+ * Send a streaming bulk packet directly to the onBulkPacket handler of the
+ * other endpoint.
+ *
+ * This case is much simpler than the full DebuggerTransport, since there is
+ * no primary stream we have to worry about managing while we hand it off to
+ * others temporarily. Instead, we can just make a single use pipe and be
+ * done with it.
+ */
+ startBulkSend({ actor, type, length }) {
+ const serial = this._serial.count++;
+
+ dumpn("Sent bulk packet " + serial + " for actor " + actor);
+ if (!this.other) {
+ const error = new Error("startBulkSend: other side of transport missing");
+ return Promise.reject(error);
+ }
+
+ const pipe = new Pipe(true, true, 0, 0, null);
+
+ DevToolsUtils.executeSoon(
+ DevToolsUtils.makeInfallible(() => {
+ dumpn("Received bulk packet " + serial);
+ if (!this.other.hooks) {
+ return;
+ }
+
+ // Receiver
+ new Promise(receiverResolve => {
+ const packet = {
+ actor,
+ type,
+ length,
+ copyTo: output => {
+ const copying = StreamUtils.copyStream(
+ pipe.inputStream,
+ output,
+ length
+ );
+ receiverResolve(copying);
+ return copying;
+ },
+ stream: pipe.inputStream,
+ done: receiverResolve,
+ };
+
+ this.other.hooks.onBulkPacket(packet);
+ })
+ // Await the result of reading from the stream
+ .then(() => pipe.inputStream.close(), this.close);
+ }, "LocalDebuggerTransport instance's this.other.hooks.onBulkPacket")
+ );
+
+ // Sender
+ return new Promise(senderResolve => {
+ // The remote transport is not capable of resolving immediately here, so we
+ // shouldn't be able to either.
+ DevToolsUtils.executeSoon(() => {
+ return (
+ new Promise(copyResolve => {
+ senderResolve({
+ copyFrom: input => {
+ const copying = StreamUtils.copyStream(
+ input,
+ pipe.outputStream,
+ length
+ );
+ copyResolve(copying);
+ return copying;
+ },
+ stream: pipe.outputStream,
+ done: copyResolve,
+ });
+ })
+ // Await the result of writing to the stream
+ .then(() => pipe.outputStream.close(), this.close)
+ );
+ });
+ });
+ },
+
+ /**
+ * Close the transport.
+ */
+ close() {
+ if (this.other) {
+ // Remove the reference to the other endpoint before calling close(), to
+ // avoid infinite recursion.
+ const other = this.other;
+ this.other = null;
+ other.close();
+ }
+ if (this.hooks) {
+ try {
+ if (this.hooks.onTransportClosed) {
+ this.hooks.onTransportClosed();
+ }
+ } catch (ex) {
+ console.error(ex);
+ }
+ this.hooks = null;
+ }
+ },
+
+ /**
+ * An empty method for emulating the DebuggerTransport API.
+ */
+ ready() {},
+
+ /**
+ * Helper function that makes an object fully immutable.
+ */
+ _deepFreeze(object) {
+ Object.freeze(object);
+ for (const prop in object) {
+ // Freeze the properties that are objects, not on the prototype, and not
+ // already frozen. Note that this might leave an unfrozen reference
+ // somewhere in the object if there is an already frozen object containing
+ // an unfrozen object.
+ if (
+ object.hasOwnProperty(prop) &&
+ typeof object === "object" &&
+ !Object.isFrozen(object)
+ ) {
+ this._deepFreeze(object[prop]);
+ }
+ }
+ },
+};
+
+exports.LocalDebuggerTransport = LocalDebuggerTransport;
diff --git a/devtools/shared/transport/moz.build b/devtools/shared/transport/moz.build
new file mode 100644
index 0000000000..1ec5d01777
--- /dev/null
+++ b/devtools/shared/transport/moz.build
@@ -0,0 +1,18 @@
+# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*-
+# vim: set filetype=python:
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+XPCSHELL_TESTS_MANIFESTS += ["tests/xpcshell/xpcshell.ini"]
+
+DevToolsModules(
+ "child-transport.js",
+ "js-window-actor-transport.js",
+ "local-transport.js",
+ "packets.js",
+ "stream-utils.js",
+ "transport.js",
+ "websocket-transport.js",
+ "worker-transport.js",
+)
diff --git a/devtools/shared/transport/packets.js b/devtools/shared/transport/packets.js
new file mode 100644
index 0000000000..9f9409a123
--- /dev/null
+++ b/devtools/shared/transport/packets.js
@@ -0,0 +1,440 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+/**
+ * Packets contain read / write functionality for the different packet types
+ * supported by the debugging protocol, so that a transport can focus on
+ * delivery and queue management without worrying too much about the specific
+ * packet types.
+ *
+ * They are intended to be "one use only", so a new packet should be
+ * instantiated for each incoming or outgoing packet.
+ *
+ * A complete Packet type should expose at least the following:
+ * * read(stream, scriptableStream)
+ * Called when the input stream has data to read
+ * * write(stream)
+ * Called when the output stream is ready to write
+ * * get done()
+ * Returns true once the packet is done being read / written
+ * * destroy()
+ * Called to clean up at the end of use
+ */
+
+const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
+const { dumpn, dumpv } = DevToolsUtils;
+const flags = require("resource://devtools/shared/flags.js");
+const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
+
+DevToolsUtils.defineLazyGetter(this, "unicodeConverter", () => {
+ // eslint-disable-next-line no-shadow
+ const unicodeConverter = Cc[
+ "@mozilla.org/intl/scriptableunicodeconverter"
+ ].createInstance(Ci.nsIScriptableUnicodeConverter);
+ unicodeConverter.charset = "UTF-8";
+ return unicodeConverter;
+});
+
+// The transport's previous check ensured the header length did not exceed 20
+// characters. Here, we opt for the somewhat smaller, but still large limit of
+// 1 TiB.
+const PACKET_LENGTH_MAX = Math.pow(2, 40);
+
+/**
+ * A generic Packet processing object (extended by two subtypes below).
+ */
+function Packet(transport) {
+ this._transport = transport;
+ this._length = 0;
+}
+
+/**
+ * Attempt to initialize a new Packet based on the incoming packet header we've
+ * received so far. We try each of the types in succession, trying JSON packets
+ * first since they are much more common.
+ * @param header string
+ * The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ * @return Packet
+ * The parsed packet of the matching type, or null if no types matched.
+ */
+Packet.fromHeader = function (header, transport) {
+ return (
+ JSONPacket.fromHeader(header, transport) ||
+ BulkPacket.fromHeader(header, transport)
+ );
+};
+
+Packet.prototype = {
+ get length() {
+ return this._length;
+ },
+
+ set length(length) {
+ if (length > PACKET_LENGTH_MAX) {
+ throw Error(
+ "Packet length " +
+ length +
+ " exceeds the max length of " +
+ PACKET_LENGTH_MAX
+ );
+ }
+ this._length = length;
+ },
+
+ destroy() {
+ this._transport = null;
+ },
+};
+
+exports.Packet = Packet;
+
+/**
+ * With a JSON packet (the typical packet type sent via the transport), data is
+ * transferred as a JSON packet serialized into a string, with the string length
+ * prepended to the packet, followed by a colon ([length]:[packet]). The
+ * contents of the JSON packet are specified in the Remote Debugging Protocol
+ * specification.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ */
+function JSONPacket(transport) {
+ Packet.call(this, transport);
+ this._data = "";
+ this._done = false;
+}
+
+/**
+ * Attempt to initialize a new JSONPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ * The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ * @return JSONPacket
+ * The parsed packet, or null if it's not a match.
+ */
+JSONPacket.fromHeader = function (header, transport) {
+ const match = this.HEADER_PATTERN.exec(header);
+
+ if (!match) {
+ return null;
+ }
+
+ dumpv("Header matches JSON packet");
+ const packet = new JSONPacket(transport);
+ packet.length = +match[1];
+ return packet;
+};
+
+JSONPacket.HEADER_PATTERN = /^(\d+):$/;
+
+JSONPacket.prototype = Object.create(Packet.prototype);
+
+Object.defineProperty(JSONPacket.prototype, "object", {
+ /**
+ * Gets the object (not the serialized string) being read or written.
+ */
+ get() {
+ return this._object;
+ },
+
+ /**
+ * Sets the object to be sent when write() is called.
+ */
+ set(object) {
+ this._object = object;
+ const data = JSON.stringify(object);
+ this._data = unicodeConverter.ConvertFromUnicode(data);
+ this.length = this._data.length;
+ },
+});
+
+JSONPacket.prototype.read = function (stream, scriptableStream) {
+ dumpv("Reading JSON packet");
+
+ // Read in more packet data.
+ this._readData(stream, scriptableStream);
+
+ if (!this.done) {
+ // Don't have a complete packet yet.
+ return;
+ }
+
+ let json = this._data;
+ try {
+ json = unicodeConverter.ConvertToUnicode(json);
+ this._object = JSON.parse(json);
+ } catch (e) {
+ const msg =
+ "Error parsing incoming packet: " +
+ json +
+ " (" +
+ e +
+ " - " +
+ e.stack +
+ ")";
+ console.error(msg);
+ dumpn(msg);
+ return;
+ }
+
+ this._transport._onJSONObjectReady(this._object);
+};
+
+JSONPacket.prototype._readData = function (stream, scriptableStream) {
+ if (flags.wantVerbose) {
+ dumpv(
+ "Reading JSON data: _l: " +
+ this.length +
+ " dL: " +
+ this._data.length +
+ " sA: " +
+ stream.available()
+ );
+ }
+ const bytesToRead = Math.min(
+ this.length - this._data.length,
+ stream.available()
+ );
+ this._data += scriptableStream.readBytes(bytesToRead);
+ this._done = this._data.length === this.length;
+};
+
+JSONPacket.prototype.write = function (stream) {
+ dumpv("Writing JSON packet");
+
+ if (this._outgoing === undefined) {
+ // Format the serialized packet to a buffer
+ this._outgoing = this.length + ":" + this._data;
+ }
+
+ const written = stream.write(this._outgoing, this._outgoing.length);
+ this._outgoing = this._outgoing.slice(written);
+ this._done = !this._outgoing.length;
+};
+
+Object.defineProperty(JSONPacket.prototype, "done", {
+ get() {
+ return this._done;
+ },
+});
+
+JSONPacket.prototype.toString = function () {
+ return JSON.stringify(this._object, null, 2);
+};
+
+exports.JSONPacket = JSONPacket;
+
+/**
+ * With a bulk packet, data is transferred by temporarily handing over the
+ * transport's input or output stream to the application layer for writing data
+ * directly. This can be much faster for large data sets, and avoids various
+ * stages of copies and data duplication inherent in the JSON packet type. The
+ * bulk packet looks like:
+ *
+ * bulk [actor] [type] [length]:[data]
+ *
+ * The interpretation of the data portion depends on the kind of actor and the
+ * packet's type. See the Remote Debugging Protocol Stream Transport spec for
+ * more details.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ */
+function BulkPacket(transport) {
+ Packet.call(this, transport);
+ this._done = false;
+ let _resolve;
+ this._readyForWriting = new Promise(resolve => {
+ _resolve = resolve;
+ });
+ this._readyForWriting.resolve = _resolve;
+}
+
+/**
+ * Attempt to initialize a new BulkPacket based on the incoming packet header
+ * we've received so far.
+ * @param header string
+ * The packet header string to attempt parsing.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ * @return BulkPacket
+ * The parsed packet, or null if it's not a match.
+ */
+BulkPacket.fromHeader = function (header, transport) {
+ const match = this.HEADER_PATTERN.exec(header);
+
+ if (!match) {
+ return null;
+ }
+
+ dumpv("Header matches bulk packet");
+ const packet = new BulkPacket(transport);
+ packet.header = {
+ actor: match[1],
+ type: match[2],
+ length: +match[3],
+ };
+ return packet;
+};
+
+BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/;
+
+BulkPacket.prototype = Object.create(Packet.prototype);
+
+BulkPacket.prototype.read = function (stream) {
+ dumpv("Reading bulk packet, handing off input stream");
+
+ // Temporarily pause monitoring of the input stream
+ this._transport.pauseIncoming();
+
+ new Promise(resolve => {
+ this._transport._onBulkReadReady({
+ actor: this.actor,
+ type: this.type,
+ length: this.length,
+ copyTo: output => {
+ dumpv("CT length: " + this.length);
+ const copying = StreamUtils.copyStream(stream, output, this.length);
+ resolve(copying);
+ return copying;
+ },
+ stream,
+ done: resolve,
+ });
+ // Await the result of reading from the stream
+ }).then(() => {
+ dumpv("onReadDone called, ending bulk mode");
+ this._done = true;
+ this._transport.resumeIncoming();
+ }, this._transport.close);
+
+ // Ensure this is only done once
+ this.read = () => {
+ throw new Error("Tried to read() a BulkPacket's stream multiple times.");
+ };
+};
+
+BulkPacket.prototype.write = function (stream) {
+ dumpv("Writing bulk packet");
+
+ if (this._outgoingHeader === undefined) {
+ dumpv("Serializing bulk packet header");
+ // Format the serialized packet header to a buffer
+ this._outgoingHeader =
+ "bulk " + this.actor + " " + this.type + " " + this.length + ":";
+ }
+
+ // Write the header, or whatever's left of it to write.
+ if (this._outgoingHeader.length) {
+ dumpv("Writing bulk packet header");
+ const written = stream.write(
+ this._outgoingHeader,
+ this._outgoingHeader.length
+ );
+ this._outgoingHeader = this._outgoingHeader.slice(written);
+ return;
+ }
+
+ dumpv("Handing off output stream");
+
+ // Temporarily pause the monitoring of the output stream
+ this._transport.pauseOutgoing();
+
+ new Promise(resolve => {
+ this._readyForWriting.resolve({
+ copyFrom: input => {
+ dumpv("CF length: " + this.length);
+ const copying = StreamUtils.copyStream(input, stream, this.length);
+ resolve(copying);
+ return copying;
+ },
+ stream,
+ done: resolve,
+ });
+ // Await the result of writing to the stream
+ }).then(() => {
+ dumpv("onWriteDone called, ending bulk mode");
+ this._done = true;
+ this._transport.resumeOutgoing();
+ }, this._transport.close);
+
+ // Ensure this is only done once
+ this.write = () => {
+ throw new Error("Tried to write() a BulkPacket's stream multiple times.");
+ };
+};
+
+Object.defineProperty(BulkPacket.prototype, "streamReadyForWriting", {
+ get() {
+ return this._readyForWriting;
+ },
+});
+
+Object.defineProperty(BulkPacket.prototype, "header", {
+ get() {
+ return {
+ actor: this.actor,
+ type: this.type,
+ length: this.length,
+ };
+ },
+
+ set(header) {
+ this.actor = header.actor;
+ this.type = header.type;
+ this.length = header.length;
+ },
+});
+
+Object.defineProperty(BulkPacket.prototype, "done", {
+ get() {
+ return this._done;
+ },
+});
+
+BulkPacket.prototype.toString = function () {
+ return "Bulk: " + JSON.stringify(this.header, null, 2);
+};
+
+exports.BulkPacket = BulkPacket;
+
+/**
+ * RawPacket is used to test the transport's error handling of malformed
+ * packets, by writing data directly onto the stream.
+ * @param transport DebuggerTransport
+ * The transport instance that will own the packet.
+ * @param data string
+ * The raw string to send out onto the stream.
+ */
+function RawPacket(transport, data) {
+ Packet.call(this, transport);
+ this._data = data;
+ this.length = data.length;
+ this._done = false;
+}
+
+RawPacket.prototype = Object.create(Packet.prototype);
+
+RawPacket.prototype.read = function (stream) {
+ // This hasn't yet been needed for testing.
+ throw Error("Not implmented.");
+};
+
+RawPacket.prototype.write = function (stream) {
+ const written = stream.write(this._data, this._data.length);
+ this._data = this._data.slice(written);
+ this._done = !this._data.length;
+};
+
+Object.defineProperty(RawPacket.prototype, "done", {
+ get() {
+ return this._done;
+ },
+});
+
+exports.RawPacket = RawPacket;
diff --git a/devtools/shared/transport/stream-utils.js b/devtools/shared/transport/stream-utils.js
new file mode 100644
index 0000000000..a3efd1d2aa
--- /dev/null
+++ b/devtools/shared/transport/stream-utils.js
@@ -0,0 +1,254 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
+const { dumpv } = DevToolsUtils;
+const EventEmitter = require("resource://devtools/shared/event-emitter.js");
+
+DevToolsUtils.defineLazyGetter(this, "IOUtil", () => {
+ return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
+});
+
+DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => {
+ return Components.Constructor(
+ "@mozilla.org/scriptableinputstream;1",
+ "nsIScriptableInputStream",
+ "init"
+ );
+});
+
+const BUFFER_SIZE = 0x8000;
+
+/**
+ * This helper function (and its companion object) are used by bulk senders and
+ * receivers to read and write data in and out of other streams. Functions that
+ * make use of this tool are passed to callers when it is time to read or write
+ * bulk data. It is highly recommended to use these copier functions instead of
+ * the stream directly because the copier enforces the agreed upon length.
+ * Since bulk mode reuses an existing stream, the sender and receiver must write
+ * and read exactly the agreed upon amount of data, or else the entire transport
+ * will be left in a invalid state. Additionally, other methods of stream
+ * copying (such as NetUtil.asyncCopy) close the streams involved, which would
+ * terminate the debugging transport, and so it is avoided here.
+ *
+ * Overall, this *works*, but clearly the optimal solution would be able to just
+ * use the streams directly. If it were possible to fully implement
+ * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
+ * enforce the length and avoid closing, and consumers could use familiar stream
+ * utilities like NetUtil.asyncCopy.
+ *
+ * The function takes two async streams and copies a precise number of bytes
+ * from one to the other. Copying begins immediately, but may complete at some
+ * future time depending on data size. Use the returned promise to know when
+ * it's complete.
+ *
+ * @param input nsIAsyncInputStream
+ * The stream to copy from.
+ * @param output nsIAsyncOutputStream
+ * The stream to copy to.
+ * @param length Integer
+ * The amount of data that needs to be copied.
+ * @return Promise
+ * The promise is resolved when copying completes or rejected if any
+ * (unexpected) errors occur.
+ */
+function copyStream(input, output, length) {
+ const copier = new StreamCopier(input, output, length);
+ return copier.copy();
+}
+
+function StreamCopier(input, output, length) {
+ EventEmitter.decorate(this);
+ this._id = StreamCopier._nextId++;
+ this.input = input;
+ // Save off the base output stream, since we know it's async as we've required
+ this.baseAsyncOutput = output;
+ if (IOUtil.outputStreamIsBuffered(output)) {
+ this.output = output;
+ } else {
+ this.output = Cc[
+ "@mozilla.org/network/buffered-output-stream;1"
+ ].createInstance(Ci.nsIBufferedOutputStream);
+ this.output.init(output, BUFFER_SIZE);
+ }
+ this._length = length;
+ this._amountLeft = length;
+ let _resolve;
+ let _reject;
+ this._deferred = new Promise((resolve, reject) => {
+ _resolve = resolve;
+ _reject = reject;
+ });
+ this._deferred.resolve = _resolve;
+ this._deferred.reject = _reject;
+
+ this._copy = this._copy.bind(this);
+ this._flush = this._flush.bind(this);
+ this._destroy = this._destroy.bind(this);
+
+ // Copy promise's then method up to this object.
+ // Allows the copier to offer a promise interface for the simple succeed or
+ // fail scenarios, but also emit events (due to the EventEmitter) for other
+ // states, like progress.
+ this.then = this._deferred.then.bind(this._deferred);
+ this.then(this._destroy, this._destroy);
+
+ // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
+ // if flushing would block the output stream.
+ this._streamReadyCallback = this._copy;
+}
+StreamCopier._nextId = 0;
+
+StreamCopier.prototype = {
+ copy() {
+ // Dispatch to the next tick so that it's possible to attach a progress
+ // event listener, even for extremely fast copies (like when testing).
+ Services.tm.dispatchToMainThread(() => {
+ try {
+ this._copy();
+ } catch (e) {
+ this._deferred.reject(e);
+ }
+ });
+ return this;
+ },
+
+ _copy() {
+ const bytesAvailable = this.input.available();
+ const amountToCopy = Math.min(bytesAvailable, this._amountLeft);
+ this._debug("Trying to copy: " + amountToCopy);
+
+ let bytesCopied;
+ try {
+ bytesCopied = this.output.writeFrom(this.input, amountToCopy);
+ } catch (e) {
+ if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+ this._debug("Base stream would block, will retry");
+ this._debug("Waiting for output stream");
+ this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+ return;
+ }
+ throw e;
+ }
+
+ this._amountLeft -= bytesCopied;
+ this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
+ this._emitProgress();
+
+ if (this._amountLeft === 0) {
+ this._debug("Copy done!");
+ this._flush();
+ return;
+ }
+
+ this._debug("Waiting for input stream");
+ this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
+ },
+
+ _emitProgress() {
+ this.emit("progress", {
+ bytesSent: this._length - this._amountLeft,
+ totalBytes: this._length,
+ });
+ },
+
+ _flush() {
+ try {
+ this.output.flush();
+ } catch (e) {
+ if (
+ e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
+ e.result == Cr.NS_ERROR_FAILURE
+ ) {
+ this._debug("Flush would block, will retry");
+ this._streamReadyCallback = this._flush;
+ this._debug("Waiting for output stream");
+ this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+ return;
+ }
+ throw e;
+ }
+ this._deferred.resolve();
+ },
+
+ _destroy() {
+ this._destroy = null;
+ this._copy = null;
+ this._flush = null;
+ this.input = null;
+ this.output = null;
+ },
+
+ // nsIInputStreamCallback
+ onInputStreamReady() {
+ this._streamReadyCallback();
+ },
+
+ // nsIOutputStreamCallback
+ onOutputStreamReady() {
+ this._streamReadyCallback();
+ },
+
+ _debug(msg) {
+ // Prefix logs with the copier ID, which makes logs much easier to
+ // understand when several copiers are running simultaneously
+ dumpv("Copier: " + this._id + " " + msg);
+ },
+};
+
+/**
+ * Read from a stream, one byte at a time, up to the next |delimiter|
+ * character, but stopping if we've read |count| without finding it. Reading
+ * also terminates early if there are less than |count| bytes available on the
+ * stream. In that case, we only read as many bytes as the stream currently has
+ * to offer.
+ * TODO: This implementation could be removed if bug 984651 is fixed, which
+ * provides a native version of the same idea.
+ * @param stream nsIInputStream
+ * The input stream to read from.
+ * @param delimiter string
+ * The character we're trying to find.
+ * @param count integer
+ * The max number of characters to read while searching.
+ * @return string
+ * The data collected. If the delimiter was found, this string will
+ * end with it.
+ */
+function delimitedRead(stream, delimiter, count) {
+ dumpv(
+ "Starting delimited read for " + delimiter + " up to " + count + " bytes"
+ );
+
+ let scriptableStream;
+ if (stream instanceof Ci.nsIScriptableInputStream) {
+ scriptableStream = stream;
+ } else {
+ scriptableStream = new ScriptableInputStream(stream);
+ }
+
+ let data = "";
+
+ // Don't exceed what's available on the stream
+ count = Math.min(count, stream.available());
+
+ if (count <= 0) {
+ return data;
+ }
+
+ let char;
+ while (char !== delimiter && count > 0) {
+ char = scriptableStream.readBytes(1);
+ count--;
+ data += char;
+ }
+
+ return data;
+}
+
+module.exports = {
+ copyStream,
+ delimitedRead,
+};
diff --git a/devtools/shared/transport/tests/xpcshell/.eslintrc.js b/devtools/shared/transport/tests/xpcshell/.eslintrc.js
new file mode 100644
index 0000000000..8611c174f5
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/.eslintrc.js
@@ -0,0 +1,6 @@
+"use strict";
+
+module.exports = {
+ // Extend from the common devtools xpcshell eslintrc config.
+ extends: "../../../../.eslintrc.xpcshell.js",
+};
diff --git a/devtools/shared/transport/tests/xpcshell/head_dbg.js b/devtools/shared/transport/tests/xpcshell/head_dbg.js
new file mode 100644
index 0000000000..42794c7712
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/head_dbg.js
@@ -0,0 +1,177 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+/* exported Cr, CC, NetUtil, errorCount, initTestDevToolsServer,
+ writeTestTempFile, socket_transport, local_transport, really_long
+*/
+
+var CC = Components.Constructor;
+
+const { require } = ChromeUtils.importESModule(
+ "resource://devtools/shared/loader/Loader.sys.mjs"
+);
+const { NetUtil } = ChromeUtils.import("resource://gre/modules/NetUtil.jsm");
+
+// We do not want to log packets by default, because in some tests,
+// we can be sending large amounts of data. The test harness has
+// trouble dealing with logging all the data, and we end up with
+// intermittent time outs (e.g. bug 775924).
+// Services.prefs.setBoolPref("devtools.debugger.log", true);
+// Services.prefs.setBoolPref("devtools.debugger.log.verbose", true);
+// Enable remote debugging for the relevant tests.
+Services.prefs.setBoolPref("devtools.debugger.remote-enabled", true);
+
+const {
+ ActorRegistry,
+} = require("resource://devtools/server/actors/utils/actor-registry.js");
+const {
+ DevToolsServer,
+} = require("resource://devtools/server/devtools-server.js");
+const {
+ DevToolsClient,
+} = require("resource://devtools/client/devtools-client.js");
+const {
+ SocketListener,
+} = require("resource://devtools/shared/security/socket.js");
+
+// Convert an nsIScriptError 'logLevel' value into an appropriate string.
+function scriptErrorLogLevel(message) {
+ switch (message.logLevel) {
+ case Ci.nsIConsoleMessage.info:
+ return "info";
+ case Ci.nsIConsoleMessage.warn:
+ return "warning";
+ default:
+ Assert.equal(message.logLevel, Ci.nsIConsoleMessage.error);
+ return "error";
+ }
+}
+
+// Register a console listener, so console messages don't just disappear
+// into the ether.
+var errorCount = 0;
+var listener = {
+ observe(message) {
+ errorCount++;
+ let string = "";
+ try {
+ // If we've been given an nsIScriptError, then we can print out
+ // something nicely formatted, for tools like Emacs to pick up.
+ message.QueryInterface(Ci.nsIScriptError);
+ dump(
+ message.sourceName +
+ ":" +
+ message.lineNumber +
+ ": " +
+ scriptErrorLogLevel(message) +
+ ": " +
+ message.errorMessage +
+ "\n"
+ );
+ string = message.errorMessage;
+ } catch (x) {
+ // Be a little paranoid with message, as the whole goal here is to lose
+ // no information.
+ try {
+ string = message.message;
+ } catch (e) {
+ string = "<error converting error message to string>";
+ }
+ }
+
+ // Make sure we exit all nested event loops so that the test can finish.
+ while (DevToolsServer.xpcInspector.eventLoopNestLevel > 0) {
+ DevToolsServer.xpcInspector.exitNestedEventLoop();
+ }
+
+ do_throw("head_dbg.js got console message: " + string + "\n");
+ },
+};
+
+Services.console.registerListener(listener);
+
+/**
+ * Initialize the testing devtools server.
+ */
+function initTestDevToolsServer() {
+ ActorRegistry.registerModule("devtools/server/actors/thread", {
+ prefix: "script",
+ constructor: "ScriptActor",
+ type: { global: true, target: true },
+ });
+ const { createRootActor } = require("xpcshell-test/testactors");
+ DevToolsServer.setRootActor(createRootActor);
+ // Allow incoming connections.
+ DevToolsServer.init();
+ // Avoid the server from being destroyed when the last connection closes
+ DevToolsServer.keepAlive = true;
+}
+
+/**
+ * Wrapper around do_get_file to prefix files with the name of current test to
+ * avoid collisions when running in parallel.
+ */
+function getTestTempFile(fileName, allowMissing) {
+ let thisTest = _TEST_FILE.toString().replace(/\\/g, "/");
+ thisTest = thisTest.substring(thisTest.lastIndexOf("/") + 1);
+ thisTest = thisTest.replace(/\..*$/, "");
+ return do_get_file(fileName + "-" + thisTest, allowMissing);
+}
+
+function writeTestTempFile(fileName, content) {
+ const file = getTestTempFile(fileName, true);
+ const stream = Cc["@mozilla.org/network/file-output-stream;1"].createInstance(
+ Ci.nsIFileOutputStream
+ );
+ stream.init(file, -1, -1, 0);
+ try {
+ do {
+ const numWritten = stream.write(content, content.length);
+ content = content.slice(numWritten);
+ } while (content.length);
+ } finally {
+ stream.close();
+ }
+}
+
+/** * Transport Factories ***/
+
+var socket_transport = async function () {
+ if (!DevToolsServer.listeningSockets) {
+ const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT");
+ const authenticator = new AuthenticatorType.Server();
+ authenticator.allowConnection = () => {
+ return DevToolsServer.AuthenticationResult.ALLOW;
+ };
+ const socketOptions = {
+ authenticator,
+ portOrPath: -1,
+ };
+ const debuggerListener = new SocketListener(DevToolsServer, socketOptions);
+ await debuggerListener.open();
+ }
+ const port = DevToolsServer._listeners[0].port;
+ info("DevTools server port is " + port);
+ return DevToolsClient.socketConnect({ host: "127.0.0.1", port });
+};
+
+function local_transport() {
+ return Promise.resolve(DevToolsServer.connectPipe());
+}
+
+/** * Sample Data ***/
+
+var gReallyLong;
+function really_long() {
+ if (gReallyLong) {
+ return gReallyLong;
+ }
+ let ret = "0123456789";
+ for (let i = 0; i < 18; i++) {
+ ret += ret;
+ }
+ gReallyLong = ret;
+ return ret;
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_bulk_error.js b/devtools/shared/transport/tests/xpcshell/test_bulk_error.js
new file mode 100644
index 0000000000..52cc826e51
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_bulk_error.js
@@ -0,0 +1,94 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+function run_test() {
+ initTestDevToolsServer();
+ add_test_bulk_actor();
+
+ add_task(async function () {
+ await test_string_error(socket_transport, json_reply);
+ await test_string_error(local_transport, json_reply);
+ DevToolsServer.destroy();
+ });
+
+ run_next_test();
+}
+
+/** * Sample Bulk Actor ***/
+const { Actor } = require("resource://devtools/shared/protocol/Actor.js");
+class TestBulkActor extends Actor {
+ constructor(conn) {
+ super(conn);
+
+ this.typeName = "testBulk";
+ this.requestTypes = {
+ jsonReply: this.jsonReply,
+ };
+ }
+
+ jsonReply({ length, reader, reply, done }) {
+ Assert.equal(length, really_long().length);
+
+ return {
+ allDone: true,
+ };
+ }
+}
+
+function add_test_bulk_actor() {
+ ActorRegistry.addGlobalActor(
+ {
+ constructorName: "TestBulkActor",
+ constructorFun: TestBulkActor,
+ },
+ "testBulk"
+ );
+}
+
+/** * Tests ***/
+
+var test_string_error = async function (transportFactory, onReady) {
+ const transport = await transportFactory();
+
+ const client = new DevToolsClient(transport);
+ await client.connect();
+ const response = await client.mainRoot.rootForm;
+
+ await onReady(client, response);
+ client.close();
+ transport.close();
+};
+
+/** * Reply Types ***/
+
+function json_reply(client, response) {
+ const reallyLong = really_long();
+
+ const request = client.startBulkRequest({
+ actor: response.testBulk,
+ type: "jsonReply",
+ length: reallyLong.length,
+ });
+
+ // Send bulk data to server
+ return new Promise(resolve => {
+ request.on("bulk-send-ready", ({ writer, done }) => {
+ const input = Cc["@mozilla.org/io/string-input-stream;1"].createInstance(
+ Ci.nsIStringInputStream
+ );
+ input.setData(reallyLong, reallyLong.length);
+ try {
+ writer.copyFrom(input, () => {
+ input.close();
+ done();
+ });
+ do_throw(new Error("Copying should fail, the stream is not async."));
+ } catch (e) {
+ Assert.ok(true);
+ resolve();
+ }
+ });
+ });
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js b/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js
new file mode 100644
index 0000000000..22efdc6eba
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_client_server_bulk.js
@@ -0,0 +1,312 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+var { FileUtils } = ChromeUtils.importESModule(
+ "resource://gre/modules/FileUtils.sys.mjs"
+);
+var Pipe = Components.Constructor("@mozilla.org/pipe;1", "nsIPipe", "init");
+
+function run_test() {
+ initTestDevToolsServer();
+ add_test_bulk_actor();
+
+ add_task(async function () {
+ await test_bulk_request_cs(socket_transport, "jsonReply", "json");
+ await test_bulk_request_cs(local_transport, "jsonReply", "json");
+ await test_bulk_request_cs(socket_transport, "bulkEcho", "bulk");
+ await test_bulk_request_cs(local_transport, "bulkEcho", "bulk");
+ await test_json_request_cs(socket_transport, "bulkReply", "bulk");
+ await test_json_request_cs(local_transport, "bulkReply", "bulk");
+ DevToolsServer.destroy();
+ });
+
+ run_next_test();
+}
+
+/** * Sample Bulk Actor ***/
+const { Actor } = require("resource://devtools/shared/protocol/Actor.js");
+class TestBulkActor extends Actor {
+ constructor(conn) {
+ super(conn, { typeName: "testBulk", methods: [] });
+
+ this.requestTypes = {
+ bulkEcho: this.bulkEcho,
+ bulkReply: this.bulkReply,
+ jsonReply: this.jsonReply,
+ };
+ }
+
+ bulkEcho({ actor, type, length, copyTo }) {
+ Assert.equal(length, really_long().length);
+ this.conn
+ .startBulkSend({
+ actor,
+ type,
+ length,
+ })
+ .then(({ copyFrom }) => {
+ // We'll just echo back the same thing
+ const pipe = new Pipe(true, true, 0, 0, null);
+ copyTo(pipe.outputStream).then(() => {
+ pipe.outputStream.close();
+ });
+ copyFrom(pipe.inputStream).then(() => {
+ pipe.inputStream.close();
+ });
+ });
+ }
+
+ bulkReply({ to, type }) {
+ this.conn
+ .startBulkSend({
+ actor: to,
+ type,
+ length: really_long().length,
+ })
+ .then(({ copyFrom }) => {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-input")),
+ loadUsingSystemPrincipal: true,
+ },
+ input => {
+ copyFrom(input).then(() => {
+ input.close();
+ });
+ }
+ );
+ });
+ }
+
+ jsonReply({ length, copyTo }) {
+ Assert.equal(length, really_long().length);
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8));
+
+ const output = FileUtils.openSafeFileOutputStream(outputFile);
+
+ return copyTo(output)
+ .then(() => {
+ FileUtils.closeSafeFileOutputStream(output);
+ return verify_files();
+ })
+ .then(() => {
+ return { allDone: true };
+ }, do_throw);
+ }
+}
+
+function add_test_bulk_actor() {
+ ActorRegistry.addGlobalActor(
+ {
+ constructorName: "TestBulkActor",
+ constructorFun: TestBulkActor,
+ },
+ "testBulk"
+ );
+}
+
+/** * Reply Handlers ***/
+
+var replyHandlers = {
+ json(request) {
+ // Receive JSON reply from server
+ return new Promise(resolve => {
+ request.on("json-reply", reply => {
+ Assert.ok(reply.allDone);
+ resolve();
+ });
+ });
+ },
+
+ bulk(request) {
+ // Receive bulk data reply from server
+ return new Promise(resolve => {
+ request.on("bulk-reply", ({ length, copyTo }) => {
+ Assert.equal(length, really_long().length);
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8));
+
+ const output = FileUtils.openSafeFileOutputStream(outputFile);
+
+ copyTo(output).then(() => {
+ FileUtils.closeSafeFileOutputStream(output);
+ resolve(verify_files());
+ });
+ });
+ });
+ },
+};
+
+/** * Tests ***/
+
+var test_bulk_request_cs = async function (
+ transportFactory,
+ actorType,
+ replyType
+) {
+ // Ensure test files are not present from a failed run
+ cleanup_files();
+ writeTestTempFile("bulk-input", really_long());
+
+ let clientResolve;
+ const clientDeferred = new Promise(resolve => {
+ clientResolve = resolve;
+ });
+
+ let serverResolve;
+ const serverDeferred = new Promise(resolve => {
+ serverResolve = resolve;
+ });
+
+ let bulkCopyResolve;
+ const bulkCopyDeferred = new Promise(resolve => {
+ bulkCopyResolve = resolve;
+ });
+
+ const transport = await transportFactory();
+
+ const client = new DevToolsClient(transport);
+ client.connect().then(() => {
+ client.mainRoot.rootForm.then(clientResolve);
+ });
+
+ function bulkSendReadyCallback({ copyFrom }) {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-input")),
+ loadUsingSystemPrincipal: true,
+ },
+ input => {
+ copyFrom(input).then(() => {
+ input.close();
+ bulkCopyResolve();
+ });
+ }
+ );
+ }
+
+ clientDeferred
+ .then(response => {
+ const request = client.startBulkRequest({
+ actor: response.testBulk,
+ type: actorType,
+ length: really_long().length,
+ });
+
+ // Send bulk data to server
+ request.on("bulk-send-ready", bulkSendReadyCallback);
+
+ // Set up reply handling for this type
+ replyHandlers[replyType](request).then(() => {
+ client.close();
+ transport.close();
+ });
+ })
+ .catch(do_throw);
+
+ DevToolsServer.on("connectionchange", type => {
+ if (type === "closed") {
+ serverResolve();
+ }
+ });
+
+ return Promise.all([clientDeferred, bulkCopyDeferred, serverDeferred]);
+};
+
+var test_json_request_cs = async function (
+ transportFactory,
+ actorType,
+ replyType
+) {
+ // Ensure test files are not present from a failed run
+ cleanup_files();
+ writeTestTempFile("bulk-input", really_long());
+
+ let clientResolve;
+ const clientDeferred = new Promise(resolve => {
+ clientResolve = resolve;
+ });
+
+ let serverResolve;
+ const serverDeferred = new Promise(resolve => {
+ serverResolve = resolve;
+ });
+
+ const transport = await transportFactory();
+
+ const client = new DevToolsClient(transport);
+ await client.connect();
+ client.mainRoot.rootForm.then(clientResolve);
+
+ clientDeferred
+ .then(response => {
+ const request = client.request({
+ to: response.testBulk,
+ type: actorType,
+ });
+
+ // Set up reply handling for this type
+ replyHandlers[replyType](request).then(() => {
+ client.close();
+ transport.close();
+ });
+ })
+ .catch(do_throw);
+
+ DevToolsServer.on("connectionchange", type => {
+ if (type === "closed") {
+ serverResolve();
+ }
+ });
+
+ return Promise.all([clientDeferred, serverDeferred]);
+};
+
+/** * Test Utils ***/
+
+function verify_files() {
+ const reallyLong = really_long();
+
+ const inputFile = getTestTempFile("bulk-input");
+ const outputFile = getTestTempFile("bulk-output");
+
+ Assert.equal(inputFile.fileSize, reallyLong.length);
+ Assert.equal(outputFile.fileSize, reallyLong.length);
+
+ // Ensure output file contents actually match
+ return new Promise(resolve => {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-output")),
+ loadUsingSystemPrincipal: true,
+ },
+ input => {
+ const outputData = NetUtil.readInputStreamToString(
+ input,
+ reallyLong.length
+ );
+ // Avoid do_check_eq here so we don't log the contents
+ Assert.ok(outputData === reallyLong);
+ input.close();
+ resolve();
+ }
+ );
+ }).then(cleanup_files);
+}
+
+function cleanup_files() {
+ const inputFile = getTestTempFile("bulk-input", true);
+ if (inputFile.exists()) {
+ inputFile.remove(false);
+ }
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ if (outputFile.exists()) {
+ outputFile.remove(false);
+ }
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js b/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js
new file mode 100644
index 0000000000..535431aa38
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_dbgsocket.js
@@ -0,0 +1,163 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+"use strict";
+
+/* global structuredClone */
+
+var gPort;
+var gExtraListener;
+
+function run_test() {
+ info("Starting test at " + new Date().toTimeString());
+ initTestDevToolsServer();
+
+ add_task(test_socket_conn);
+ add_task(test_socket_shutdown);
+ add_test(test_pipe_conn);
+
+ run_next_test();
+}
+
+const { Actor } = require("resource://devtools/shared/protocol/Actor.js");
+class EchoTestActor extends Actor {
+ constructor(conn) {
+ super(conn, { typeName: "EchoTestActor", methods: [] });
+
+ this.requestTypes = {
+ echo: EchoTestActor.prototype.onEcho,
+ };
+ }
+
+ onEcho(request) {
+ /*
+ * Request packets are frozen. Copy request, so that
+ * DevToolsServerConnection.onPacket can attach a 'from' property.
+ */
+ return structuredClone(request);
+ }
+}
+
+async function test_socket_conn() {
+ Assert.equal(DevToolsServer.listeningSockets, 0);
+ const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT");
+ const authenticator = new AuthenticatorType.Server();
+ authenticator.allowConnection = () => {
+ return DevToolsServer.AuthenticationResult.ALLOW;
+ };
+ const socketOptions = {
+ authenticator,
+ portOrPath: -1,
+ };
+ const listener = new SocketListener(DevToolsServer, socketOptions);
+ Assert.ok(listener);
+ listener.open();
+ Assert.equal(DevToolsServer.listeningSockets, 1);
+ gPort = DevToolsServer._listeners[0].port;
+ info("DevTools server port is " + gPort);
+ // Open a second, separate listener
+ gExtraListener = new SocketListener(DevToolsServer, socketOptions);
+ gExtraListener.open();
+ Assert.equal(DevToolsServer.listeningSockets, 2);
+ Assert.ok(!DevToolsServer.hasConnection());
+
+ info("Starting long and unicode tests at " + new Date().toTimeString());
+ // We can't use EventEmitter.once as this is the second argument we care about...
+ const onConnectionChange = new Promise(res => {
+ DevToolsServer.once("connectionchange", (type, conn) => res(conn));
+ });
+
+ const transport = await DevToolsClient.socketConnect({
+ host: "127.0.0.1",
+ port: gPort,
+ });
+ Assert.ok(DevToolsServer.hasConnection());
+ info("Wait for server connection");
+ const conn = await onConnectionChange;
+
+ // Register a custom actor to do echo requests
+ const actor = new EchoTestActor(conn);
+ actor.actorID = "echo-actor";
+ conn.addActor(actor);
+
+ // Assert that connection settings are available on transport object
+ const settings = transport.connectionSettings;
+ Assert.equal(settings.host, "127.0.0.1");
+ Assert.equal(settings.port, gPort);
+
+ const onDebuggerConnectionClosed = DevToolsServer.once("connectionchange");
+ const unicodeString = "(╯°□°)╯︵ ┻━┻";
+ await new Promise(resolve => {
+ transport.hooks = {
+ onPacket(packet) {
+ this.onPacket = function ({ unicode }) {
+ Assert.equal(unicode, unicodeString);
+ transport.close();
+ };
+ // Verify that things work correctly when bigger than the output
+ // transport buffers and when transporting unicode...
+ transport.send({
+ to: "echo-actor",
+ type: "echo",
+ reallylong: really_long(),
+ unicode: unicodeString,
+ });
+ Assert.equal(packet.from, "root");
+ },
+ onTransportClosed(status) {
+ resolve();
+ },
+ };
+ transport.ready();
+ });
+ const type = await onDebuggerConnectionClosed;
+ Assert.equal(type, "closed");
+ Assert.ok(!DevToolsServer.hasConnection());
+}
+
+async function test_socket_shutdown() {
+ Assert.equal(DevToolsServer.listeningSockets, 2);
+ gExtraListener.close();
+ Assert.equal(DevToolsServer.listeningSockets, 1);
+ Assert.ok(DevToolsServer.closeAllSocketListeners());
+ Assert.equal(DevToolsServer.listeningSockets, 0);
+ // Make sure closing the listener twice does nothing.
+ Assert.ok(!DevToolsServer.closeAllSocketListeners());
+ Assert.equal(DevToolsServer.listeningSockets, 0);
+
+ info("Connecting to a server socket at " + new Date().toTimeString());
+ try {
+ await DevToolsClient.socketConnect({
+ host: "127.0.0.1",
+ port: gPort,
+ });
+ } catch (e) {
+ if (
+ e.result == Cr.NS_ERROR_CONNECTION_REFUSED ||
+ e.result == Cr.NS_ERROR_NET_TIMEOUT
+ ) {
+ // The connection should be refused here, but on slow or overloaded
+ // machines it may just time out.
+ Assert.ok(true);
+ return;
+ }
+ throw e;
+ }
+
+ // Shouldn't reach this, should never connect.
+ Assert.ok(false);
+}
+
+function test_pipe_conn() {
+ const transport = DevToolsServer.connectPipe();
+ transport.hooks = {
+ onPacket(packet) {
+ Assert.equal(packet.from, "root");
+ transport.close();
+ },
+ onTransportClosed(status) {
+ run_next_test();
+ },
+ };
+
+ transport.ready();
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js b/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js
new file mode 100644
index 0000000000..e08c2380fb
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_dbgsocket_connection_drop.js
@@ -0,0 +1,86 @@
+/**
+ * Any copyright is dedicated to the Public Domain.
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+/**
+ * Bug 755412 - checks if the server drops the connection on an improperly
+ * framed packet, i.e. when the length header is invalid.
+ */
+"use strict";
+
+const {
+ RawPacket,
+} = require("resource://devtools/shared/transport/packets.js");
+
+function run_test() {
+ info("Starting test at " + new Date().toTimeString());
+ initTestDevToolsServer();
+
+ add_task(test_socket_conn_drops_after_invalid_header);
+ add_task(test_socket_conn_drops_after_invalid_header_2);
+ add_task(test_socket_conn_drops_after_too_large_length);
+ add_task(test_socket_conn_drops_after_too_long_header);
+ run_next_test();
+}
+
+function test_socket_conn_drops_after_invalid_header() {
+ return test_helper('fluff30:27:{"to":"root","type":"echo"}');
+}
+
+function test_socket_conn_drops_after_invalid_header_2() {
+ return test_helper('27asd:{"to":"root","type":"echo"}');
+}
+
+function test_socket_conn_drops_after_too_large_length() {
+ // Packet length is limited (semi-arbitrarily) to 1 TiB (2^40)
+ return test_helper("4305724038957487634549823475894325:");
+}
+
+function test_socket_conn_drops_after_too_long_header() {
+ // The packet header is currently limited to no more than 200 bytes
+ let rawPacket = "4305724038957487634549823475894325";
+ for (let i = 0; i < 8; i++) {
+ rawPacket += rawPacket;
+ }
+ return test_helper(rawPacket + ":");
+}
+
+var test_helper = async function (payload) {
+ const AuthenticatorType = DevToolsServer.Authenticators.get("PROMPT");
+ const authenticator = new AuthenticatorType.Server();
+ authenticator.allowConnection = () => {
+ return DevToolsServer.AuthenticationResult.ALLOW;
+ };
+ const socketOptions = {
+ authenticator,
+ portOrPath: -1,
+ };
+
+ const listener = new SocketListener(DevToolsServer, socketOptions);
+ listener.open();
+
+ const transport = await DevToolsClient.socketConnect({
+ host: "127.0.0.1",
+ port: listener.port,
+ });
+ return new Promise(resolve => {
+ transport.hooks = {
+ onPacket(packet) {
+ this.onPacket = function () {
+ do_throw(new Error("This connection should be dropped."));
+ transport.close();
+ };
+
+ // Inject the payload directly into the stream.
+ transport._outgoing.push(new RawPacket(transport, payload));
+ transport._flushOutgoing();
+ },
+ onTransportClosed(status) {
+ Assert.ok(true);
+ resolve();
+ },
+ };
+ transport.ready();
+ });
+};
diff --git a/devtools/shared/transport/tests/xpcshell/test_delimited_read.js b/devtools/shared/transport/tests/xpcshell/test_delimited_read.js
new file mode 100644
index 0000000000..fe94f81bbf
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_delimited_read.js
@@ -0,0 +1,30 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+"use strict";
+
+const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
+
+const StringInputStream = Components.Constructor(
+ "@mozilla.org/io/string-input-stream;1",
+ "nsIStringInputStream",
+ "setData"
+);
+
+function run_test() {
+ add_task(async function () {
+ await test_delimited_read("0123:", "0123:");
+ await test_delimited_read("0123:4567:", "0123:");
+ await test_delimited_read("012345678901:", "0123456789");
+ await test_delimited_read("0123/0123", "0123/0123");
+ });
+
+ run_next_test();
+}
+
+/** * Tests ***/
+
+function test_delimited_read(input, expected) {
+ input = new StringInputStream(input, input.length);
+ const result = StreamUtils.delimitedRead(input, ":", 10);
+ Assert.equal(result, expected);
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_packet.js b/devtools/shared/transport/tests/xpcshell/test_packet.js
new file mode 100644
index 0000000000..459a7a8211
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_packet.js
@@ -0,0 +1,24 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+"use strict";
+
+const {
+ JSONPacket,
+ BulkPacket,
+} = require("resource://devtools/shared/transport/packets.js");
+
+function run_test() {
+ add_test(test_packet_done);
+ run_next_test();
+}
+
+// Ensure done can be checked without getting an error
+function test_packet_done() {
+ const json = new JSONPacket();
+ Assert.ok(!json.done);
+
+ const bulk = new BulkPacket();
+ Assert.ok(!bulk.done);
+
+ run_next_test();
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_queue.js b/devtools/shared/transport/tests/xpcshell/test_queue.js
new file mode 100644
index 0000000000..603640e34d
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_queue.js
@@ -0,0 +1,198 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+/**
+ * This test verifies that the transport's queue operates correctly when various
+ * packets are scheduled simultaneously.
+ */
+
+var { FileUtils } = ChromeUtils.importESModule(
+ "resource://gre/modules/FileUtils.sys.mjs"
+);
+
+function run_test() {
+ initTestDevToolsServer();
+
+ add_task(async function () {
+ await test_transport(socket_transport);
+ await test_transport(local_transport);
+ DevToolsServer.destroy();
+ });
+
+ run_next_test();
+}
+
+/** * Tests ***/
+
+var test_transport = async function (transportFactory) {
+ let clientResolve;
+ const clientDeferred = new Promise(resolve => {
+ clientResolve = resolve;
+ });
+
+ let serverResolve;
+ const serverDeferred = new Promise(resolve => {
+ serverResolve = resolve;
+ });
+
+ // Ensure test files are not present from a failed run
+ cleanup_files();
+ const reallyLong = really_long();
+ writeTestTempFile("bulk-input", reallyLong);
+
+ Assert.equal(Object.keys(DevToolsServer._connections).length, 0);
+
+ const transport = await transportFactory();
+
+ // Sending from client to server
+ function write_data({ copyFrom }) {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-input")),
+ loadUsingSystemPrincipal: true,
+ },
+ function (input, status) {
+ copyFrom(input).then(() => {
+ input.close();
+ });
+ }
+ );
+ }
+
+ // Receiving on server from client
+ function on_bulk_packet({ actor, type, length, copyTo }) {
+ Assert.equal(actor, "root");
+ Assert.equal(type, "file-stream");
+ Assert.equal(length, reallyLong.length);
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8));
+
+ const output = FileUtils.openSafeFileOutputStream(outputFile);
+
+ copyTo(output)
+ .then(() => {
+ FileUtils.closeSafeFileOutputStream(output);
+ return verify();
+ })
+ .then(() => {
+ // It's now safe to close
+ transport.hooks.onTransportClosed = () => {
+ clientResolve();
+ };
+ transport.close();
+ });
+ }
+
+ // Client
+
+ function send_packets() {
+ // Specifically, we want to ensure that multiple send()s proceed without
+ // causing the transport to die.
+ transport.send({
+ actor: "root",
+ type: "explode",
+ });
+
+ transport
+ .startBulkSend({
+ actor: "root",
+ type: "file-stream",
+ length: reallyLong.length,
+ })
+ .then(write_data);
+ }
+
+ transport.hooks = {
+ onPacket(packet) {
+ if (packet.error) {
+ transport.hooks.onError(packet);
+ } else if (packet.applicationType) {
+ transport.hooks.onServerHello(packet);
+ } else {
+ do_throw("Unexpected server reply");
+ }
+ },
+
+ onServerHello(packet) {
+ // We've received the initial start up packet
+ Assert.equal(packet.from, "root");
+ Assert.equal(packet.applicationType, "xpcshell-tests");
+
+ // Server
+ Assert.equal(Object.keys(DevToolsServer._connections).length, 1);
+ info(Object.keys(DevToolsServer._connections));
+ for (const connId in DevToolsServer._connections) {
+ DevToolsServer._connections[connId].onBulkPacket = on_bulk_packet;
+ }
+
+ DevToolsServer.on("connectionchange", type => {
+ if (type === "closed") {
+ serverResolve();
+ }
+ });
+
+ send_packets();
+ },
+
+ onError(packet) {
+ // The explode actor doesn't exist
+ Assert.equal(packet.from, "root");
+ Assert.equal(packet.error, "noSuchActor");
+ },
+
+ onTransportClosed() {
+ do_throw("Transport closed before we expected");
+ },
+ };
+
+ transport.ready();
+
+ return Promise.all([clientDeferred, serverDeferred]);
+};
+
+/** * Test Utils ***/
+
+function verify() {
+ const reallyLong = really_long();
+
+ const inputFile = getTestTempFile("bulk-input");
+ const outputFile = getTestTempFile("bulk-output");
+
+ Assert.equal(inputFile.fileSize, reallyLong.length);
+ Assert.equal(outputFile.fileSize, reallyLong.length);
+
+ // Ensure output file contents actually match
+ return new Promise(resolve => {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-output")),
+ loadUsingSystemPrincipal: true,
+ },
+ input => {
+ const outputData = NetUtil.readInputStreamToString(
+ input,
+ reallyLong.length
+ );
+ // Avoid do_check_eq here so we don't log the contents
+ Assert.ok(outputData === reallyLong);
+ input.close();
+ resolve();
+ }
+ );
+ }).then(cleanup_files);
+}
+
+function cleanup_files() {
+ const inputFile = getTestTempFile("bulk-input", true);
+ if (inputFile.exists()) {
+ inputFile.remove(false);
+ }
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ if (outputFile.exists()) {
+ outputFile.remove(false);
+ }
+}
diff --git a/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js b/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js
new file mode 100644
index 0000000000..137cbd2679
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/test_transport_bulk.js
@@ -0,0 +1,169 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+
+"use strict";
+
+var { FileUtils } = ChromeUtils.importESModule(
+ "resource://gre/modules/FileUtils.sys.mjs"
+);
+
+function run_test() {
+ initTestDevToolsServer();
+
+ add_task(async function () {
+ await test_bulk_transfer_transport(socket_transport);
+ await test_bulk_transfer_transport(local_transport);
+ DevToolsServer.destroy();
+ });
+
+ run_next_test();
+}
+
+/** * Tests ***/
+
+/**
+ * This tests a one-way bulk transfer at the transport layer.
+ */
+var test_bulk_transfer_transport = async function (transportFactory) {
+ info("Starting bulk transfer test at " + new Date().toTimeString());
+
+ let clientResolve;
+ const clientDeferred = new Promise(resolve => {
+ clientResolve = resolve;
+ });
+
+ let serverResolve;
+ const serverDeferred = new Promise(resolve => {
+ serverResolve = resolve;
+ });
+
+ // Ensure test files are not present from a failed run
+ cleanup_files();
+ const reallyLong = really_long();
+ writeTestTempFile("bulk-input", reallyLong);
+
+ Assert.equal(Object.keys(DevToolsServer._connections).length, 0);
+
+ const transport = await transportFactory();
+
+ // Sending from client to server
+ function write_data({ copyFrom }) {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-input")),
+ loadUsingSystemPrincipal: true,
+ },
+ function (input, status) {
+ copyFrom(input).then(() => {
+ input.close();
+ });
+ }
+ );
+ }
+
+ // Receiving on server from client
+ function on_bulk_packet({ actor, type, length, copyTo }) {
+ Assert.equal(actor, "root");
+ Assert.equal(type, "file-stream");
+ Assert.equal(length, reallyLong.length);
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ outputFile.create(Ci.nsIFile.NORMAL_FILE_TYPE, parseInt("666", 8));
+
+ const output = FileUtils.openSafeFileOutputStream(outputFile);
+
+ copyTo(output)
+ .then(() => {
+ FileUtils.closeSafeFileOutputStream(output);
+ return verify();
+ })
+ .then(() => {
+ // It's now safe to close
+ transport.hooks.onTransportClosed = () => {
+ clientResolve();
+ };
+ transport.close();
+ });
+ }
+
+ // Client
+ transport.hooks = {
+ onPacket(packet) {
+ // We've received the initial start up packet
+ Assert.equal(packet.from, "root");
+
+ // Server
+ Assert.equal(Object.keys(DevToolsServer._connections).length, 1);
+ info(Object.keys(DevToolsServer._connections));
+ for (const connId in DevToolsServer._connections) {
+ DevToolsServer._connections[connId].onBulkPacket = on_bulk_packet;
+ }
+
+ DevToolsServer.on("connectionchange", type => {
+ if (type === "closed") {
+ serverResolve();
+ }
+ });
+
+ transport
+ .startBulkSend({
+ actor: "root",
+ type: "file-stream",
+ length: reallyLong.length,
+ })
+ .then(write_data);
+ },
+
+ onTransportClosed() {
+ do_throw("Transport closed before we expected");
+ },
+ };
+
+ transport.ready();
+
+ return Promise.all([clientDeferred, serverDeferred]);
+};
+
+/** * Test Utils ***/
+
+function verify() {
+ const reallyLong = really_long();
+
+ const inputFile = getTestTempFile("bulk-input");
+ const outputFile = getTestTempFile("bulk-output");
+
+ Assert.equal(inputFile.fileSize, reallyLong.length);
+ Assert.equal(outputFile.fileSize, reallyLong.length);
+
+ // Ensure output file contents actually match
+ return new Promise(resolve => {
+ NetUtil.asyncFetch(
+ {
+ uri: NetUtil.newURI(getTestTempFile("bulk-output")),
+ loadUsingSystemPrincipal: true,
+ },
+ input => {
+ const outputData = NetUtil.readInputStreamToString(
+ input,
+ reallyLong.length
+ );
+ // Avoid do_check_eq here so we don't log the contents
+ Assert.ok(outputData === reallyLong);
+ input.close();
+ resolve();
+ }
+ );
+ }).then(cleanup_files);
+}
+
+function cleanup_files() {
+ const inputFile = getTestTempFile("bulk-input", true);
+ if (inputFile.exists()) {
+ inputFile.remove(false);
+ }
+
+ const outputFile = getTestTempFile("bulk-output", true);
+ if (outputFile.exists()) {
+ outputFile.remove(false);
+ }
+}
diff --git a/devtools/shared/transport/tests/xpcshell/testactors.js b/devtools/shared/transport/tests/xpcshell/testactors.js
new file mode 100644
index 0000000000..0a35f05287
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/testactors.js
@@ -0,0 +1,16 @@
+/* Any copyright is dedicated to the Public Domain.
+ http://creativecommons.org/publicdomain/zero/1.0/ */
+"use strict";
+
+const { RootActor } = require("resource://devtools/server/actors/root.js");
+const {
+ ActorRegistry,
+} = require("resource://devtools/server/actors/utils/actor-registry.js");
+
+exports.createRootActor = function createRootActor(connection) {
+ const root = new RootActor(connection, {
+ globalActorFactories: ActorRegistry.globalActorFactories,
+ });
+ root.applicationType = "xpcshell-tests";
+ return root;
+};
diff --git a/devtools/shared/transport/tests/xpcshell/xpcshell.ini b/devtools/shared/transport/tests/xpcshell/xpcshell.ini
new file mode 100644
index 0000000000..fc369615c2
--- /dev/null
+++ b/devtools/shared/transport/tests/xpcshell/xpcshell.ini
@@ -0,0 +1,17 @@
+[DEFAULT]
+tags = devtools
+head = head_dbg.js
+firefox-appdir = browser
+skip-if = toolkit == 'android'
+
+support-files =
+ testactors.js
+
+[test_bulk_error.js]
+[test_client_server_bulk.js]
+[test_dbgsocket.js]
+[test_dbgsocket_connection_drop.js]
+[test_delimited_read.js]
+[test_packet.js]
+[test_queue.js]
+[test_transport_bulk.js]
diff --git a/devtools/shared/transport/transport.js b/devtools/shared/transport/transport.js
new file mode 100644
index 0000000000..460f45206f
--- /dev/null
+++ b/devtools/shared/transport/transport.js
@@ -0,0 +1,499 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
+const { dumpn, dumpv } = DevToolsUtils;
+const flags = require("resource://devtools/shared/flags.js");
+const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
+const {
+ Packet,
+ JSONPacket,
+ BulkPacket,
+} = require("resource://devtools/shared/transport/packets.js");
+
+loader.lazyGetter(this, "ScriptableInputStream", () => {
+ return Components.Constructor(
+ "@mozilla.org/scriptableinputstream;1",
+ "nsIScriptableInputStream",
+ "init"
+ );
+});
+
+const PACKET_HEADER_MAX = 200;
+
+/**
+ * An adapter that handles data transfers between the devtools client and
+ * server. It can work with both nsIPipe and nsIServerSocket transports so
+ * long as the properly created input and output streams are specified.
+ * (However, for intra-process connections, LocalDebuggerTransport, below,
+ * is more efficient than using an nsIPipe pair with DebuggerTransport.)
+ *
+ * @param input nsIAsyncInputStream
+ * The input stream.
+ * @param output nsIAsyncOutputStream
+ * The output stream.
+ *
+ * Given a DebuggerTransport instance dt:
+ * 1) Set dt.hooks to a packet handler object (described below).
+ * 2) Call dt.ready() to begin watching for input packets.
+ * 3) Call dt.send() / dt.startBulkSend() to send packets.
+ * 4) Call dt.close() to close the connection, and disengage from the event
+ * loop.
+ *
+ * A packet handler is an object with the following methods:
+ *
+ * - onPacket(packet) - called when we have received a complete packet.
+ * |packet| is the parsed form of the packet --- a JavaScript value, not
+ * a JSON-syntax string.
+ *
+ * - onBulkPacket(packet) - called when we have switched to bulk packet
+ * receiving mode. |packet| is an object containing:
+ * * actor: Name of actor that will receive the packet
+ * * type: Name of actor's method that should be called on receipt
+ * * length: Size of the data to be read
+ * * stream: This input stream should only be used directly if you can ensure
+ * that you will read exactly |length| bytes and will not close the
+ * stream when reading is complete
+ * * done: If you use the stream directly (instead of |copyTo| below), you
+ * must signal completion by resolving / rejecting this deferred.
+ * If it's rejected, the transport will be closed. If an Error is
+ * supplied as a rejection value, it will be logged via |dumpn|.
+ * If you do use |copyTo|, resolving is taken care of for you when
+ * copying completes.
+ * * copyTo: A helper function for getting your data out of the stream that
+ * meets the stream handling requirements above, and has the
+ * following signature:
+ * @param output nsIAsyncOutputStream
+ * The stream to copy to.
+ * @return Promise
+ * The promise is resolved when copying completes or rejected if any
+ * (unexpected) errors occur.
+ * This object also emits "progress" events for each chunk that is
+ * copied. See stream-utils.js.
+ *
+ * - onTransportClosed(reason) - called when the connection is closed. |reason| is
+ * an optional nsresult or object, typically passed when the transport is
+ * closed due to some error in a underlying stream.
+ *
+ * See ./packets.js and the Remote Debugging Protocol specification for more
+ * details on the format of these packets.
+ */
+function DebuggerTransport(input, output) {
+ this._input = input;
+ this._scriptableInput = new ScriptableInputStream(input);
+ this._output = output;
+
+ // The current incoming (possibly partial) header, which will determine which
+ // type of Packet |_incoming| below will become.
+ this._incomingHeader = "";
+ // The current incoming Packet object
+ this._incoming = null;
+ // A queue of outgoing Packet objects
+ this._outgoing = [];
+
+ this.hooks = null;
+ this.active = false;
+
+ this._incomingEnabled = true;
+ this._outgoingEnabled = true;
+
+ this.close = this.close.bind(this);
+}
+
+DebuggerTransport.prototype = {
+ /**
+ * Transmit an object as a JSON packet.
+ *
+ * This method returns immediately, without waiting for the entire
+ * packet to be transmitted, registering event handlers as needed to
+ * transmit the entire packet. Packets are transmitted in the order
+ * they are passed to this method.
+ */
+ send(object) {
+ const packet = new JSONPacket(this);
+ packet.object = object;
+ this._outgoing.push(packet);
+ this._flushOutgoing();
+ },
+
+ /**
+ * Transmit streaming data via a bulk packet.
+ *
+ * This method initiates the bulk send process by queuing up the header data.
+ * The caller receives eventual access to a stream for writing.
+ *
+ * N.B.: Do *not* attempt to close the stream handed to you, as it will
+ * continue to be used by this transport afterwards. Most users should
+ * instead use the provided |copyFrom| function instead.
+ *
+ * @param header Object
+ * This is modeled after the format of JSON packets above, but does not
+ * actually contain the data, but is instead just a routing header:
+ * * actor: Name of actor that will receive the packet
+ * * type: Name of actor's method that should be called on receipt
+ * * length: Size of the data to be sent
+ * @return Promise
+ * The promise will be resolved when you are allowed to write to the
+ * stream with an object containing:
+ * * stream: This output stream should only be used directly if
+ * you can ensure that you will write exactly |length|
+ * bytes and will not close the stream when writing is
+ * complete
+ * * done: If you use the stream directly (instead of |copyFrom|
+ * below), you must signal completion by resolving /
+ * rejecting this deferred. If it's rejected, the
+ * transport will be closed. If an Error is supplied as
+ * a rejection value, it will be logged via |dumpn|. If
+ * you do use |copyFrom|, resolving is taken care of for
+ * you when copying completes.
+ * * copyFrom: A helper function for getting your data onto the
+ * stream that meets the stream handling requirements
+ * above, and has the following signature:
+ * @param input nsIAsyncInputStream
+ * The stream to copy from.
+ * @return Promise
+ * The promise is resolved when copying completes or
+ * rejected if any (unexpected) errors occur.
+ * This object also emits "progress" events for each chunk
+ * that is copied. See stream-utils.js.
+ */
+ startBulkSend(header) {
+ const packet = new BulkPacket(this);
+ packet.header = header;
+ this._outgoing.push(packet);
+ this._flushOutgoing();
+ return packet.streamReadyForWriting;
+ },
+
+ /**
+ * Close the transport.
+ * @param reason nsresult / object (optional)
+ * The status code or error message that corresponds to the reason for
+ * closing the transport (likely because a stream closed or failed).
+ */
+ close(reason) {
+ this.active = false;
+ this._input.close();
+ this._scriptableInput.close();
+ this._output.close();
+ this._destroyIncoming();
+ this._destroyAllOutgoing();
+ if (this.hooks) {
+ this.hooks.onTransportClosed(reason);
+ this.hooks = null;
+ }
+ if (reason) {
+ dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason));
+ } else {
+ dumpn("Transport closed.");
+ }
+ },
+
+ /**
+ * The currently outgoing packet (at the top of the queue).
+ */
+ get _currentOutgoing() {
+ return this._outgoing[0];
+ },
+
+ /**
+ * Flush data to the outgoing stream. Waits until the output stream notifies
+ * us that it is ready to be written to (via onOutputStreamReady).
+ */
+ _flushOutgoing() {
+ if (!this._outgoingEnabled || this._outgoing.length === 0) {
+ return;
+ }
+
+ // If the top of the packet queue has nothing more to send, remove it.
+ if (this._currentOutgoing.done) {
+ this._finishCurrentOutgoing();
+ }
+
+ if (this._outgoing.length) {
+ const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
+ this._output.asyncWait(this, 0, 0, threadManager.currentThread);
+ }
+ },
+
+ /**
+ * Pause this transport's attempts to write to the output stream. This is
+ * used when we've temporarily handed off our output stream for writing bulk
+ * data.
+ */
+ pauseOutgoing() {
+ this._outgoingEnabled = false;
+ },
+
+ /**
+ * Resume this transport's attempts to write to the output stream.
+ */
+ resumeOutgoing() {
+ this._outgoingEnabled = true;
+ this._flushOutgoing();
+ },
+
+ // nsIOutputStreamCallback
+ /**
+ * This is called when the output stream is ready for more data to be written.
+ * The current outgoing packet will attempt to write some amount of data, but
+ * may not complete.
+ */
+ onOutputStreamReady: DevToolsUtils.makeInfallible(function (stream) {
+ if (!this._outgoingEnabled || this._outgoing.length === 0) {
+ return;
+ }
+
+ try {
+ this._currentOutgoing.write(stream);
+ } catch (e) {
+ if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+ this.close(e.result);
+ return;
+ }
+ throw e;
+ }
+
+ this._flushOutgoing();
+ }, "DebuggerTransport.prototype.onOutputStreamReady"),
+
+ /**
+ * Remove the current outgoing packet from the queue upon completion.
+ */
+ _finishCurrentOutgoing() {
+ if (this._currentOutgoing) {
+ this._currentOutgoing.destroy();
+ this._outgoing.shift();
+ }
+ },
+
+ /**
+ * Clear the entire outgoing queue.
+ */
+ _destroyAllOutgoing() {
+ for (const packet of this._outgoing) {
+ packet.destroy();
+ }
+ this._outgoing = [];
+ },
+
+ /**
+ * Initialize the input stream for reading. Once this method has been called,
+ * we watch for packets on the input stream, and pass them to the appropriate
+ * handlers via this.hooks.
+ */
+ ready() {
+ this.active = true;
+ this._waitForIncoming();
+ },
+
+ /**
+ * Asks the input stream to notify us (via onInputStreamReady) when it is
+ * ready for reading.
+ */
+ _waitForIncoming() {
+ if (this._incomingEnabled) {
+ const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
+ this._input.asyncWait(this, 0, 0, threadManager.currentThread);
+ }
+ },
+
+ /**
+ * Pause this transport's attempts to read from the input stream. This is
+ * used when we've temporarily handed off our input stream for reading bulk
+ * data.
+ */
+ pauseIncoming() {
+ this._incomingEnabled = false;
+ },
+
+ /**
+ * Resume this transport's attempts to read from the input stream.
+ */
+ resumeIncoming() {
+ this._incomingEnabled = true;
+ this._flushIncoming();
+ this._waitForIncoming();
+ },
+
+ // nsIInputStreamCallback
+ /**
+ * Called when the stream is either readable or closed.
+ */
+ onInputStreamReady: DevToolsUtils.makeInfallible(function (stream) {
+ try {
+ while (
+ stream.available() &&
+ this._incomingEnabled &&
+ this._processIncoming(stream, stream.available())
+ ) {
+ // Loop until there is nothing more to process
+ }
+ this._waitForIncoming();
+ } catch (e) {
+ if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+ this.close(e.result);
+ } else {
+ throw e;
+ }
+ }
+ }, "DebuggerTransport.prototype.onInputStreamReady"),
+
+ /**
+ * Process the incoming data. Will create a new currently incoming Packet if
+ * needed. Tells the incoming Packet to read as much data as it can, but
+ * reading may not complete. The Packet signals that its data is ready for
+ * delivery by calling one of this transport's _on*Ready methods (see
+ * ./packets.js and the _on*Ready methods below).
+ * @return boolean
+ * Whether incoming stream processing should continue for any
+ * remaining data.
+ */
+ _processIncoming(stream, count) {
+ dumpv("Data available: " + count);
+
+ if (!count) {
+ dumpv("Nothing to read, skipping");
+ return false;
+ }
+
+ try {
+ if (!this._incoming) {
+ dumpv("Creating a new packet from incoming");
+
+ if (!this._readHeader(stream)) {
+ // Not enough data to read packet type
+ return false;
+ }
+
+ // Attempt to create a new Packet by trying to parse each possible
+ // header pattern.
+ this._incoming = Packet.fromHeader(this._incomingHeader, this);
+ if (!this._incoming) {
+ throw new Error(
+ "No packet types for header: " + this._incomingHeader
+ );
+ }
+ }
+
+ if (!this._incoming.done) {
+ // We have an incomplete packet, keep reading it.
+ dumpv("Existing packet incomplete, keep reading");
+ this._incoming.read(stream, this._scriptableInput);
+ }
+ } catch (e) {
+ const msg =
+ "Error reading incoming packet: (" + e + " - " + e.stack + ")";
+ dumpn(msg);
+
+ // Now in an invalid state, shut down the transport.
+ this.close();
+ return false;
+ }
+
+ if (!this._incoming.done) {
+ // Still not complete, we'll wait for more data.
+ dumpv("Packet not done, wait for more");
+ return true;
+ }
+
+ // Ready for next packet
+ this._flushIncoming();
+ return true;
+ },
+
+ /**
+ * Read as far as we can into the incoming data, attempting to build up a
+ * complete packet header (which terminates with ":"). We'll only read up to
+ * PACKET_HEADER_MAX characters.
+ * @return boolean
+ * True if we now have a complete header.
+ */
+ _readHeader() {
+ const amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
+ this._incomingHeader += StreamUtils.delimitedRead(
+ this._scriptableInput,
+ ":",
+ amountToRead
+ );
+ if (flags.wantVerbose) {
+ dumpv("Header read: " + this._incomingHeader);
+ }
+
+ if (this._incomingHeader.endsWith(":")) {
+ if (flags.wantVerbose) {
+ dumpv("Found packet header successfully: " + this._incomingHeader);
+ }
+ return true;
+ }
+
+ if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
+ throw new Error("Failed to parse packet header!");
+ }
+
+ // Not enough data yet.
+ return false;
+ },
+
+ /**
+ * If the incoming packet is done, log it as needed and clear the buffer.
+ */
+ _flushIncoming() {
+ if (!this._incoming.done) {
+ return;
+ }
+ if (flags.wantLogging) {
+ dumpn("Got: " + this._incoming);
+ }
+ this._destroyIncoming();
+ },
+
+ /**
+ * Handler triggered by an incoming JSONPacket completing it's |read| method.
+ * Delivers the packet to this.hooks.onPacket.
+ */
+ _onJSONObjectReady(object) {
+ DevToolsUtils.executeSoon(
+ DevToolsUtils.makeInfallible(() => {
+ // Ensure the transport is still alive by the time this runs.
+ if (this.active) {
+ this.hooks.onPacket(object);
+ }
+ }, "DebuggerTransport instance's this.hooks.onPacket")
+ );
+ },
+
+ /**
+ * Handler triggered by an incoming BulkPacket entering the |read| phase for
+ * the stream portion of the packet. Delivers info about the incoming
+ * streaming data to this.hooks.onBulkPacket. See the main comment on the
+ * transport at the top of this file for more details.
+ */
+ _onBulkReadReady(...args) {
+ DevToolsUtils.executeSoon(
+ DevToolsUtils.makeInfallible(() => {
+ // Ensure the transport is still alive by the time this runs.
+ if (this.active) {
+ this.hooks.onBulkPacket(...args);
+ }
+ }, "DebuggerTransport instance's this.hooks.onBulkPacket")
+ );
+ },
+
+ /**
+ * Remove all handlers and references related to the current incoming packet,
+ * either because it is now complete or because the transport is closing.
+ */
+ _destroyIncoming() {
+ if (this._incoming) {
+ this._incoming.destroy();
+ }
+ this._incomingHeader = "";
+ this._incoming = null;
+ },
+};
+
+exports.DebuggerTransport = DebuggerTransport;
diff --git a/devtools/shared/transport/websocket-transport.js b/devtools/shared/transport/websocket-transport.js
new file mode 100644
index 0000000000..b8255e0067
--- /dev/null
+++ b/devtools/shared/transport/websocket-transport.js
@@ -0,0 +1,86 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+const EventEmitter = require("resource://devtools/shared/event-emitter.js");
+
+function WebSocketDebuggerTransport(socket) {
+ EventEmitter.decorate(this);
+
+ this.active = false;
+ this.hooks = null;
+ this.socket = socket;
+}
+
+WebSocketDebuggerTransport.prototype = {
+ ready() {
+ if (this.active) {
+ return;
+ }
+
+ this.socket.addEventListener("message", this);
+ this.socket.addEventListener("close", this);
+
+ this.active = true;
+ },
+
+ send(object) {
+ this.emit("send", object);
+ if (this.socket) {
+ this.socket.send(JSON.stringify(object));
+ }
+ },
+
+ startBulkSend() {
+ throw new Error("Bulk send is not supported by WebSocket transport");
+ },
+
+ close() {
+ if (!this.socket) {
+ return;
+ }
+ this.emit("close");
+ this.active = false;
+
+ this.socket.removeEventListener("message", this);
+ this.socket.removeEventListener("close", this);
+ this.socket.close();
+ this.socket = null;
+
+ if (this.hooks) {
+ if (this.hooks.onTransportClosed) {
+ this.hooks.onTransportClosed();
+ }
+ this.hooks = null;
+ }
+ },
+
+ handleEvent(event) {
+ switch (event.type) {
+ case "message":
+ this.onMessage(event);
+ break;
+ case "close":
+ this.close();
+ break;
+ }
+ },
+
+ onMessage({ data }) {
+ if (typeof data !== "string") {
+ throw new Error(
+ "Binary messages are not supported by WebSocket transport"
+ );
+ }
+
+ const object = JSON.parse(data);
+ this.emit("packet", object);
+ if (this.hooks) {
+ this.hooks.onPacket(object);
+ }
+ },
+};
+
+module.exports = WebSocketDebuggerTransport;
diff --git a/devtools/shared/transport/worker-transport.js b/devtools/shared/transport/worker-transport.js
new file mode 100644
index 0000000000..903fd69cf4
--- /dev/null
+++ b/devtools/shared/transport/worker-transport.js
@@ -0,0 +1,113 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+"use strict";
+
+// Each worker debugger supports only a single connection to the main thread.
+// However, its theoretically possible for multiple servers to connect to the
+// same worker. Consequently, each transport has a connection id, to allow
+// messages from multiple connections to be multiplexed on a single channel.
+
+/**
+ * A transport that uses a WorkerDebugger to send packets from the main
+ * thread to a worker thread.
+ */
+class MainThreadWorkerDebuggerTransport {
+ constructor(dbg, id) {
+ this._dbg = dbg;
+ this._id = id;
+
+ this._dbgListener = {
+ onMessage: this._onMessage.bind(this),
+ };
+ }
+
+ ready() {
+ this._dbg.addListener(this._dbgListener);
+ }
+
+ close() {
+ if (this._dbgListener) {
+ this._dbg.removeListener(this._dbgListener);
+ }
+ this._dbgListener = null;
+ this.hooks?.onTransportClosed();
+ }
+
+ send(packet) {
+ this._dbg.postMessage(
+ JSON.stringify({
+ type: "message",
+ id: this._id,
+ message: packet,
+ })
+ );
+ }
+
+ startBulkSend() {
+ throw new Error("Can't send bulk data from worker threads!");
+ }
+
+ _onMessage(message) {
+ const packet = JSON.parse(message);
+ if (packet.type !== "message" || packet.id !== this._id || !this.hooks) {
+ return;
+ }
+
+ this.hooks.onPacket(packet.message);
+ }
+}
+
+exports.MainThreadWorkerDebuggerTransport = MainThreadWorkerDebuggerTransport;
+
+/**
+ * A transport that uses a WorkerDebuggerGlobalScope to send packets from a
+ * worker thread to the main thread.
+ */
+function WorkerThreadWorkerDebuggerTransport(scope, id) {
+ this._scope = scope;
+ this._id = id;
+ this._onMessage = this._onMessage.bind(this);
+}
+
+WorkerThreadWorkerDebuggerTransport.prototype = {
+ constructor: WorkerThreadWorkerDebuggerTransport,
+
+ ready() {
+ this._scope.addEventListener("message", this._onMessage);
+ },
+
+ close() {
+ this._scope.removeEventListener("message", this._onMessage);
+ this.hooks?.onTransportClosed();
+ },
+
+ send(packet) {
+ this._scope.postMessage(
+ JSON.stringify({
+ type: "message",
+ id: this._id,
+ message: packet,
+ })
+ );
+ },
+
+ startBulkSend() {
+ throw new Error("Can't send bulk data from worker threads!");
+ },
+
+ _onMessage(event) {
+ const packet = JSON.parse(event.data);
+ if (packet.type !== "message" || packet.id !== this._id) {
+ return;
+ }
+
+ if (this.hooks) {
+ this.hooks.onPacket(packet.message);
+ }
+ },
+};
+
+exports.WorkerThreadWorkerDebuggerTransport =
+ WorkerThreadWorkerDebuggerTransport;