summaryrefslogtreecommitdiffstats
path: root/comm/calendar/base/src/CalReadableStreamFactory.jsm
diff options
context:
space:
mode:
Diffstat (limited to 'comm/calendar/base/src/CalReadableStreamFactory.jsm')
-rw-r--r--comm/calendar/base/src/CalReadableStreamFactory.jsm314
1 files changed, 314 insertions, 0 deletions
diff --git a/comm/calendar/base/src/CalReadableStreamFactory.jsm b/comm/calendar/base/src/CalReadableStreamFactory.jsm
new file mode 100644
index 0000000000..c41d20dc8f
--- /dev/null
+++ b/comm/calendar/base/src/CalReadableStreamFactory.jsm
@@ -0,0 +1,314 @@
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+/* global ReadableStream */
+
+const EXPORTED_SYMBOLS = ["CalReadableStreamFactory"];
+
+var { cal } = ChromeUtils.import("resource:///modules/calendar/calUtils.jsm");
+
+/**
+ * Function used to transform each value received from a stream.
+ *
+ * @callback MapStreamFunction
+ * @param {any} value
+ * @returns {Promise<any>|any}
+ */
+
+/**
+ * A version of UnderlyingSource that accepts a CalBoundedReadableStreamController
+ * as the controller argument.
+ *
+ * @typedef {object} CalBoundedReadableStreamUnderlyingSource
+ */
+
+/**
+ * Wrapper class for a ReadableStreamDefaultController that keeps track of how
+ * many items have been added to the queue before closing. This controller also
+ * buffers items to reduce the amount of times items are added to the queue.
+ */
+class CalBoundedReadableStreamController {
+ /**
+ * @type {ReadableStreamDefaultController}
+ */
+ _controller = null;
+
+ /**
+ * @type {CalBoundedReadableStreamUnderlyingSource}
+ */
+ _src = null;
+
+ /**
+ * @type {number}
+ */
+ _maxTotalItems;
+
+ /**
+ * @type {number}
+ */
+ _maxQueuedItems;
+
+ /**
+ * @type {calIItemBase[]}
+ */
+ _buffer = [];
+
+ /**
+ * @type {boolean}
+ */
+ _closed = false;
+
+ /**
+ * The count of items enqueued so far.
+ *
+ * @type {number}
+ */
+ count = 0;
+
+ /**
+ * @param {number} maxTotalItems
+ * @param {number} maxQueuedItems
+ * @param {CalBoundedReadableStreamUnderlyingSource} src
+ */
+ constructor(maxTotalItems, maxQueuedItems, src) {
+ this._maxTotalItems = maxTotalItems;
+ this._maxQueuedItems = maxQueuedItems;
+ this._src = src;
+ }
+
+ /**
+ * Indicates whether the maximum number of items have been added to the queue
+ * after which no more will be allowed.
+ *
+ * @type {number}
+ */
+ get maxTotalItemsReached() {
+ return this._maxTotalItems && this.count >= this._maxTotalItems;
+ }
+
+ /**
+ * Indicates whether the queue is full or not.
+ *
+ * @type {boolean}
+ */
+ get queueFull() {
+ return this._buffer.length >= this._maxQueuedItems;
+ }
+
+ /**
+ * Indicates how many more items can be enqueued based on the internal count
+ * kept.
+ *
+ * @type {number}
+ */
+ get remainingItemCount() {
+ return this._maxTotalItems ? this._maxTotalItems - this.count : Infinity;
+ }
+
+ /**
+ * Provides the value of the same property from the controller.
+ *
+ * @type {number}
+ */
+ get desiredSize() {
+ return this._controller.desiredSize;
+ }
+
+ /**
+ * Called by the ReadableStream to begin queueing items. This delegates to
+ * the provided underlying source.
+ *
+ * @param {ReadableStreamDefaultController} controller
+ */
+ async start(controller) {
+ this._controller = controller;
+ if (this._src.start) {
+ await this._src.start(this);
+ }
+ }
+
+ /**
+ * Called by the ReadableStream to receive more items when the queue has not
+ * been filled.
+ */
+ async pull() {
+ if (this._src.pull) {
+ await this._src.pull(this);
+ }
+ }
+
+ /**
+ * Called by the ReadableStream when reading has been cancelled.
+ *
+ * @param {string} reason
+ */
+ async cancel(reason) {
+ this._closed = true;
+ if (this._src.cancel) {
+ await this._src.cancel(reason);
+ }
+ }
+
+ /**
+ * Called by start() of the underlying source to add items to the queue. Items
+ * will only be added if maxTotalItemsReached returns false at which point
+ * the stream is automatically closed.
+ *
+ * @param {calIItemBase[]} items
+ */
+ enqueue(items) {
+ for (let item of items) {
+ if (this.queueFull) {
+ this.flush();
+ }
+ if (this.maxTotalItemsReached) {
+ return;
+ }
+ this._buffer.push(item);
+ }
+ this.flush();
+ }
+
+ /**
+ * Flushes the internal buffer if the number of buffered items have reached
+ * the threshold.
+ *
+ * @param {boolean} [force] - If true, will flush all items regardless of the
+ * threshold.
+ */
+ flush(force) {
+ if (force || this.queueFull) {
+ if (this.maxTotalItemsReached) {
+ return;
+ }
+ let buffer = this._buffer.slice(0, this.remainingItemCount);
+ this._controller.enqueue(buffer);
+ this.count += buffer.length;
+ this._buffer = [];
+ if (this.maxTotalItemsReached) {
+ this._controller.close();
+ }
+ }
+ }
+
+ /**
+ * Puts the stream in the error state.
+ *
+ * @param {Error} err
+ */
+ error(err) {
+ this._closed = true;
+ this._controller.error(err);
+ }
+
+ /**
+ * Closes the stream preventing any further items from being added to the queue.
+ */
+ close() {
+ if (!this._closed) {
+ if (this._buffer.length) {
+ this.flush(true);
+ }
+ this._closed = true;
+ this._controller.close();
+ }
+ }
+}
+
+/**
+ * Factory object for creating ReadableStreams of calIItemBase instances. This
+ * is used by the providers to satisfy getItems() calls from their respective
+ * backing stores.
+ */
+class CalReadableStreamFactory {
+ /**
+ * The default amount of items to queue before providing via the reader.
+ */
+ static defaultQueueSize = 10;
+
+ /**
+ * Creates a generic ReadableStream using the passed object as the
+ * UnderlyingSource. Use this method instead of creating streams directly
+ * until the API is more stable.
+ *
+ * @param {UnderlyingSource} src
+ *
+ * @returns {ReadableStream}
+ */
+ static createReadableStream(src) {
+ return new ReadableStream(src);
+ }
+
+ /**
+ * Creates a ReadableStream of calIItemBase items that tracks how many
+ * have been added to the queue. If maxTotalItems or more are enqueued, the
+ * stream will close ignoring further additions.
+ *
+ * @param {number} maxTotalItems
+ * @param {number} maxQueuedItems
+ * @param {UnderlyingSource} src
+ *
+ * @returns {ReadableStream<calIItemBase>}
+ */
+ static createBoundedReadableStream(maxTotalItems, maxQueuedItems, src) {
+ return new ReadableStream(
+ new CalBoundedReadableStreamController(maxTotalItems, maxQueuedItems, src)
+ );
+ }
+
+ /**
+ * Creates a ReadableStream that will provide no actual items.
+ *
+ * @returns {ReadableStream<calIItemBase>}
+ */
+ static createEmptyReadableStream() {
+ return new ReadableStream({
+ start(controller) {
+ controller.close();
+ },
+ });
+ }
+
+ /**
+ * Creates a ReadableStream that uses the one or more provided ReadableStreams
+ * for the source of its data. Each stream is read to completion one at a time
+ * and an error occurring while reading any will cause the main stream to end
+ * with in an error state.
+ *
+ * @param {ReadableStream[]} streams
+ * @returns {ReadableStream}
+ */
+ static createCombinedReadableStream(streams) {
+ return new ReadableStream({
+ async start(controller) {
+ for (let stream of streams) {
+ for await (let chunk of cal.iterate.streamValues(stream)) {
+ controller.enqueue(chunk);
+ }
+ }
+ controller.close();
+ },
+ });
+ }
+
+ /**
+ * Creates a ReadableStream from another stream where each chunk of the source
+ * stream is passed to a MapStreamFunction before enqueuing in the final stream.
+ *
+ * @param {ReadableStream}
+ * @param {MapStreamFunction}
+ *
+ * @returns {ReadableStream}
+ */
+ static createMappedReadableStream(stream, func) {
+ return new ReadableStream({
+ async start(controller) {
+ for await (let chunk of cal.iterate.streamValues(stream)) {
+ controller.enqueue(await func(chunk));
+ }
+ controller.close();
+ },
+ });
+ }
+}