495 lines
14 KiB
JavaScript
495 lines
14 KiB
JavaScript
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
const ArrayBufferInputStream = Components.Constructor(
|
|
"@mozilla.org/io/arraybuffer-input-stream;1",
|
|
"nsIArrayBufferInputStream"
|
|
);
|
|
const BinaryInputStream = Components.Constructor(
|
|
"@mozilla.org/binaryinputstream;1",
|
|
"nsIBinaryInputStream",
|
|
"setInputStream"
|
|
);
|
|
|
|
import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
|
|
|
|
const lazy = {};
|
|
|
|
XPCOMUtils.defineLazyServiceGetter(
|
|
lazy,
|
|
"gActivityDistributor",
|
|
"@mozilla.org/network/http-activity-distributor;1",
|
|
"nsIHttpActivityDistributor"
|
|
);
|
|
|
|
ChromeUtils.defineESModuleGetters(lazy, {
|
|
setTimeout: "resource://gre/modules/Timer.sys.mjs",
|
|
});
|
|
|
|
class NetworkThrottleListener {
|
|
#activities;
|
|
#offset;
|
|
#originalListener;
|
|
#pendingData;
|
|
#pendingException;
|
|
#queue;
|
|
#responseStarted;
|
|
|
|
/**
|
|
* Construct a new nsIStreamListener that buffers data and provides a
|
|
* method to notify another listener when data is available. This is
|
|
* used to throttle network data on a per-channel basis.
|
|
*
|
|
* After construction, @see setOriginalListener must be called on the
|
|
* new object.
|
|
*
|
|
* @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
|
|
* which status changes should be reported
|
|
*/
|
|
constructor(queue) {
|
|
this.#activities = {};
|
|
this.#offset = 0;
|
|
this.#pendingData = [];
|
|
this.#pendingException = null;
|
|
this.#queue = queue;
|
|
this.#responseStarted = false;
|
|
}
|
|
|
|
/**
|
|
* Set the original listener for this object. The original listener
|
|
* will receive requests from this object when the queue allows data
|
|
* through.
|
|
*
|
|
* @param {nsIStreamListener} originalListener the original listener
|
|
* for the channel, to which all requests will be sent
|
|
*/
|
|
setOriginalListener(originalListener) {
|
|
this.#originalListener = originalListener;
|
|
}
|
|
|
|
/**
|
|
* @see nsIStreamListener.onStartRequest.
|
|
*/
|
|
onStartRequest(request) {
|
|
this.#originalListener.onStartRequest(request);
|
|
this.#queue.start(this);
|
|
}
|
|
|
|
/**
|
|
* @see nsIStreamListener.onStopRequest.
|
|
*/
|
|
onStopRequest(request, statusCode) {
|
|
this.#pendingData.push({ request, statusCode });
|
|
this.#queue.dataAvailable(this);
|
|
}
|
|
|
|
/**
|
|
* @see nsIStreamListener.onDataAvailable.
|
|
*/
|
|
onDataAvailable(request, inputStream, offset, count) {
|
|
if (this.#pendingException) {
|
|
throw this.#pendingException;
|
|
}
|
|
|
|
const bin = new BinaryInputStream(inputStream);
|
|
const bytes = new ArrayBuffer(count);
|
|
bin.readArrayBuffer(count, bytes);
|
|
|
|
const stream = new ArrayBufferInputStream();
|
|
stream.setData(bytes, 0, count);
|
|
|
|
this.#pendingData.push({ request, stream, count });
|
|
this.#queue.dataAvailable(this);
|
|
}
|
|
|
|
/**
|
|
* Allow some buffered data from this object to be forwarded to this
|
|
* object's originalListener.
|
|
*
|
|
* @param {Number} bytesPermitted The maximum number of bytes
|
|
* permitted to be sent.
|
|
* @return {Object} an object of the form {length, done}, where
|
|
* |length| is the number of bytes actually forwarded, and
|
|
* |done| is a boolean indicating whether this particular
|
|
* request has been completed. (A NetworkThrottleListener
|
|
* may be queued multiple times, so this does not mean that
|
|
* all available data has been sent.)
|
|
*/
|
|
sendSomeData(bytesPermitted) {
|
|
if (this.#pendingData.length === 0) {
|
|
// Shouldn't happen.
|
|
return { length: 0, done: true };
|
|
}
|
|
|
|
const { request, stream, count, statusCode } = this.#pendingData[0];
|
|
|
|
if (statusCode !== undefined) {
|
|
this.#pendingData.shift();
|
|
this.#originalListener.onStopRequest(request, statusCode);
|
|
return { length: 0, done: true };
|
|
}
|
|
|
|
if (bytesPermitted > count) {
|
|
bytesPermitted = count;
|
|
}
|
|
|
|
try {
|
|
this.#originalListener.onDataAvailable(
|
|
request,
|
|
stream,
|
|
this.#offset,
|
|
bytesPermitted
|
|
);
|
|
} catch (e) {
|
|
this.#pendingException = e;
|
|
}
|
|
|
|
let done = false;
|
|
if (bytesPermitted === count) {
|
|
this.#pendingData.shift();
|
|
done = true;
|
|
} else {
|
|
this.#pendingData[0].count -= bytesPermitted;
|
|
}
|
|
|
|
this.#offset += bytesPermitted;
|
|
// Maybe our state has changed enough to emit an event.
|
|
this.#maybeEmitEvents();
|
|
|
|
return { length: bytesPermitted, done };
|
|
}
|
|
|
|
/**
|
|
* Return the number of pending data requests available for this
|
|
* listener.
|
|
*/
|
|
pendingCount() {
|
|
return this.#pendingData.length;
|
|
}
|
|
|
|
/**
|
|
* This is called when an http activity event is delivered. This
|
|
* object delays the event until the appropriate moment.
|
|
*/
|
|
addActivityCallback(
|
|
callback,
|
|
httpActivity,
|
|
channel,
|
|
activityType,
|
|
activitySubtype,
|
|
timestamp,
|
|
extraSizeData,
|
|
extraStringData
|
|
) {
|
|
const datum = {
|
|
callback,
|
|
httpActivity,
|
|
channel,
|
|
activityType,
|
|
activitySubtype,
|
|
extraSizeData,
|
|
extraStringData,
|
|
};
|
|
this.#activities[activitySubtype] = datum;
|
|
|
|
if (
|
|
activitySubtype ===
|
|
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
|
|
) {
|
|
this.totalSize = extraSizeData;
|
|
}
|
|
|
|
this.#maybeEmitEvents();
|
|
}
|
|
|
|
/**
|
|
* This is called for a download throttler when the latency timeout
|
|
* has ended.
|
|
*/
|
|
responseStart() {
|
|
this.#responseStarted = true;
|
|
this.#maybeEmitEvents();
|
|
}
|
|
|
|
/**
|
|
* Check our internal state and emit any http activity events as
|
|
* needed. Note that we wait until both our internal state has
|
|
* changed and we've received the real http activity event from
|
|
* platform. This approach ensures we can both pass on the correct
|
|
* data from the original event, and update the reported time to be
|
|
* consistent with the delay we're introducing.
|
|
*/
|
|
#maybeEmitEvents() {
|
|
if (this.#responseStarted) {
|
|
this.#maybeEmit(
|
|
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START
|
|
);
|
|
this.#maybeEmit(
|
|
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER
|
|
);
|
|
}
|
|
|
|
if (this.totalSize !== undefined && this.#offset >= this.totalSize) {
|
|
this.#maybeEmit(
|
|
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
|
|
);
|
|
this.#maybeEmit(
|
|
lazy.gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Emit an event for |code|, if the appropriate entry in
|
|
* |activities| is defined.
|
|
*/
|
|
#maybeEmit(code) {
|
|
if (this.#activities[code] !== undefined) {
|
|
const {
|
|
callback,
|
|
httpActivity,
|
|
channel,
|
|
activityType,
|
|
activitySubtype,
|
|
extraSizeData,
|
|
extraStringData,
|
|
} = this.#activities[code];
|
|
const now = Date.now() * 1000;
|
|
callback(
|
|
httpActivity,
|
|
channel,
|
|
activityType,
|
|
activitySubtype,
|
|
now,
|
|
extraSizeData,
|
|
extraStringData
|
|
);
|
|
this.#activities[code] = undefined;
|
|
}
|
|
}
|
|
|
|
QueryInterface = ChromeUtils.generateQI([
|
|
"nsIStreamListener",
|
|
"nsIInterfaceRequestor",
|
|
]);
|
|
}
|
|
|
|
class NetworkThrottleQueue {
|
|
#downloadQueue;
|
|
#latencyMax;
|
|
#latencyMean;
|
|
#maxBPS;
|
|
#meanBPS;
|
|
#pendingRequests;
|
|
#previousReads;
|
|
#pumping;
|
|
|
|
/**
|
|
* Construct a new queue that can be used to throttle the network for
|
|
* a group of related network requests.
|
|
*
|
|
* meanBPS {Number} Mean bytes per second.
|
|
* maxBPS {Number} Maximum bytes per second.
|
|
* latencyMean {Number} Mean latency in milliseconds.
|
|
* latencyMax {Number} Maximum latency in milliseconds.
|
|
*/
|
|
constructor(meanBPS, maxBPS, latencyMean, latencyMax) {
|
|
this.#meanBPS = meanBPS;
|
|
this.#maxBPS = maxBPS;
|
|
this.#latencyMean = latencyMean;
|
|
this.#latencyMax = latencyMax;
|
|
|
|
this.#pendingRequests = new Set();
|
|
this.#downloadQueue = [];
|
|
this.#previousReads = [];
|
|
|
|
this.#pumping = false;
|
|
}
|
|
|
|
/**
|
|
* A helper function that lets the indicating listener start sending
|
|
* data. This is called after the initial round trip time for the
|
|
* listener has elapsed.
|
|
*/
|
|
#allowDataFrom(throttleListener) {
|
|
throttleListener.responseStart();
|
|
this.#pendingRequests.delete(throttleListener);
|
|
const count = throttleListener.pendingCount();
|
|
for (let i = 0; i < count; ++i) {
|
|
this.#downloadQueue.push(throttleListener);
|
|
}
|
|
this.#pump();
|
|
}
|
|
|
|
/**
|
|
* An internal function that permits individual listeners to send
|
|
* data.
|
|
*/
|
|
#pump() {
|
|
// A redirect will cause two NetworkThrottleListeners to be on a
|
|
// listener chain. In this case, we might recursively call into
|
|
// this method. Avoid infinite recursion here.
|
|
if (this.#pumping) {
|
|
return;
|
|
}
|
|
this.#pumping = true;
|
|
|
|
const now = Date.now();
|
|
const oneSecondAgo = now - 1000;
|
|
|
|
while (
|
|
this.#previousReads.length &&
|
|
this.#previousReads[0].when < oneSecondAgo
|
|
) {
|
|
this.#previousReads.shift();
|
|
}
|
|
|
|
const totalBytes = this.#previousReads.reduce((sum, elt) => {
|
|
return sum + elt.numBytes;
|
|
}, 0);
|
|
|
|
let thisSliceBytes = this.#random(this.#meanBPS, this.#maxBPS);
|
|
if (totalBytes < thisSliceBytes) {
|
|
thisSliceBytes -= totalBytes;
|
|
let readThisTime = 0;
|
|
while (thisSliceBytes > 0 && this.#downloadQueue.length) {
|
|
const { length, done } =
|
|
this.#downloadQueue[0].sendSomeData(thisSliceBytes);
|
|
thisSliceBytes -= length;
|
|
readThisTime += length;
|
|
if (done) {
|
|
this.#downloadQueue.shift();
|
|
}
|
|
}
|
|
this.#previousReads.push({ when: now, numBytes: readThisTime });
|
|
}
|
|
|
|
// If there is more data to download, then schedule ourselves for
|
|
// one second after the oldest previous read.
|
|
if (this.#downloadQueue.length) {
|
|
const when = this.#previousReads[0].when + 1000;
|
|
lazy.setTimeout(this.#pump.bind(this), when - now);
|
|
}
|
|
|
|
this.#pumping = false;
|
|
}
|
|
|
|
/**
|
|
* A helper function that, given a mean and a maximum, returns a
|
|
* random integer between (mean - (max - mean)) and max.
|
|
*/
|
|
#random(mean, max) {
|
|
return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
|
|
}
|
|
|
|
/**
|
|
* Notice a new listener object. This is called by the
|
|
* NetworkThrottleListener when the request has started. Initially
|
|
* a new listener object is put into a "pending" state, until the
|
|
* round-trip time has elapsed. This is used to simulate latency.
|
|
*
|
|
* @param {NetworkThrottleListener} throttleListener the new listener
|
|
*/
|
|
start(throttleListener) {
|
|
this.#pendingRequests.add(throttleListener);
|
|
const delay = this.#random(this.#latencyMean, this.#latencyMax);
|
|
if (delay > 0) {
|
|
lazy.setTimeout(() => this.#allowDataFrom(throttleListener), delay);
|
|
} else {
|
|
this.#allowDataFrom(throttleListener);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Note that new data is available for a given listener. Each time
|
|
* data is available, the listener will be re-queued.
|
|
*
|
|
* @param {NetworkThrottleListener} throttleListener the listener
|
|
* which has data available.
|
|
*/
|
|
dataAvailable(throttleListener) {
|
|
if (!this.#pendingRequests.has(throttleListener)) {
|
|
this.#downloadQueue.push(throttleListener);
|
|
this.#pump();
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Construct a new object that can be used to throttle the network for
|
|
* a group of related network requests.
|
|
*
|
|
* @param {Object} An object with the following attributes:
|
|
* latencyMean {Number} Mean latency in milliseconds.
|
|
* latencyMax {Number} Maximum latency in milliseconds.
|
|
* downloadBPSMean {Number} Mean bytes per second for downloads.
|
|
* downloadBPSMax {Number} Maximum bytes per second for downloads.
|
|
* uploadBPSMean {Number} Mean bytes per second for uploads.
|
|
* uploadBPSMax {Number} Maximum bytes per second for uploads.
|
|
*
|
|
* Download throttling will not be done if downloadBPSMean and
|
|
* downloadBPSMax are <= 0. Upload throttling will not be done if
|
|
* uploadBPSMean and uploadBPSMax are <= 0.
|
|
*/
|
|
export class NetworkThrottleManager {
|
|
#downloadQueue;
|
|
|
|
constructor({
|
|
latencyMean,
|
|
latencyMax,
|
|
downloadBPSMean,
|
|
downloadBPSMax,
|
|
uploadBPSMean,
|
|
uploadBPSMax,
|
|
}) {
|
|
if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
|
|
this.#downloadQueue = null;
|
|
} else {
|
|
this.#downloadQueue = new NetworkThrottleQueue(
|
|
downloadBPSMean,
|
|
downloadBPSMax,
|
|
latencyMean,
|
|
latencyMax
|
|
);
|
|
}
|
|
if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
|
|
this.uploadQueue = null;
|
|
} else {
|
|
this.uploadQueue = Cc[
|
|
"@mozilla.org/network/throttlequeue;1"
|
|
].createInstance(Ci.nsIInputChannelThrottleQueue);
|
|
this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a new NetworkThrottleListener for a given channel and
|
|
* install it using |setNewListener|.
|
|
*
|
|
* @param {nsITraceableChannel} channel the channel to manage
|
|
* @return {NetworkThrottleListener} the new listener, or null if
|
|
* download throttling is not being done.
|
|
*/
|
|
manage(channel) {
|
|
if (this.#downloadQueue) {
|
|
const listener = new NetworkThrottleListener(this.#downloadQueue);
|
|
const originalListener = channel.setNewListener(listener);
|
|
listener.setOriginalListener(originalListener);
|
|
return listener;
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/**
|
|
* Throttle uploads taking place on the given channel.
|
|
*
|
|
* @param {nsITraceableChannel} channel the channel to manage
|
|
*/
|
|
manageUpload(channel) {
|
|
if (this.uploadQueue) {
|
|
channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
|
|
channel.throttleQueue = this.uploadQueue;
|
|
}
|
|
}
|
|
}
|