diff options
Diffstat (limited to 'remote/marionette/stream-utils.sys.mjs')
-rw-r--r-- | remote/marionette/stream-utils.sys.mjs | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/remote/marionette/stream-utils.sys.mjs b/remote/marionette/stream-utils.sys.mjs new file mode 100644 index 0000000000..9a4437966c --- /dev/null +++ b/remote/marionette/stream-utils.sys.mjs @@ -0,0 +1,256 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; + +const lazy = {}; + +ChromeUtils.defineESModuleGetters(lazy, { + EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs", +}); + +XPCOMUtils.defineLazyServiceGetter( + lazy, + "IOUtil", + "@mozilla.org/io-util;1", + "nsIIOUtil" +); + +XPCOMUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => { + return Components.Constructor( + "@mozilla.org/scriptableinputstream;1", + "nsIScriptableInputStream", + "init" + ); +}); + +const BUFFER_SIZE = 0x8000; + +/** + * This helper function (and its companion object) are used by bulk + * senders and receivers to read and write data in and out of other streams. + * Functions that make use of this tool are passed to callers when it is + * time to read or write bulk data. It is highly recommended to use these + * copier functions instead of the stream directly because the copier + * enforces the agreed upon length. Since bulk mode reuses an existing + * stream, the sender and receiver must write and read exactly the agreed + * upon amount of data, or else the entire transport will be left in a + * invalid state. Additionally, other methods of stream copying (such as + * NetUtil.asyncCopy) close the streams involved, which would terminate + * the debugging transport, and so it is avoided here. + * + * Overall, this *works*, but clearly the optimal solution would be + * able to just use the streams directly. If it were possible to fully + * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could + * be created to enforce the length and avoid closing, and consumers could + * use familiar stream utilities like NetUtil.asyncCopy. + * + * The function takes two async streams and copies a precise number + * of bytes from one to the other. Copying begins immediately, but may + * complete at some future time depending on data size. Use the returned + * promise to know when it's complete. + * + * @param {nsIAsyncInputStream} input + * Stream to copy from. + * @param {nsIAsyncOutputStream} output + * Stream to copy to. + * @param {number} length + * Amount of data that needs to be copied. + * + * @returns {Promise} + * Promise is resolved when copying completes or rejected if any + * (unexpected) errors occur. + */ +function copyStream(input, output, length) { + let copier = new StreamCopier(input, output, length); + return copier.copy(); +} + +/** @class */ +function StreamCopier(input, output, length) { + lazy.EventEmitter.decorate(this); + this._id = StreamCopier._nextId++; + this.input = input; + // Save off the base output stream, since we know it's async as we've + // required + this.baseAsyncOutput = output; + if (lazy.IOUtil.outputStreamIsBuffered(output)) { + this.output = output; + } else { + this.output = Cc[ + "@mozilla.org/network/buffered-output-stream;1" + ].createInstance(Ci.nsIBufferedOutputStream); + this.output.init(output, BUFFER_SIZE); + } + this._length = length; + this._amountLeft = length; + this._deferred = { + promise: new Promise((resolve, reject) => { + this._deferred.resolve = resolve; + this._deferred.reject = reject; + }), + }; + + this._copy = this._copy.bind(this); + this._flush = this._flush.bind(this); + this._destroy = this._destroy.bind(this); + + // Copy promise's then method up to this object. + // + // Allows the copier to offer a promise interface for the simple succeed + // or fail scenarios, but also emit events (due to the EventEmitter) + // for other states, like progress. + this.then = this._deferred.promise.then.bind(this._deferred.promise); + this.then(this._destroy, this._destroy); + + // Stream ready callback starts as |_copy|, but may switch to |_flush| + // at end if flushing would block the output stream. + this._streamReadyCallback = this._copy; +} +StreamCopier._nextId = 0; + +StreamCopier.prototype = { + copy() { + // Dispatch to the next tick so that it's possible to attach a progress + // event listener, even for extremely fast copies (like when testing). + Services.tm.currentThread.dispatch(() => { + try { + this._copy(); + } catch (e) { + this._deferred.reject(e); + } + }, 0); + return this; + }, + + _copy() { + let bytesAvailable = this.input.available(); + let amountToCopy = Math.min(bytesAvailable, this._amountLeft); + this._debug("Trying to copy: " + amountToCopy); + + let bytesCopied; + try { + bytesCopied = this.output.writeFrom(this.input, amountToCopy); + } catch (e) { + if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { + this._debug("Base stream would block, will retry"); + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } + throw e; + } + + this._amountLeft -= bytesCopied; + this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft); + this._emitProgress(); + + if (this._amountLeft === 0) { + this._debug("Copy done!"); + this._flush(); + return; + } + + this._debug("Waiting for input stream"); + this.input.asyncWait(this, 0, 0, Services.tm.currentThread); + }, + + _emitProgress() { + this.emit("progress", { + bytesSent: this._length - this._amountLeft, + totalBytes: this._length, + }); + }, + + _flush() { + try { + this.output.flush(); + } catch (e) { + if ( + e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK || + e.result == Cr.NS_ERROR_FAILURE + ) { + this._debug("Flush would block, will retry"); + this._streamReadyCallback = this._flush; + this._debug("Waiting for output stream"); + this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); + return; + } + throw e; + } + this._deferred.resolve(); + }, + + _destroy() { + this._destroy = null; + this._copy = null; + this._flush = null; + this.input = null; + this.output = null; + }, + + // nsIInputStreamCallback + onInputStreamReady() { + this._streamReadyCallback(); + }, + + // nsIOutputStreamCallback + onOutputStreamReady() { + this._streamReadyCallback(); + }, + + _debug() {}, +}; + +/** + * Read from a stream, one byte at a time, up to the next + * <var>delimiter</var> character, but stopping if we've read |count| + * without finding it. Reading also terminates early if there are less + * than <var>count</var> bytes available on the stream. In that case, + * we only read as many bytes as the stream currently has to offer. + * + * @param {nsIInputStream} stream + * Input stream to read from. + * @param {string} delimiter + * Character we're trying to find. + * @param {number} count + * Max number of characters to read while searching. + * + * @returns {string} + * Collected data. If the delimiter was found, this string will + * end with it. + */ +// TODO: This implementation could be removed if bug 984651 is fixed, +// which provides a native version of the same idea. +function delimitedRead(stream, delimiter, count) { + let scriptableStream; + if (stream instanceof Ci.nsIScriptableInputStream) { + scriptableStream = stream; + } else { + scriptableStream = new lazy.ScriptableInputStream(stream); + } + + let data = ""; + + // Don't exceed what's available on the stream + count = Math.min(count, stream.available()); + + if (count <= 0) { + return data; + } + + let char; + while (char !== delimiter && count > 0) { + char = scriptableStream.readBytes(1); + count--; + data += char; + } + + return data; +} + +export const StreamUtils = { + copyStream, + delimitedRead, +}; |