summaryrefslogtreecommitdiffstats
path: root/devtools/shared/network-observer/NetworkThrottleManager.sys.mjs
diff options
context:
space:
mode:
Diffstat (limited to 'devtools/shared/network-observer/NetworkThrottleManager.sys.mjs')
-rw-r--r--devtools/shared/network-observer/NetworkThrottleManager.sys.mjs496
1 files changed, 496 insertions, 0 deletions
diff --git a/devtools/shared/network-observer/NetworkThrottleManager.sys.mjs b/devtools/shared/network-observer/NetworkThrottleManager.sys.mjs
new file mode 100644
index 0000000000..3845ded8f7
--- /dev/null
+++ b/devtools/shared/network-observer/NetworkThrottleManager.sys.mjs
@@ -0,0 +1,496 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+const ArrayBufferInputStream = Components.Constructor(
+ "@mozilla.org/io/arraybuffer-input-stream;1",
+ "nsIArrayBufferInputStream"
+);
+const BinaryInputStream = Components.Constructor(
+ "@mozilla.org/binaryinputstream;1",
+ "nsIBinaryInputStream",
+ "setInputStream"
+);
+
+import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
+
+const lazy = {};
+
+XPCOMUtils.defineLazyServiceGetter(
+ lazy,
+ "gActivityDistributor",
+ "@mozilla.org/network/http-activity-distributor;1",
+ "nsIHttpActivityDistributor"
+);
+
+ChromeUtils.defineESModuleGetters(lazy, {
+ setTimeout: "resource://gre/modules/Timer.sys.mjs",
+});
+
+class NetworkThrottleListener {
+ #activities;
+ #offset;
+ #originalListener;
+ #pendingData;
+ #pendingException;
+ #queue;
+ #responseStarted;
+
+ /**
+ * Construct a new nsIStreamListener that buffers data and provides a
+ * method to notify another listener when data is available. This is
+ * used to throttle network data on a per-channel basis.
+ *
+ * After construction, @see setOriginalListener must be called on the
+ * new object.
+ *
+ * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
+ * which status changes should be reported
+ */
+ constructor(queue) {
+ this.#activities = {};
+ this.#offset = 0;
+ this.#pendingData = [];
+ this.#pendingException = null;
+ this.#queue = queue;
+ this.#responseStarted = false;
+ }
+
+ /**
+ * Set the original listener for this object. The original listener
+ * will receive requests from this object when the queue allows data
+ * through.
+ *
+ * @param {nsIStreamListener} originalListener the original listener
+ * for the channel, to which all requests will be sent
+ */
+ setOriginalListener(originalListener) {
+ this.#originalListener = originalListener;
+ }
+
+ /**
+ * @see nsIStreamListener.onStartRequest.
+ */
+ onStartRequest(request) {
+ this.#originalListener.onStartRequest(request);
+ this.#queue.start(this);
+ }
+
+ /**
+ * @see nsIStreamListener.onStopRequest.
+ */
+ onStopRequest(request, statusCode) {
+ this.#pendingData.push({ request, statusCode });
+ this.#queue.dataAvailable(this);
+ }
+
+ /**
+ * @see nsIStreamListener.onDataAvailable.
+ */
+ onDataAvailable(request, inputStream, offset, count) {
+ if (this.#pendingException) {
+ throw this.#pendingException;
+ }
+
+ const bin = new BinaryInputStream(inputStream);
+ const bytes = new ArrayBuffer(count);
+ bin.readArrayBuffer(count, bytes);
+
+ const stream = new ArrayBufferInputStream();
+ stream.setData(bytes, 0, count);
+
+ this.#pendingData.push({ request, stream, count });
+ this.#queue.dataAvailable(this);
+ }
+
+ /**
+ * Allow some buffered data from this object to be forwarded to this
+ * object's originalListener.
+ *
+ * @param {Number} bytesPermitted The maximum number of bytes
+ * permitted to be sent.
+ * @return {Object} an object of the form {length, done}, where
+ * |length| is the number of bytes actually forwarded, and
+ * |done| is a boolean indicating whether this particular
+ * request has been completed. (A NetworkThrottleListener
+ * may be queued multiple times, so this does not mean that
+ * all available data has been sent.)
+ */
+ sendSomeData(bytesPermitted) {
+ if (this.#pendingData.length === 0) {
+ // Shouldn't happen.
+ return { length: 0, done: true };
+ }
+
+ const { request, stream, count, statusCode } = this.#pendingData[0];
+
+ if (statusCode !== undefined) {
+ this.#pendingData.shift();
+ this.#originalListener.onStopRequest(request, statusCode);
+ return { length: 0, done: true };
+ }
+
+ if (bytesPermitted > count) {
+ bytesPermitted = count;
+ }
+
+ try {
+ this.#originalListener.onDataAvailable(
+ request,
+ stream,
+ this.#offset,
+ bytesPermitted
+ );
+ } catch (e) {
+ this.#pendingException = e;
+ }
+
+ let done = false;
+ if (bytesPermitted === count) {
+ this.#pendingData.shift();
+ done = true;
+ } else {
+ this.#pendingData[0].count -= bytesPermitted;
+ }
+
+ this.#offset += bytesPermitted;
+ // Maybe our state has changed enough to emit an event.
+ this.#maybeEmitEvents();
+
+ return { length: bytesPermitted, done };
+ }
+
+ /**
+ * Return the number of pending data requests available for this
+ * listener.
+ */
+ pendingCount() {
+ return this.#pendingData.length;
+ }
+
+ /**
+ * This is called when an http activity event is delivered. This
+ * object delays the event until the appropriate moment.
+ */
+ addActivityCallback(
+ callback,
+ httpActivity,
+ channel,
+ activityType,
+ activitySubtype,
+ timestamp,
+ extraSizeData,
+ extraStringData
+ ) {
+ const datum = {
+ callback,
+ httpActivity,
+ channel,
+ activityType,
+ activitySubtype,
+ extraSizeData,
+ extraStringData,
+ };
+ this.#activities[activitySubtype] = datum;
+
+ if (
+ activitySubtype ===
+ lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
+ ) {
+ this.totalSize = extraSizeData;
+ }
+
+ this.#maybeEmitEvents();
+ }
+
+ /**
+ * This is called for a download throttler when the latency timeout
+ * has ended.
+ */
+ responseStart() {
+ this.#responseStarted = true;
+ this.#maybeEmitEvents();
+ }
+
+ /**
+ * Check our internal state and emit any http activity events as
+ * needed. Note that we wait until both our internal state has
+ * changed and we've received the real http activity event from
+ * platform. This approach ensures we can both pass on the correct
+ * data from the original event, and update the reported time to be
+ * consistent with the delay we're introducing.
+ */
+ #maybeEmitEvents() {
+ if (this.#responseStarted) {
+ this.#maybeEmit(
+ lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START
+ );
+ this.#maybeEmit(
+ lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER
+ );
+ }
+
+ if (this.totalSize !== undefined && this.#offset >= this.totalSize) {
+ this.#maybeEmit(
+ lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
+ );
+ this.#maybeEmit(
+ lazy.gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE
+ );
+ }
+ }
+
+ /**
+ * Emit an event for |code|, if the appropriate entry in
+ * |activities| is defined.
+ */
+ #maybeEmit(code) {
+ if (this.#activities[code] !== undefined) {
+ const {
+ callback,
+ httpActivity,
+ channel,
+ activityType,
+ activitySubtype,
+ extraSizeData,
+ extraStringData,
+ } = this.#activities[code];
+ const now = Date.now() * 1000;
+ callback(
+ httpActivity,
+ channel,
+ activityType,
+ activitySubtype,
+ now,
+ extraSizeData,
+ extraStringData
+ );
+ this.#activities[code] = undefined;
+ }
+ }
+
+ QueryInterface = ChromeUtils.generateQI([
+ "nsIStreamListener",
+ "nsIInterfaceRequestor",
+ ]);
+}
+
+class NetworkThrottleQueue {
+ #downloadQueue;
+ #latencyMax;
+ #latencyMean;
+ #maxBPS;
+ #meanBPS;
+ #pendingRequests;
+ #previousReads;
+ #pumping;
+
+ /**
+ * Construct a new queue that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * meanBPS {Number} Mean bytes per second.
+ * maxBPS {Number} Maximum bytes per second.
+ * latencyMean {Number} Mean latency in milliseconds.
+ * latencyMax {Number} Maximum latency in milliseconds.
+ */
+ constructor(meanBPS, maxBPS, latencyMean, latencyMax) {
+ this.#meanBPS = meanBPS;
+ this.#maxBPS = maxBPS;
+ this.#latencyMean = latencyMean;
+ this.#latencyMax = latencyMax;
+
+ this.#pendingRequests = new Set();
+ this.#downloadQueue = [];
+ this.#previousReads = [];
+
+ this.#pumping = false;
+ }
+
+ /**
+ * A helper function that lets the indicating listener start sending
+ * data. This is called after the initial round trip time for the
+ * listener has elapsed.
+ */
+ #allowDataFrom(throttleListener) {
+ throttleListener.responseStart();
+ this.#pendingRequests.delete(throttleListener);
+ const count = throttleListener.pendingCount();
+ for (let i = 0; i < count; ++i) {
+ this.#downloadQueue.push(throttleListener);
+ }
+ this.#pump();
+ }
+
+ /**
+ * An internal function that permits individual listeners to send
+ * data.
+ */
+ #pump() {
+ // A redirect will cause two NetworkThrottleListeners to be on a
+ // listener chain. In this case, we might recursively call into
+ // this method. Avoid infinite recursion here.
+ if (this.#pumping) {
+ return;
+ }
+ this.#pumping = true;
+
+ const now = Date.now();
+ const oneSecondAgo = now - 1000;
+
+ while (
+ this.#previousReads.length &&
+ this.#previousReads[0].when < oneSecondAgo
+ ) {
+ this.#previousReads.shift();
+ }
+
+ const totalBytes = this.#previousReads.reduce((sum, elt) => {
+ return sum + elt.numBytes;
+ }, 0);
+
+ let thisSliceBytes = this.#random(this.#meanBPS, this.#maxBPS);
+ if (totalBytes < thisSliceBytes) {
+ thisSliceBytes -= totalBytes;
+ let readThisTime = 0;
+ while (thisSliceBytes > 0 && this.#downloadQueue.length) {
+ const { length, done } = this.#downloadQueue[0].sendSomeData(
+ thisSliceBytes
+ );
+ thisSliceBytes -= length;
+ readThisTime += length;
+ if (done) {
+ this.#downloadQueue.shift();
+ }
+ }
+ this.#previousReads.push({ when: now, numBytes: readThisTime });
+ }
+
+ // If there is more data to download, then schedule ourselves for
+ // one second after the oldest previous read.
+ if (this.#downloadQueue.length) {
+ const when = this.#previousReads[0].when + 1000;
+ lazy.setTimeout(this.#pump.bind(this), when - now);
+ }
+
+ this.#pumping = false;
+ }
+
+ /**
+ * A helper function that, given a mean and a maximum, returns a
+ * random integer between (mean - (max - mean)) and max.
+ */
+ #random(mean, max) {
+ return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
+ }
+
+ /**
+ * Notice a new listener object. This is called by the
+ * NetworkThrottleListener when the request has started. Initially
+ * a new listener object is put into a "pending" state, until the
+ * round-trip time has elapsed. This is used to simulate latency.
+ *
+ * @param {NetworkThrottleListener} throttleListener the new listener
+ */
+ start(throttleListener) {
+ this.#pendingRequests.add(throttleListener);
+ const delay = this.#random(this.#latencyMean, this.#latencyMax);
+ if (delay > 0) {
+ lazy.setTimeout(() => this.#allowDataFrom(throttleListener), delay);
+ } else {
+ this.#allowDataFrom(throttleListener);
+ }
+ }
+
+ /**
+ * Note that new data is available for a given listener. Each time
+ * data is available, the listener will be re-queued.
+ *
+ * @param {NetworkThrottleListener} throttleListener the listener
+ * which has data available.
+ */
+ dataAvailable(throttleListener) {
+ if (!this.#pendingRequests.has(throttleListener)) {
+ this.#downloadQueue.push(throttleListener);
+ this.#pump();
+ }
+ }
+}
+
+/**
+ * Construct a new object that can be used to throttle the network for
+ * a group of related network requests.
+ *
+ * @param {Object} An object with the following attributes:
+ * latencyMean {Number} Mean latency in milliseconds.
+ * latencyMax {Number} Maximum latency in milliseconds.
+ * downloadBPSMean {Number} Mean bytes per second for downloads.
+ * downloadBPSMax {Number} Maximum bytes per second for downloads.
+ * uploadBPSMean {Number} Mean bytes per second for uploads.
+ * uploadBPSMax {Number} Maximum bytes per second for uploads.
+ *
+ * Download throttling will not be done if downloadBPSMean and
+ * downloadBPSMax are <= 0. Upload throttling will not be done if
+ * uploadBPSMean and uploadBPSMax are <= 0.
+ */
+export class NetworkThrottleManager {
+ #downloadQueue;
+
+ constructor({
+ latencyMean,
+ latencyMax,
+ downloadBPSMean,
+ downloadBPSMax,
+ uploadBPSMean,
+ uploadBPSMax,
+ }) {
+ if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
+ this.#downloadQueue = null;
+ } else {
+ this.#downloadQueue = new NetworkThrottleQueue(
+ downloadBPSMean,
+ downloadBPSMax,
+ latencyMean,
+ latencyMax
+ );
+ }
+ if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
+ this.uploadQueue = null;
+ } else {
+ this.uploadQueue = Cc[
+ "@mozilla.org/network/throttlequeue;1"
+ ].createInstance(Ci.nsIInputChannelThrottleQueue);
+ this.uploadQueue.init(uploadBPSMean, uploadBPSMax);
+ }
+ }
+
+ /**
+ * Create a new NetworkThrottleListener for a given channel and
+ * install it using |setNewListener|.
+ *
+ * @param {nsITraceableChannel} channel the channel to manage
+ * @return {NetworkThrottleListener} the new listener, or null if
+ * download throttling is not being done.
+ */
+ manage(channel) {
+ if (this.#downloadQueue) {
+ const listener = new NetworkThrottleListener(this.#downloadQueue);
+ const originalListener = channel.setNewListener(listener);
+ listener.setOriginalListener(originalListener);
+ return listener;
+ }
+ return null;
+ }
+
+ /**
+ * Throttle uploads taking place on the given channel.
+ *
+ * @param {nsITraceableChannel} channel the channel to manage
+ */
+ manageUpload(channel) {
+ if (this.uploadQueue) {
+ channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
+ channel.throttleQueue = this.uploadQueue;
+ }
+ }
+}