/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";

const lazy = {};

ChromeUtils.defineESModuleGetters(lazy, {
  EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",

  BulkPacket: "chrome://remote/content/marionette/packets.sys.mjs",
  executeSoon: "chrome://remote/content/marionette/sync.sys.mjs",
  JSONPacket: "chrome://remote/content/marionette/packets.sys.mjs",
  Packet: "chrome://remote/content/marionette/packets.sys.mjs",
  StreamUtils: "chrome://remote/content/marionette/stream-utils.sys.mjs",
});

XPCOMUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
  return Components.Constructor(
    "@mozilla.org/scriptableinputstream;1",
    "nsIScriptableInputStream",
    "init"
  );
});

const flags = { wantVerbose: false, wantLogging: false };

const dumpv = flags.wantVerbose
  ? function (msg) {
      dump(msg + "\n");
    }
  : function () {};

const PACKET_HEADER_MAX = 200;

/**
 * An adapter that handles data transfers between the debugger client
 * and server. It can work with both nsIPipe and nsIServerSocket
 * transports so long as the properly created input and output streams
 * are specified.  (However, for intra-process connections,
 * LocalDebuggerTransport, below, is more efficient than using an nsIPipe
 * pair with DebuggerTransport.)
 *
 * @param {nsIAsyncInputStream} input
 *     The input stream.
 * @param {nsIAsyncOutputStream} output
 *     The output stream.
 *
 * Given a DebuggerTransport instance dt:
 * 1) Set dt.hooks to a packet handler object (described below).
 * 2) Call dt.ready() to begin watching for input packets.
 * 3) Call dt.send() / dt.startBulkSend() to send packets.
 * 4) Call dt.close() to close the connection, and disengage from
 *    the event loop.
 *
 * A packet handler is an object with the following methods:
 *
 * - onPacket(packet) - called when we have received a complete packet.
 *   |packet| is the parsed form of the packet --- a JavaScript value, not
 *   a JSON-syntax string.
 *
 * - onBulkPacket(packet) - called when we have switched to bulk packet
 *   receiving mode. |packet| is an object containing:
 *   actor:  Name of actor that will receive the packet
 *   type:   Name of actor's method that should be called on receipt
 *   length: Size of the data to be read
 *   stream: This input stream should only be used directly if you
 *             can ensure that you will read exactly |length| bytes and
 *             will not close the stream when reading is complete
 *   done:   If you use the stream directly (instead of |copyTo|
 *             below), you must signal completion by resolving/rejecting
 *             this deferred.  If it's rejected, the transport will
 *             be closed.  If an Error is supplied as a rejection value,
 *             it will be logged via |dump|.  If you do use |copyTo|,
 *             resolving is taken care of for you when copying completes.
 *   copyTo: A helper function for getting your data out of the
 *             stream that meets the stream handling requirements above,
 *             and has the following signature:
 *
 *             - params
 *                {nsIAsyncOutputStream} output
 *                  The stream to copy to.
 *             - returns {Promise}
 *                  The promise is resolved when copying completes or
 *                  rejected if any (unexpected) errors occur.  This object
 *                  also emits "progress" events for each chunk that is
 *                  copied.  See stream-utils.js.
 *
 * - onClosed(reason) - called when the connection is closed. |reason|
 *   is an optional nsresult or object, typically passed when the
 *   transport is closed due to some error in a underlying stream.
 *
 * See ./packets.js and the Remote Debugging Protocol specification for
 * more details on the format of these packets.
 *
 * @class
 */
export function DebuggerTransport(input, output) {
  lazy.EventEmitter.decorate(this);

  this._input = input;
  this._scriptableInput = new lazy.ScriptableInputStream(input);
  this._output = output;

  // The current incoming (possibly partial) header, which will determine
  // which type of Packet |_incoming| below will become.
  this._incomingHeader = "";
  // The current incoming Packet object
  this._incoming = null;
  // A queue of outgoing Packet objects
  this._outgoing = [];

  this.hooks = null;
  this.active = false;

  this._incomingEnabled = true;
  this._outgoingEnabled = true;

  this.close = this.close.bind(this);
}

DebuggerTransport.prototype = {
  /**
   * Transmit an object as a JSON packet.
   *
   * This method returns immediately, without waiting for the entire
   * packet to be transmitted, registering event handlers as needed to
   * transmit the entire packet. Packets are transmitted in the order they
   * are passed to this method.
   */
  send(object) {
    this.emit("send", object);

    let packet = new lazy.JSONPacket(this);
    packet.object = object;
    this._outgoing.push(packet);
    this._flushOutgoing();
  },

  /**
   * Transmit streaming data via a bulk packet.
   *
   * This method initiates the bulk send process by queuing up the header
   * data.  The caller receives eventual access to a stream for writing.
   *
   * N.B.: Do *not* attempt to close the stream handed to you, as it
   * will continue to be used by this transport afterwards.  Most users
   * should instead use the provided |copyFrom| function instead.
   *
   * @param {object} header
   *     This is modeled after the format of JSON packets above, but does
   *     not actually contain the data, but is instead just a routing
   *     header:
   *
   *       - actor:  Name of actor that will receive the packet
   *       - type:   Name of actor's method that should be called on receipt
   *       - length: Size of the data to be sent
   *
   * @returns {Promise}
   *     The promise will be resolved when you are allowed to write to
   *     the stream with an object containing:
   *
   *       - stream:   This output stream should only be used directly
   *                   if you can ensure that you will write exactly
   *                   |length| bytes and will not close the stream when
   *                    writing is complete.
   *       - done:     If you use the stream directly (instead of
   *                   |copyFrom| below), you must signal completion by
   *                   resolving/rejecting this deferred.  If it's
   *                   rejected, the transport will be closed.  If an
   *                   Error is supplied as a rejection value, it will
   *                   be logged via |dump|.  If you do use |copyFrom|,
   *                   resolving is taken care of for you when copying
   *                   completes.
   *       - copyFrom: A helper function for getting your data onto the
   *                   stream that meets the stream handling requirements
   *                   above, and has the following signature:
   *
   *                   - params
   *                     {nsIAsyncInputStream} input
   *                       The stream to copy from.
   *                   - returns {Promise}
   *                       The promise is resolved when copying completes
   *                       or rejected if any (unexpected) errors occur.
   *                       This object also emits "progress" events for
   *                       each chunkthat is copied.  See stream-utils.js.
   */
  startBulkSend(header) {
    this.emit("startbulksend", header);

    let packet = new lazy.BulkPacket(this);
    packet.header = header;
    this._outgoing.push(packet);
    this._flushOutgoing();
    return packet.streamReadyForWriting;
  },

  /**
   * Close the transport.
   *
   * @param {(nsresult|object)=} reason
   *     The status code or error message that corresponds to the reason
   *     for closing the transport (likely because a stream closed
   *     or failed).
   */
  close(reason) {
    this.emit("close", reason);

    this.active = false;
    this._input.close();
    this._scriptableInput.close();
    this._output.close();
    this._destroyIncoming();
    this._destroyAllOutgoing();
    if (this.hooks) {
      this.hooks.onClosed(reason);
      this.hooks = null;
    }
    if (reason) {
      dumpv("Transport closed: " + reason);
    } else {
      dumpv("Transport closed.");
    }
  },

  /**
   * The currently outgoing packet (at the top of the queue).
   */
  get _currentOutgoing() {
    return this._outgoing[0];
  },

  /**
   * Flush data to the outgoing stream.  Waits until the output
   * stream notifies us that it is ready to be written to (via
   * onOutputStreamReady).
   */
  _flushOutgoing() {
    if (!this._outgoingEnabled || this._outgoing.length === 0) {
      return;
    }

    // If the top of the packet queue has nothing more to send, remove it.
    if (this._currentOutgoing.done) {
      this._finishCurrentOutgoing();
    }

    if (this._outgoing.length) {
      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
      this._output.asyncWait(this, 0, 0, threadManager.currentThread);
    }
  },

  /**
   * Pause this transport's attempts to write to the output stream.
   * This is used when we've temporarily handed off our output stream for
   * writing bulk data.
   */
  pauseOutgoing() {
    this._outgoingEnabled = false;
  },

  /**
   * Resume this transport's attempts to write to the output stream.
   */
  resumeOutgoing() {
    this._outgoingEnabled = true;
    this._flushOutgoing();
  },

  // nsIOutputStreamCallback
  /**
   * This is called when the output stream is ready for more data to
   * be written.  The current outgoing packet will attempt to write some
   * amount of data, but may not complete.
   */
  onOutputStreamReady(stream) {
    if (!this._outgoingEnabled || this._outgoing.length === 0) {
      return;
    }

    try {
      this._currentOutgoing.write(stream);
    } catch (e) {
      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
        this.close(e.result);
        return;
      }
      throw e;
    }

    this._flushOutgoing();
  },

  /**
   * Remove the current outgoing packet from the queue upon completion.
   */
  _finishCurrentOutgoing() {
    if (this._currentOutgoing) {
      this._currentOutgoing.destroy();
      this._outgoing.shift();
    }
  },

  /**
   * Clear the entire outgoing queue.
   */
  _destroyAllOutgoing() {
    for (let packet of this._outgoing) {
      packet.destroy();
    }
    this._outgoing = [];
  },

  /**
   * Initialize the input stream for reading. Once this method has been
   * called, we watch for packets on the input stream, and pass them to
   * the appropriate handlers via this.hooks.
   */
  ready() {
    this.active = true;
    this._waitForIncoming();
  },

  /**
   * Asks the input stream to notify us (via onInputStreamReady) when it is
   * ready for reading.
   */
  _waitForIncoming() {
    if (this._incomingEnabled) {
      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
      this._input.asyncWait(this, 0, 0, threadManager.currentThread);
    }
  },

  /**
   * Pause this transport's attempts to read from the input stream.
   * This is used when we've temporarily handed off our input stream for
   * reading bulk data.
   */
  pauseIncoming() {
    this._incomingEnabled = false;
  },

  /**
   * Resume this transport's attempts to read from the input stream.
   */
  resumeIncoming() {
    this._incomingEnabled = true;
    this._flushIncoming();
    this._waitForIncoming();
  },

  // nsIInputStreamCallback
  /**
   * Called when the stream is either readable or closed.
   */
  onInputStreamReady(stream) {
    try {
      while (
        stream.available() &&
        this._incomingEnabled &&
        this._processIncoming(stream, stream.available())
      ) {
        // Loop until there is nothing more to process
      }
      this._waitForIncoming();
    } catch (e) {
      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
        this.close(e.result);
      } else {
        throw e;
      }
    }
  },

  /**
   * Process the incoming data.  Will create a new currently incoming
   * Packet if needed.  Tells the incoming Packet to read as much data
   * as it can, but reading may not complete.  The Packet signals that
   * its data is ready for delivery by calling one of this transport's
   * _on*Ready methods (see ./packets.js and the _on*Ready methods below).
   *
   * @returns {boolean}
   *     Whether incoming stream processing should continue for any
   *     remaining data.
   */
  _processIncoming(stream, count) {
    dumpv("Data available: " + count);

    if (!count) {
      dumpv("Nothing to read, skipping");
      return false;
    }

    try {
      if (!this._incoming) {
        dumpv("Creating a new packet from incoming");

        if (!this._readHeader(stream)) {
          // Not enough data to read packet type
          return false;
        }

        // Attempt to create a new Packet by trying to parse each possible
        // header pattern.
        this._incoming = lazy.Packet.fromHeader(this._incomingHeader, this);
        if (!this._incoming) {
          throw new Error(
            "No packet types for header: " + this._incomingHeader
          );
        }
      }

      if (!this._incoming.done) {
        // We have an incomplete packet, keep reading it.
        dumpv("Existing packet incomplete, keep reading");
        this._incoming.read(stream, this._scriptableInput);
      }
    } catch (e) {
      dump(`Error reading incoming packet: (${e} - ${e.stack})\n`);

      // Now in an invalid state, shut down the transport.
      this.close();
      return false;
    }

    if (!this._incoming.done) {
      // Still not complete, we'll wait for more data.
      dumpv("Packet not done, wait for more");
      return true;
    }

    // Ready for next packet
    this._flushIncoming();
    return true;
  },

  /**
   * Read as far as we can into the incoming data, attempting to build
   * up a complete packet header (which terminates with ":").  We'll only
   * read up to PACKET_HEADER_MAX characters.
   *
   * @returns {boolean}
   *     True if we now have a complete header.
   */
  _readHeader() {
    let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
    this._incomingHeader += lazy.StreamUtils.delimitedRead(
      this._scriptableInput,
      ":",
      amountToRead
    );
    if (flags.wantVerbose) {
      dumpv("Header read: " + this._incomingHeader);
    }

    if (this._incomingHeader.endsWith(":")) {
      if (flags.wantVerbose) {
        dumpv("Found packet header successfully: " + this._incomingHeader);
      }
      return true;
    }

    if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
      throw new Error("Failed to parse packet header!");
    }

    // Not enough data yet.
    return false;
  },

  /**
   * If the incoming packet is done, log it as needed and clear the buffer.
   */
  _flushIncoming() {
    if (!this._incoming.done) {
      return;
    }
    if (flags.wantLogging) {
      dumpv("Got: " + this._incoming);
    }
    this._destroyIncoming();
  },

  /**
   * Handler triggered by an incoming JSONPacket completing it's |read|
   * method.  Delivers the packet to this.hooks.onPacket.
   */
  _onJSONObjectReady(object) {
    lazy.executeSoon(() => {
      // Ensure the transport is still alive by the time this runs.
      if (this.active) {
        this.emit("packet", object);
        this.hooks.onPacket(object);
      }
    });
  },

  /**
   * Handler triggered by an incoming BulkPacket entering the |read|
   * phase for the stream portion of the packet.  Delivers info about the
   * incoming streaming data to this.hooks.onBulkPacket.  See the main
   * comment on the transport at the top of this file for more details.
   */
  _onBulkReadReady(...args) {
    lazy.executeSoon(() => {
      // Ensure the transport is still alive by the time this runs.
      if (this.active) {
        this.emit("bulkpacket", ...args);
        this.hooks.onBulkPacket(...args);
      }
    });
  },

  /**
   * Remove all handlers and references related to the current incoming
   * packet, either because it is now complete or because the transport
   * is closing.
   */
  _destroyIncoming() {
    if (this._incoming) {
      this._incoming.destroy();
    }
    this._incomingHeader = "";
    this._incoming = null;
  },
};