/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
// Container object on which the getters below are defined; the rest of this
// file reaches EventEmitter, IOUtil and ScriptableInputStream through it.
const lazy = {};
// Registers `lazy.EventEmitter`, backed by the EventEmitter ES module.
ChromeUtils.defineESModuleGetters(lazy, {
EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
});
// Registers `lazy.IOUtil` for the "@mozilla.org/io-util;1" service
// (nsIIOUtil). It is used to ask whether an output stream is already buffered.
XPCOMUtils.defineLazyServiceGetter(
lazy,
"IOUtil",
"@mozilla.org/io-util;1",
"nsIIOUtil"
);
// Registers `lazy.ScriptableInputStream`, a constructor built with
// Components.Constructor from the scriptable input stream contract ID,
// its interface name and the "init" method name.
// NOTE(review): presumably `new lazy.ScriptableInputStream(stream)` creates
// the instance and calls init(stream) — confirm against Components.Constructor.
XPCOMUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
return Components.Constructor(
"@mozilla.org/scriptableinputstream;1",
"nsIScriptableInputStream",
"init"
);
});
// Buffer size (0x8000 = 32768) passed to nsIBufferedOutputStream.init() when
// StreamCopier has to wrap an output stream that is not already buffered.
const BUFFER_SIZE = 0x8000;
/**
 * Copy an exact number of bytes from one async stream to another.
 *
 * Bulk senders and receivers hand functions built on this helper to their
 * callers when it is time to read or write bulk data. Prefer it over
 * touching the streams directly: bulk mode reuses an existing stream, so
 * both sides must transfer exactly the agreed upon amount of data or the
 * whole transport is left in an invalid state. Other copying utilities
 * (such as NetUtil.asyncCopy) also close the streams involved, which would
 * terminate the debugging transport, so they are avoided here.
 *
 * This works, but the ideal solution would use the streams directly. If
 * nsIInputStream/nsIOutputStream could be fully implemented in JS, wrapper
 * streams could enforce the length and avoid closing, and consumers could
 * use familiar utilities like NetUtil.asyncCopy.
 *
 * The work is delegated to a new StreamCopier. Copying is scheduled right
 * away but may finish at some later time depending on the data size.
 *
 * @param {nsIAsyncInputStream} input
 *        Stream to copy from.
 * @param {nsIAsyncOutputStream} output
 *        Stream to copy to.
 * @param {number} length
 *        Amount of data that needs to be copied.
 *
 * @return {StreamCopier}
 *         Whatever StreamCopier#copy() returns: the copier itself, which
 *         exposes a promise-style then() that settles when copying
 *         completes or an (unexpected) error occurs.
 */
function copyStream(input, output, length) {
  return new StreamCopier(input, output, length).copy();
}
/**
 * Copies exactly |length| bytes from |input| to |output| without closing
 * either stream. Instances are event emitters (a "progress" event is emitted
 * while copying) and also expose a promise-style then() that settles when
 * the copy finishes or fails.
 *
 * @class
 * @param {nsIAsyncInputStream} input
 *        Stream to copy from.
 * @param {nsIAsyncOutputStream} output
 *        Stream to copy to. It is wrapped in a buffered output stream when
 *        it is not already buffered.
 * @param {number} length
 *        Exact number of bytes to copy.
 */
function StreamCopier(input, output, length) {
  lazy.EventEmitter.decorate(this);
  this._id = StreamCopier._nextId++;
  this.input = input;
  // Save off the base output stream, since we know it's async as we've
  // required
  this.baseAsyncOutput = output;
  if (lazy.IOUtil.outputStreamIsBuffered(output)) {
    this.output = output;
  } else {
    this.output = Cc[
      "@mozilla.org/network/buffered-output-stream;1"
    ].createInstance(Ci.nsIBufferedOutputStream);
    this.output.init(output, BUFFER_SIZE);
  }
  this._length = length;
  this._amountLeft = length;

  // The Promise executor runs synchronously, i.e. before an object literal
  // holding the promise could be assigned to |this._deferred|. Build the
  // deferred in a local first so the executor has somewhere to store the
  // settle functions, then publish it.
  const deferred = {};
  deferred.promise = new Promise((resolve, reject) => {
    deferred.resolve = resolve;
    deferred.reject = reject;
  });
  this._deferred = deferred;

  this._copy = this._copy.bind(this);
  this._flush = this._flush.bind(this);
  this._destroy = this._destroy.bind(this);

  // Copy promise's then method up to this object.
  //
  // Allows the copier to offer a promise interface for the simple succeed
  // or fail scenarios, but also emit events (due to the EventEmitter)
  // for other states, like progress.
  this.then = deferred.promise.then.bind(deferred.promise);
  // Release stream references once the copy has settled either way.
  this.then(this._destroy, this._destroy);

  // Stream ready callback starts as |_copy|, but may switch to |_flush|
  // at end if flushing would block the output stream.
  this._streamReadyCallback = this._copy;
}
// Source of the per-instance |_id| values.
StreamCopier._nextId = 0;
StreamCopier.prototype = {
  /**
   * Begin copying. The first copy step is dispatched to the current thread
   * rather than run inline.
   *
   * @return {StreamCopier}
   *         This copier, so callers can attach listeners or use then().
   */
  copy() {
    // Dispatch to the next tick so that it's possible to attach a progress
    // event listener, even for extremely fast copies (like when testing).
    Services.tm.currentThread.dispatch(() => {
      try {
        this._copy();
      } catch (e) {
        this._deferred.reject(e);
      }
    }, 0);
    return this;
  },

  /**
   * Perform one copy step: move as many bytes as are currently available
   * (capped at what is still owed), then either flush when done or wait for
   * whichever stream has to become ready next.
   */
  _copy() {
    const bytesAvailable = this.input.available();
    const amountToCopy = Math.min(bytesAvailable, this._amountLeft);
    this._debug(`Trying to copy: ${amountToCopy}`);

    let bytesCopied;
    try {
      bytesCopied = this.output.writeFrom(this.input, amountToCopy);
    } catch (e) {
      if (e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK) {
        // Nothing was written; retry this step once the underlying async
        // output stream reports it is ready.
        this._debug("Base stream would block, will retry");
        this._debug("Waiting for output stream");
        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
        return;
      }
      throw e;
    }

    this._amountLeft -= bytesCopied;
    this._debug(`Copied: ${bytesCopied}, Left: ${this._amountLeft}`);
    this._emitProgress();

    if (this._amountLeft === 0) {
      this._debug("Copy done!");
      this._flush();
      return;
    }

    this._debug("Waiting for input stream");
    this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
  },

  /**
   * Emit a "progress" event carrying the number of bytes copied so far and
   * the total number of bytes requested.
   */
  _emitProgress() {
    this.emit("progress", {
      bytesSent: this._length - this._amountLeft,
      totalBytes: this._length,
    });
  },

  /**
   * Flush the (buffered) output stream and resolve the copier's promise.
   * If the flush cannot complete yet, wait for the output stream and retry.
   */
  _flush() {
    try {
      this.output.flush();
    } catch (e) {
      // NS_ERROR_FAILURE is handled exactly like "would block" here: both
      // schedule another flush attempt instead of failing the copy.
      if (
        e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK ||
        e.result === Cr.NS_ERROR_FAILURE
      ) {
        this._debug("Flush would block, will retry");
        // From now on stream-ready notifications must retry the flush, not
        // start another copy step.
        this._streamReadyCallback = this._flush;
        this._debug("Waiting for output stream");
        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
        return;
      }
      throw e;
    }
    this._deferred.resolve();
  },

  /**
   * Drop references to the streams and bound helpers. Registered by the
   * constructor to run once the copier's promise settles.
   */
  _destroy() {
    this._destroy = null;
    this._copy = null;
    this._flush = null;
    this.input = null;
    this.output = null;
  },

  /**
   * Run the pending step (|_copy| or |_flush|) after a stream signalled
   * readiness. Any exception rejects the copier's promise, mirroring what
   * copy() does for the first step; otherwise the error would escape into
   * the stream callback and the promise would never settle.
   */
  _onStreamReady() {
    try {
      this._streamReadyCallback();
    } catch (e) {
      this._deferred.reject(e);
    }
  },

  // nsIInputStreamCallback
  onInputStreamReady() {
    this._onStreamReady();
  },

  // nsIOutputStreamCallback
  onOutputStreamReady() {
    this._onStreamReady();
  },

  // No-op logging hook; receives the debug message for each step.
  _debug() {},
};
/**
 * Pull bytes off a stream one at a time until the delimiter character has
 * been read or |count| bytes have been consumed, whichever happens first.
 * The read never goes past what the stream currently reports as available,
 * so fewer than |count| bytes may be returned even without a delimiter.
 *
 * @param {nsIInputStream} stream
 *        Input stream to read from.
 * @param {string} delimiter
 *        Character we're trying to find.
 * @param {number} count
 *        Max number of characters to read while searching.
 *
 * @return {string}
 *         Collected data. If the delimiter was found, this string will
 *         end with it.
 */
// TODO: This implementation could be removed if bug 984651 is fixed,
// which provides a native version of the same idea.
function delimitedRead(stream, delimiter, count) {
  // Reuse the stream when it is already scriptable, otherwise wrap it.
  const reader =
    stream instanceof Ci.nsIScriptableInputStream
      ? stream
      : new lazy.ScriptableInputStream(stream);

  // Don't exceed what's available on the stream
  const limit = Math.min(count, stream.available());

  let collected = "";
  let lastByte;
  for (
    let remaining = limit;
    remaining > 0 && lastByte !== delimiter;
    remaining--
  ) {
    lastByte = reader.readBytes(1);
    collected += lastByte;
  }
  return collected;
}
// Public API of this module: the two stream helpers defined in this file,
// grouped under a single named export.
export const StreamUtils = {
copyStream,
delimitedRead,
};