summaryrefslogtreecommitdiffstats
path: root/remote/marionette/stream-utils.sys.mjs
diff options
context:
space:
mode:
Diffstat (limited to 'remote/marionette/stream-utils.sys.mjs')
-rw-r--r--remote/marionette/stream-utils.sys.mjs256
1 files changed, 256 insertions, 0 deletions
diff --git a/remote/marionette/stream-utils.sys.mjs b/remote/marionette/stream-utils.sys.mjs
new file mode 100644
index 0000000000..9a4437966c
--- /dev/null
+++ b/remote/marionette/stream-utils.sys.mjs
@@ -0,0 +1,256 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
+
+const lazy = {};
+
+ChromeUtils.defineESModuleGetters(lazy, {
+ EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
+});
+
+XPCOMUtils.defineLazyServiceGetter(
+ lazy,
+ "IOUtil",
+ "@mozilla.org/io-util;1",
+ "nsIIOUtil"
+);
+
+XPCOMUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
+ return Components.Constructor(
+ "@mozilla.org/scriptableinputstream;1",
+ "nsIScriptableInputStream",
+ "init"
+ );
+});
+
+const BUFFER_SIZE = 0x8000;
+
+/**
+ * This helper function (and its companion object) are used by bulk
+ * senders and receivers to read and write data in and out of other streams.
+ * Functions that make use of this tool are passed to callers when it is
+ * time to read or write bulk data. It is highly recommended to use these
+ * copier functions instead of the stream directly because the copier
+ * enforces the agreed upon length. Since bulk mode reuses an existing
+ * stream, the sender and receiver must write and read exactly the agreed
+ * upon amount of data, or else the entire transport will be left in a
+ * invalid state. Additionally, other methods of stream copying (such as
+ * NetUtil.asyncCopy) close the streams involved, which would terminate
+ * the debugging transport, and so it is avoided here.
+ *
+ * Overall, this *works*, but clearly the optimal solution would be
+ * able to just use the streams directly. If it were possible to fully
+ * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could
+ * be created to enforce the length and avoid closing, and consumers could
+ * use familiar stream utilities like NetUtil.asyncCopy.
+ *
+ * The function takes two async streams and copies a precise number
+ * of bytes from one to the other. Copying begins immediately, but may
+ * complete at some future time depending on data size. Use the returned
+ * promise to know when it's complete.
+ *
+ * @param {nsIAsyncInputStream} input
+ * Stream to copy from.
+ * @param {nsIAsyncOutputStream} output
+ * Stream to copy to.
+ * @param {number} length
+ * Amount of data that needs to be copied.
+ *
+ * @returns {Promise}
+ * Promise is resolved when copying completes or rejected if any
+ * (unexpected) errors occur.
+ */
+function copyStream(input, output, length) {
+ let copier = new StreamCopier(input, output, length);
+ return copier.copy();
+}
+
+/** @class */
+function StreamCopier(input, output, length) {
+ lazy.EventEmitter.decorate(this);
+ this._id = StreamCopier._nextId++;
+ this.input = input;
+ // Save off the base output stream, since we know it's async as we've
+ // required
+ this.baseAsyncOutput = output;
+ if (lazy.IOUtil.outputStreamIsBuffered(output)) {
+ this.output = output;
+ } else {
+ this.output = Cc[
+ "@mozilla.org/network/buffered-output-stream;1"
+ ].createInstance(Ci.nsIBufferedOutputStream);
+ this.output.init(output, BUFFER_SIZE);
+ }
+ this._length = length;
+ this._amountLeft = length;
+ this._deferred = {
+ promise: new Promise((resolve, reject) => {
+ this._deferred.resolve = resolve;
+ this._deferred.reject = reject;
+ }),
+ };
+
+ this._copy = this._copy.bind(this);
+ this._flush = this._flush.bind(this);
+ this._destroy = this._destroy.bind(this);
+
+ // Copy promise's then method up to this object.
+ //
+ // Allows the copier to offer a promise interface for the simple succeed
+ // or fail scenarios, but also emit events (due to the EventEmitter)
+ // for other states, like progress.
+ this.then = this._deferred.promise.then.bind(this._deferred.promise);
+ this.then(this._destroy, this._destroy);
+
+ // Stream ready callback starts as |_copy|, but may switch to |_flush|
+ // at end if flushing would block the output stream.
+ this._streamReadyCallback = this._copy;
+}
+StreamCopier._nextId = 0;
+
+StreamCopier.prototype = {
+ copy() {
+ // Dispatch to the next tick so that it's possible to attach a progress
+ // event listener, even for extremely fast copies (like when testing).
+ Services.tm.currentThread.dispatch(() => {
+ try {
+ this._copy();
+ } catch (e) {
+ this._deferred.reject(e);
+ }
+ }, 0);
+ return this;
+ },
+
+ _copy() {
+ let bytesAvailable = this.input.available();
+ let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
+ this._debug("Trying to copy: " + amountToCopy);
+
+ let bytesCopied;
+ try {
+ bytesCopied = this.output.writeFrom(this.input, amountToCopy);
+ } catch (e) {
+ if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
+ this._debug("Base stream would block, will retry");
+ this._debug("Waiting for output stream");
+ this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+ return;
+ }
+ throw e;
+ }
+
+ this._amountLeft -= bytesCopied;
+ this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
+ this._emitProgress();
+
+ if (this._amountLeft === 0) {
+ this._debug("Copy done!");
+ this._flush();
+ return;
+ }
+
+ this._debug("Waiting for input stream");
+ this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
+ },
+
+ _emitProgress() {
+ this.emit("progress", {
+ bytesSent: this._length - this._amountLeft,
+ totalBytes: this._length,
+ });
+ },
+
+ _flush() {
+ try {
+ this.output.flush();
+ } catch (e) {
+ if (
+ e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
+ e.result == Cr.NS_ERROR_FAILURE
+ ) {
+ this._debug("Flush would block, will retry");
+ this._streamReadyCallback = this._flush;
+ this._debug("Waiting for output stream");
+ this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
+ return;
+ }
+ throw e;
+ }
+ this._deferred.resolve();
+ },
+
+ _destroy() {
+ this._destroy = null;
+ this._copy = null;
+ this._flush = null;
+ this.input = null;
+ this.output = null;
+ },
+
+ // nsIInputStreamCallback
+ onInputStreamReady() {
+ this._streamReadyCallback();
+ },
+
+ // nsIOutputStreamCallback
+ onOutputStreamReady() {
+ this._streamReadyCallback();
+ },
+
+ _debug() {},
+};
+
+/**
+ * Read from a stream, one byte at a time, up to the next
+ * <var>delimiter</var> character, but stopping if we've read |count|
+ * without finding it. Reading also terminates early if there are less
+ * than <var>count</var> bytes available on the stream. In that case,
+ * we only read as many bytes as the stream currently has to offer.
+ *
+ * @param {nsIInputStream} stream
+ * Input stream to read from.
+ * @param {string} delimiter
+ * Character we're trying to find.
+ * @param {number} count
+ * Max number of characters to read while searching.
+ *
+ * @returns {string}
+ * Collected data. If the delimiter was found, this string will
+ * end with it.
+ */
+// TODO: This implementation could be removed if bug 984651 is fixed,
+// which provides a native version of the same idea.
+function delimitedRead(stream, delimiter, count) {
+ let scriptableStream;
+ if (stream instanceof Ci.nsIScriptableInputStream) {
+ scriptableStream = stream;
+ } else {
+ scriptableStream = new lazy.ScriptableInputStream(stream);
+ }
+
+ let data = "";
+
+ // Don't exceed what's available on the stream
+ count = Math.min(count, stream.available());
+
+ if (count <= 0) {
+ return data;
+ }
+
+ let char;
+ while (char !== delimiter && count > 0) {
+ char = scriptableStream.readBytes(1);
+ count--;
+ data += char;
+ }
+
+ return data;
+}
+
+export const StreamUtils = {
+ copyStream,
+ delimitedRead,
+};