summaryrefslogtreecommitdiffstats
path: root/toolkit/components/promiseworker/PromiseWorker.sys.mjs
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/promiseworker/PromiseWorker.sys.mjs')
-rw-r--r--toolkit/components/promiseworker/PromiseWorker.sys.mjs175
1 files changed, 106 insertions, 69 deletions
diff --git a/toolkit/components/promiseworker/PromiseWorker.sys.mjs b/toolkit/components/promiseworker/PromiseWorker.sys.mjs
index 59721e5663..bf5ae179cb 100644
--- a/toolkit/components/promiseworker/PromiseWorker.sys.mjs
+++ b/toolkit/components/promiseworker/PromiseWorker.sys.mjs
@@ -16,27 +16,6 @@
*/
/**
- * An implementation of queues (FIFO).
- *
- * The current implementation uses one array, runs in O(n ^ 2), and is optimized
- * for the case in which queues are generally short.
- */
-function Queue() {
- this._array = [];
-}
-Queue.prototype = {
- pop: function pop() {
- return this._array.shift();
- },
- push: function push(x) {
- return this._array.push(x);
- },
- isEmpty: function isEmpty() {
- return !this._array.length;
- },
-};
-
-/**
* Constructors for decoding standard exceptions received from the
* worker.
*/
@@ -114,14 +93,31 @@ const EXCEPTION_CONSTRUCTORS = {
*
* @param {WorkerOptions} options The option parameter for ChromeWorker.
*
+ * @param {Record<String, function>} functions Functions that the worker can call.
+ *
+ * Functions can be synchronous functions or promises and return a value
+ * that is sent back to the worker. The function can also send back a
+ * `BasePromiseWorker.Meta` object containing the data and an array of transferrable
+ * objects to transfer data to the worker with zero memory copy via `postMessage`.
+ *
+ * Example of sunch a function:
+ *
+ * async function threadFunction(message) {
+ * return new BasePromiseWorker.Meta(
+ * ["data1", "data2", someBuffer],
+ * {transfers: [someBuffer]}
+ * );
+ * }
+ *
* @constructor
*/
-export var BasePromiseWorker = function (url, options = {}) {
+export var BasePromiseWorker = function (url, options = {}, functions = {}) {
if (typeof url != "string") {
throw new TypeError("Expecting a string");
}
this._url = url;
this._options = options;
+ this._functions = functions;
/**
* A set of methods, with the following
@@ -137,24 +133,24 @@ export var BasePromiseWorker = function (url, options = {}) {
this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS);
/**
- * The queue of deferred, waiting for the completion of their
+ * The map of deferred, waiting for the completion of their
* respective job by the worker.
*
- * Each item in the list may contain an additional field |closure|,
+ * Each item in the map may contain an additional field |closure|,
* used to store strong references to value that must not be
* garbage-collected before the reply has been received (e.g.
* arrays).
*
- * @type {Queue<{deferred:deferred, closure:*=}>}
+ * @type {Map<string, {deferred, closure, id}>}
*/
- this._queue = new Queue();
+ this._deferredJobs = new Map();
/**
* The number of the current message.
*
* Used for debugging purposes.
*/
- this._id = 0;
+ this._deferredJobId = 0;
/**
* The instant at which the worker was launched.
@@ -172,6 +168,11 @@ BasePromiseWorker.prototype = {
// By Default, ignore all logs.
},
+ _generateDeferredJobId() {
+ this._deferredJobId += 1;
+ return "ThreadToWorker-" + this._deferredJobId;
+ },
+
/**
* Instantiate the worker lazily.
*/
@@ -205,9 +206,15 @@ BasePromiseWorker.prototype = {
error.filename,
error.lineno
);
+
error.preventDefault();
- let { deferred } = this._queue.pop();
- deferred.reject(error);
+
+ if (this._deferredJobs.size > 0) {
+ this._deferredJobs.forEach(job => {
+ job.deferred.reject(error);
+ });
+ this._deferredJobs.clear();
+ }
};
/**
@@ -218,41 +225,74 @@ BasePromiseWorker.prototype = {
* - {fail: some_error} in case of error, where
* some_error is an instance of |PromiseWorker.WorkerError|
*
- * Messages may also contain a field |id| to help
- * with debugging.
- *
- * Messages may also optionally contain a field |durationMs|, holding
- * the duration of the function call in milliseconds.
+ * Messages also contains the following fields:
+ * - |id| an integer matching the deferred function to resolve (mandatory)
+ * - |fun| a string matching a function to call (optional)
+ * - |durationMs| holding the duration of the function call in milliseconds. (optional)
*
* @param {*} msg The message received from the worker.
*/
worker.onmessage = msg => {
- this.log("Received message from worker", msg.data);
- let handler = this._queue.pop();
- let deferred = handler.deferred;
let data = msg.data;
- if (data.id != handler.id) {
- throw new Error(
- "Internal error: expecting msg " +
- handler.id +
- ", " +
- " got " +
- data.id +
- ": " +
- JSON.stringify(msg.data)
- );
- }
+ let messageId = data.id;
+
+ this.log(`Received message ${messageId} from worker`);
+
if ("timeStamps" in data) {
this.workerTimeStamps = data.timeStamps;
}
- if ("ok" in data) {
- // Pass the data to the listeners.
- deferred.resolve(data);
- } else if ("fail" in data) {
- // We have received an error that was serialized by the
- // worker.
- deferred.reject(new WorkerError(data.fail));
+
+ // If fun is provided by the worker, we look into the functions
+ if ("fun" in data) {
+ if (data.fun in this._functions) {
+ Promise.resolve(this._functions[data.fun](...data.args)).then(
+ ok => {
+ if (ok instanceof BasePromiseWorker.Meta) {
+ if ("transfers" in ok.meta) {
+ worker.postMessage(
+ { ok: ok.data, id: messageId },
+ ok.meta.transfers
+ );
+ } else {
+ worker.postMessage({ ok: ok.data, id: messageId });
+ }
+ } else {
+ worker.postMessage({ id: messageId, ok });
+ }
+ },
+ fail => {
+ worker.postMessage({ id: messageId, fail });
+ }
+ );
+ } else {
+ worker.postMessage({
+ id: messageId,
+ fail: `function ${data.fun} not found`,
+ });
+ }
+ return;
+ }
+
+ // If the message id matches one of the promise that waits, we resolve/reject with the data
+ if (this._deferredJobs.has(messageId)) {
+ let handler = this._deferredJobs.get(messageId);
+ let deferred = handler.deferred;
+
+ if ("ok" in data) {
+ // Pass the data to the listeners.
+ deferred.resolve(data);
+ } else if ("fail" in data) {
+ // We have received an error that was serialized by the
+ // worker.
+ deferred.reject(new WorkerError(data.fail));
+ }
+ return;
}
+
+ // in any other case, this is an unexpected message from the worker.
+ throw new Error(
+ `Unexpected message id ${messageId}, data: ${JSON.stringify(data)} `
+ );
};
return worker;
},
@@ -303,9 +343,9 @@ BasePromiseWorker.prototype = {
});
}
- let id = ++this._id;
+ let id = this._generateDeferredJobId();
let message = { fun, args, id };
- this.log("Posting message", message);
+ this.log("Posting message", JSON.stringify(message));
try {
this._worker.postMessage(message, ...[transfers]);
} catch (ex) {
@@ -320,15 +360,14 @@ BasePromiseWorker.prototype = {
}
let deferred = Promise.withResolvers();
- this._queue.push({ deferred, closure, id });
- this.log("Message posted");
+ this._deferredJobs.set(id, { deferred, closure, id });
let reply;
try {
this.log("Expecting reply");
reply = await deferred.promise;
} catch (error) {
- this.log("Got error", error);
+ this.log("Got error", JSON.stringify(error));
reply = error;
if (error instanceof WorkerError) {
@@ -389,7 +428,7 @@ BasePromiseWorker.prototype = {
/**
* Terminate the worker, if it has been created at all, and set things up to
* be instantiated lazily again on the next `post()`.
- * If there are pending Promises in the queue, we'll reject them and clear it.
+ * If there are pending Promises in the jobs, we'll reject them and clear it.
*/
terminate() {
if (!this.__worker) {
@@ -404,14 +443,12 @@ BasePromiseWorker.prototype = {
this.log("Error whilst terminating ChromeWorker: " + ex.message);
}
- let error;
- while (!this._queue.isEmpty()) {
- if (!error) {
- // We create this lazily, because error objects are not cheap.
- error = new Error("Internal error: worker terminated");
- }
- let { deferred } = this._queue.pop();
- deferred.reject(error);
+ if (this._deferredJobs.size) {
+ let error = new Error("Internal error: worker terminated");
+ this._deferredJobs.forEach(job => {
+ job.deferred.reject(error);
+ });
+ this._deferredJobs.clear();
}
},
};