diff options
Diffstat (limited to 'remote/marionette/transport.sys.mjs')
-rw-r--r-- | remote/marionette/transport.sys.mjs | 529 |
1 files changed, 529 insertions, 0 deletions
diff --git a/remote/marionette/transport.sys.mjs b/remote/marionette/transport.sys.mjs new file mode 100644 index 0000000000..98f6a524f6 --- /dev/null +++ b/remote/marionette/transport.sys.mjs @@ -0,0 +1,529 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs", + + BulkPacket: "chrome://remote/content/marionette/packets.sys.mjs", + executeSoon: "chrome://remote/content/marionette/sync.sys.mjs", + JSONPacket: "chrome://remote/content/marionette/packets.sys.mjs", + Packet: "chrome://remote/content/marionette/packets.sys.mjs", + StreamUtils: "chrome://remote/content/marionette/stream-utils.sys.mjs", +}); + +XPCOMUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => { + return Components.Constructor( + "@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", + "init" + ); +}); + +const flags = { wantVerbose: false, wantLogging: false }; + +const dumpv = flags.wantVerbose + ? function (msg) { + dump(msg + "\n"); + } + : function () {}; + +const PACKET_HEADER_MAX = 200; + +/** + * An adapter that handles data transfers between the debugger client + * and server. It can work with both nsIPipe and nsIServerSocket + * transports so long as the properly created input and output streams + * are specified. (However, for intra-process connections, + * LocalDebuggerTransport, below, is more efficient than using an nsIPipe + * pair with DebuggerTransport.) + * + * @param {nsIAsyncInputStream} input + * The input stream. + * @param {nsIAsyncOutputStream} output + * The output stream. + * + * Given a DebuggerTransport instance dt: + * 1) Set dt.hooks to a packet handler object (described below). + * 2) Call dt.ready() to begin watching for input packets. + * 3) Call dt.send() / dt.startBulkSend() to send packets. + * 4) Call dt.close() to close the connection, and disengage from + * the event loop. + * + * A packet handler is an object with the following methods: + * + * - onPacket(packet) - called when we have received a complete packet. + * |packet| is the parsed form of the packet --- a JavaScript value, not + * a JSON-syntax string. + * + * - onBulkPacket(packet) - called when we have switched to bulk packet + * receiving mode. |packet| is an object containing: + * actor: Name of actor that will receive the packet + * type: Name of actor's method that should be called on receipt + * length: Size of the data to be read + * stream: This input stream should only be used directly if you + * can ensure that you will read exactly |length| bytes and + * will not close the stream when reading is complete + * done: If you use the stream directly (instead of |copyTo| + * below), you must signal completion by resolving/rejecting + * this deferred. If it's rejected, the transport will + * be closed. If an Error is supplied as a rejection value, + * it will be logged via |dump|. If you do use |copyTo|, + * resolving is taken care of for you when copying completes. + * copyTo: A helper function for getting your data out of the + * stream that meets the stream handling requirements above, + * and has the following signature: + * + * - params + * {nsIAsyncOutputStream} output + * The stream to copy to. + * - returns {Promise} + * The promise is resolved when copying completes or + * rejected if any (unexpected) errors occur. This object + * also emits "progress" events for each chunk that is + * copied. See stream-utils.js. + * + * - onClosed(reason) - called when the connection is closed. |reason| + * is an optional nsresult or object, typically passed when the + * transport is closed due to some error in a underlying stream. + * + * See ./packets.js and the Remote Debugging Protocol specification for + * more details on the format of these packets. + * + * @class + */ +export function DebuggerTransport(input, output) { + lazy.EventEmitter.decorate(this); + + this._input = input; + this._scriptableInput = new lazy.ScriptableInputStream(input); + this._output = output; + + // The current incoming (possibly partial) header, which will determine + // which type of Packet |_incoming| below will become. + this._incomingHeader = ""; + // The current incoming Packet object + this._incoming = null; + // A queue of outgoing Packet objects + this._outgoing = []; + + this.hooks = null; + this.active = false; + + this._incomingEnabled = true; + this._outgoingEnabled = true; + + this.close = this.close.bind(this); +} + +DebuggerTransport.prototype = { + /** + * Transmit an object as a JSON packet. + * + * This method returns immediately, without waiting for the entire + * packet to be transmitted, registering event handlers as needed to + * transmit the entire packet. Packets are transmitted in the order they + * are passed to this method. + */ + send(object) { + this.emit("send", object); + + let packet = new lazy.JSONPacket(this); + packet.object = object; + this._outgoing.push(packet); + this._flushOutgoing(); + }, + + /** + * Transmit streaming data via a bulk packet. + * + * This method initiates the bulk send process by queuing up the header + * data. The caller receives eventual access to a stream for writing. + * + * N.B.: Do *not* attempt to close the stream handed to you, as it + * will continue to be used by this transport afterwards. Most users + * should instead use the provided |copyFrom| function instead. + * + * @param {object} header + * This is modeled after the format of JSON packets above, but does + * not actually contain the data, but is instead just a routing + * header: + * + * - actor: Name of actor that will receive the packet + * - type: Name of actor's method that should be called on receipt + * - length: Size of the data to be sent + * + * @returns {Promise} + * The promise will be resolved when you are allowed to write to + * the stream with an object containing: + * + * - stream: This output stream should only be used directly + * if you can ensure that you will write exactly + * |length| bytes and will not close the stream when + * writing is complete. + * - done: If you use the stream directly (instead of + * |copyFrom| below), you must signal completion by + * resolving/rejecting this deferred. If it's + * rejected, the transport will be closed. If an + * Error is supplied as a rejection value, it will + * be logged via |dump|. If you do use |copyFrom|, + * resolving is taken care of for you when copying + * completes. + * - copyFrom: A helper function for getting your data onto the + * stream that meets the stream handling requirements + * above, and has the following signature: + * + * - params + * {nsIAsyncInputStream} input + * The stream to copy from. + * - returns {Promise} + * The promise is resolved when copying completes + * or rejected if any (unexpected) errors occur. + * This object also emits "progress" events for + * each chunkthat is copied. See stream-utils.js. + */ + startBulkSend(header) { + this.emit("startbulksend", header); + + let packet = new lazy.BulkPacket(this); + packet.header = header; + this._outgoing.push(packet); + this._flushOutgoing(); + return packet.streamReadyForWriting; + }, + + /** + * Close the transport. + * + * @param {(nsresult|object)=} reason + * The status code or error message that corresponds to the reason + * for closing the transport (likely because a stream closed + * or failed). + */ + close(reason) { + this.emit("close", reason); + + this.active = false; + this._input.close(); + this._scriptableInput.close(); + this._output.close(); + this._destroyIncoming(); + this._destroyAllOutgoing(); + if (this.hooks) { + this.hooks.onClosed(reason); + this.hooks = null; + } + if (reason) { + dumpv("Transport closed: " + reason); + } else { + dumpv("Transport closed."); + } + }, + + /** + * The currently outgoing packet (at the top of the queue). + */ + get _currentOutgoing() { + return this._outgoing[0]; + }, + + /** + * Flush data to the outgoing stream. Waits until the output + * stream notifies us that it is ready to be written to (via + * onOutputStreamReady). + */ + _flushOutgoing() { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + // If the top of the packet queue has nothing more to send, remove it. + if (this._currentOutgoing.done) { + this._finishCurrentOutgoing(); + } + + if (this._outgoing.length) { + let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._output.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to write to the output stream. + * This is used when we've temporarily handed off our output stream for + * writing bulk data. + */ + pauseOutgoing() { + this._outgoingEnabled = false; + }, + + /** + * Resume this transport's attempts to write to the output stream. + */ + resumeOutgoing() { + this._outgoingEnabled = true; + this._flushOutgoing(); + }, + + // nsIOutputStreamCallback + /** + * This is called when the output stream is ready for more data to + * be written. The current outgoing packet will attempt to write some + * amount of data, but may not complete. + */ + onOutputStreamReady(stream) { + if (!this._outgoingEnabled || this._outgoing.length === 0) { + return; + } + + try { + this._currentOutgoing.write(stream); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + return; + } + throw e; + } + + this._flushOutgoing(); + }, + + /** + * Remove the current outgoing packet from the queue upon completion. + */ + _finishCurrentOutgoing() { + if (this._currentOutgoing) { + this._currentOutgoing.destroy(); + this._outgoing.shift(); + } + }, + + /** + * Clear the entire outgoing queue. + */ + _destroyAllOutgoing() { + for (let packet of this._outgoing) { + packet.destroy(); + } + this._outgoing = []; + }, + + /** + * Initialize the input stream for reading. Once this method has been + * called, we watch for packets on the input stream, and pass them to + * the appropriate handlers via this.hooks. + */ + ready() { + this.active = true; + this._waitForIncoming(); + }, + + /** + * Asks the input stream to notify us (via onInputStreamReady) when it is + * ready for reading. + */ + _waitForIncoming() { + if (this._incomingEnabled) { + let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); + this._input.asyncWait(this, 0, 0, threadManager.currentThread); + } + }, + + /** + * Pause this transport's attempts to read from the input stream. + * This is used when we've temporarily handed off our input stream for + * reading bulk data. + */ + pauseIncoming() { + this._incomingEnabled = false; + }, + + /** + * Resume this transport's attempts to read from the input stream. + */ + resumeIncoming() { + this._incomingEnabled = true; + this._flushIncoming(); + this._waitForIncoming(); + }, + + // nsIInputStreamCallback + /** + * Called when the stream is either readable or closed. + */ + onInputStreamReady(stream) { + try { + while ( + stream.available() && + this._incomingEnabled && + this._processIncoming(stream, stream.available()) + ) { + // Loop until there is nothing more to process + } + this._waitForIncoming(); + } catch (e) { + if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this.close(e.result); + } else { + throw e; + } + } + }, + + /** + * Process the incoming data. Will create a new currently incoming + * Packet if needed. Tells the incoming Packet to read as much data + * as it can, but reading may not complete. The Packet signals that + * its data is ready for delivery by calling one of this transport's + * _on*Ready methods (see ./packets.js and the _on*Ready methods below). + * + * @returns {boolean} + * Whether incoming stream processing should continue for any + * remaining data. + */ + _processIncoming(stream, count) { + dumpv("Data available: " + count); + + if (!count) { + dumpv("Nothing to read, skipping"); + return false; + } + + try { + if (!this._incoming) { + dumpv("Creating a new packet from incoming"); + + if (!this._readHeader(stream)) { + // Not enough data to read packet type + return false; + } + + // Attempt to create a new Packet by trying to parse each possible + // header pattern. + this._incoming = lazy.Packet.fromHeader(this._incomingHeader, this); + if (!this._incoming) { + throw new Error( + "No packet types for header: " + this._incomingHeader + ); + } + } + + if (!this._incoming.done) { + // We have an incomplete packet, keep reading it. + dumpv("Existing packet incomplete, keep reading"); + this._incoming.read(stream, this._scriptableInput); + } + } catch (e) { + dump(`Error reading incoming packet: (${e} - ${e.stack})\n`); + + // Now in an invalid state, shut down the transport. + this.close(); + return false; + } + + if (!this._incoming.done) { + // Still not complete, we'll wait for more data. + dumpv("Packet not done, wait for more"); + return true; + } + + // Ready for next packet + this._flushIncoming(); + return true; + }, + + /** + * Read as far as we can into the incoming data, attempting to build + * up a complete packet header (which terminates with ":"). We'll only + * read up to PACKET_HEADER_MAX characters. + * + * @returns {boolean} + * True if we now have a complete header. + */ + _readHeader() { + let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length; + this._incomingHeader += lazy.StreamUtils.delimitedRead( + this._scriptableInput, + ":", + amountToRead + ); + if (flags.wantVerbose) { + dumpv("Header read: " + this._incomingHeader); + } + + if (this._incomingHeader.endsWith(":")) { + if (flags.wantVerbose) { + dumpv("Found packet header successfully: " + this._incomingHeader); + } + return true; + } + + if (this._incomingHeader.length >= PACKET_HEADER_MAX) { + throw new Error("Failed to parse packet header!"); + } + + // Not enough data yet. + return false; + }, + + /** + * If the incoming packet is done, log it as needed and clear the buffer. + */ + _flushIncoming() { + if (!this._incoming.done) { + return; + } + if (flags.wantLogging) { + dumpv("Got: " + this._incoming); + } + this._destroyIncoming(); + }, + + /** + * Handler triggered by an incoming JSONPacket completing it's |read| + * method. Delivers the packet to this.hooks.onPacket. + */ + _onJSONObjectReady(object) { + lazy.executeSoon(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.emit("packet", object); + this.hooks.onPacket(object); + } + }); + }, + + /** + * Handler triggered by an incoming BulkPacket entering the |read| + * phase for the stream portion of the packet. Delivers info about the + * incoming streaming data to this.hooks.onBulkPacket. See the main + * comment on the transport at the top of this file for more details. + */ + _onBulkReadReady(...args) { + lazy.executeSoon(() => { + // Ensure the transport is still alive by the time this runs. + if (this.active) { + this.emit("bulkpacket", ...args); + this.hooks.onBulkPacket(...args); + } + }); + }, + + /** + * Remove all handlers and references related to the current incoming + * packet, either because it is now complete or because the transport + * is closing. + */ + _destroyIncoming() { + if (this._incoming) { + this._incoming.destroy(); + } + this._incomingHeader = ""; + this._incoming = null; + }, +}; |