summaryrefslogtreecommitdiffstats
path: root/toolkit/components/promiseworker
diff options
context:
space:
mode:
Diffstat (limited to 'toolkit/components/promiseworker')
-rw-r--r--toolkit/components/promiseworker/PromiseWorker.sys.mjs175
-rw-r--r--toolkit/components/promiseworker/tests/xpcshell/data/worker.js13
-rw-r--r--toolkit/components/promiseworker/tests/xpcshell/test_Promise.js48
-rw-r--r--toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js40
4 files changed, 203 insertions, 73 deletions
diff --git a/toolkit/components/promiseworker/PromiseWorker.sys.mjs b/toolkit/components/promiseworker/PromiseWorker.sys.mjs
index 59721e5663..bf5ae179cb 100644
--- a/toolkit/components/promiseworker/PromiseWorker.sys.mjs
+++ b/toolkit/components/promiseworker/PromiseWorker.sys.mjs
@@ -16,27 +16,6 @@
*/
/**
- * An implementation of queues (FIFO).
- *
- * The current implementation uses one array, runs in O(n ^ 2), and is optimized
- * for the case in which queues are generally short.
- */
-function Queue() {
- this._array = [];
-}
-Queue.prototype = {
- pop: function pop() {
- return this._array.shift();
- },
- push: function push(x) {
- return this._array.push(x);
- },
- isEmpty: function isEmpty() {
- return !this._array.length;
- },
-};
-
-/**
* Constructors for decoding standard exceptions received from the
* worker.
*/
@@ -114,14 +93,31 @@ const EXCEPTION_CONSTRUCTORS = {
*
* @param {WorkerOptions} options The option parameter for ChromeWorker.
*
+ * @param {Record<String, function>} functions Functions that the worker can call.
+ *
+ * Functions can be synchronous functions or promises and return a value
+ * that is sent back to the worker. The function can also send back a
+ * `BasePromiseWorker.Meta` object containing the data and an array of transferrable
+ * objects to transfer data to the worker with zero memory copy via `postMessage`.
+ *
+ * Example of sunch a function:
+ *
+ * async function threadFunction(message) {
+ * return new BasePromiseWorker.Meta(
+ * ["data1", "data2", someBuffer],
+ * {transfers: [someBuffer]}
+ * );
+ * }
+ *
* @constructor
*/
-export var BasePromiseWorker = function (url, options = {}) {
+export var BasePromiseWorker = function (url, options = {}, functions = {}) {
if (typeof url != "string") {
throw new TypeError("Expecting a string");
}
this._url = url;
this._options = options;
+ this._functions = functions;
/**
* A set of methods, with the following
@@ -137,24 +133,24 @@ export var BasePromiseWorker = function (url, options = {}) {
this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS);
/**
- * The queue of deferred, waiting for the completion of their
+ * The map of deferred, waiting for the completion of their
* respective job by the worker.
*
- * Each item in the list may contain an additional field |closure|,
+ * Each item in the map may contain an additional field |closure|,
* used to store strong references to value that must not be
* garbage-collected before the reply has been received (e.g.
* arrays).
*
- * @type {Queue<{deferred:deferred, closure:*=}>}
+ * @type {Map<string, {deferred, closure, id}>}
*/
- this._queue = new Queue();
+ this._deferredJobs = new Map();
/**
* The number of the current message.
*
* Used for debugging purposes.
*/
- this._id = 0;
+ this._deferredJobId = 0;
/**
* The instant at which the worker was launched.
@@ -172,6 +168,11 @@ BasePromiseWorker.prototype = {
// By Default, ignore all logs.
},
+ _generateDeferredJobId() {
+ this._deferredJobId += 1;
+ return "ThreadToWorker-" + this._deferredJobId;
+ },
+
/**
* Instantiate the worker lazily.
*/
@@ -205,9 +206,15 @@ BasePromiseWorker.prototype = {
error.filename,
error.lineno
);
+
error.preventDefault();
- let { deferred } = this._queue.pop();
- deferred.reject(error);
+
+ if (this._deferredJobs.size > 0) {
+ this._deferredJobs.forEach(job => {
+ job.deferred.reject(error);
+ });
+ this._deferredJobs.clear();
+ }
};
/**
@@ -218,41 +225,74 @@ BasePromiseWorker.prototype = {
* - {fail: some_error} in case of error, where
* some_error is an instance of |PromiseWorker.WorkerError|
*
- * Messages may also contain a field |id| to help
- * with debugging.
- *
- * Messages may also optionally contain a field |durationMs|, holding
- * the duration of the function call in milliseconds.
+ * Messages also contains the following fields:
+ * - |id| an integer matching the deferred function to resolve (mandatory)
+ * - |fun| a string matching a function to call (optional)
+ * - |durationMs| holding the duration of the function call in milliseconds. (optional)
*
* @param {*} msg The message received from the worker.
*/
worker.onmessage = msg => {
- this.log("Received message from worker", msg.data);
- let handler = this._queue.pop();
- let deferred = handler.deferred;
let data = msg.data;
- if (data.id != handler.id) {
- throw new Error(
- "Internal error: expecting msg " +
- handler.id +
- ", " +
- " got " +
- data.id +
- ": " +
- JSON.stringify(msg.data)
- );
- }
+ let messageId = data.id;
+
+ this.log(`Received message ${messageId} from worker`);
+
if ("timeStamps" in data) {
this.workerTimeStamps = data.timeStamps;
}
- if ("ok" in data) {
- // Pass the data to the listeners.
- deferred.resolve(data);
- } else if ("fail" in data) {
- // We have received an error that was serialized by the
- // worker.
- deferred.reject(new WorkerError(data.fail));
+
+ // If fun is provided by the worker, we look into the functions
+ if ("fun" in data) {
+ if (data.fun in this._functions) {
+ Promise.resolve(this._functions[data.fun](...data.args)).then(
+ ok => {
+ if (ok instanceof BasePromiseWorker.Meta) {
+ if ("transfers" in ok.meta) {
+ worker.postMessage(
+ { ok: ok.data, id: messageId },
+ ok.meta.transfers
+ );
+ } else {
+ worker.postMessage({ ok: ok.data, id: messageId });
+ }
+ } else {
+ worker.postMessage({ id: messageId, ok });
+ }
+ },
+ fail => {
+ worker.postMessage({ id: messageId, fail });
+ }
+ );
+ } else {
+ worker.postMessage({
+ id: messageId,
+ fail: `function ${data.fun} not found`,
+ });
+ }
+ return;
+ }
+
+ // If the message id matches one of the promise that waits, we resolve/reject with the data
+ if (this._deferredJobs.has(messageId)) {
+ let handler = this._deferredJobs.get(messageId);
+ let deferred = handler.deferred;
+
+ if ("ok" in data) {
+ // Pass the data to the listeners.
+ deferred.resolve(data);
+ } else if ("fail" in data) {
+ // We have received an error that was serialized by the
+ // worker.
+ deferred.reject(new WorkerError(data.fail));
+ }
+ return;
}
+
+ // in any other case, this is an unexpected message from the worker.
+ throw new Error(
+ `Unexpected message id ${messageId}, data: ${JSON.stringify(data)} `
+ );
};
return worker;
},
@@ -303,9 +343,9 @@ BasePromiseWorker.prototype = {
});
}
- let id = ++this._id;
+ let id = this._generateDeferredJobId();
let message = { fun, args, id };
- this.log("Posting message", message);
+ this.log("Posting message", JSON.stringify(message));
try {
this._worker.postMessage(message, ...[transfers]);
} catch (ex) {
@@ -320,15 +360,14 @@ BasePromiseWorker.prototype = {
}
let deferred = Promise.withResolvers();
- this._queue.push({ deferred, closure, id });
- this.log("Message posted");
+ this._deferredJobs.set(id, { deferred, closure, id });
let reply;
try {
this.log("Expecting reply");
reply = await deferred.promise;
} catch (error) {
- this.log("Got error", error);
+ this.log("Got error", JSON.stringify(error));
reply = error;
if (error instanceof WorkerError) {
@@ -389,7 +428,7 @@ BasePromiseWorker.prototype = {
/**
* Terminate the worker, if it has been created at all, and set things up to
* be instantiated lazily again on the next `post()`.
- * If there are pending Promises in the queue, we'll reject them and clear it.
+ * If there are pending Promises in the jobs, we'll reject them and clear it.
*/
terminate() {
if (!this.__worker) {
@@ -404,14 +443,12 @@ BasePromiseWorker.prototype = {
this.log("Error whilst terminating ChromeWorker: " + ex.message);
}
- let error;
- while (!this._queue.isEmpty()) {
- if (!error) {
- // We create this lazily, because error objects are not cheap.
- error = new Error("Internal error: worker terminated");
- }
- let { deferred } = this._queue.pop();
- deferred.reject(error);
+ if (this._deferredJobs.size) {
+ let error = new Error("Internal error: worker terminated");
+ this._deferredJobs.forEach(job => {
+ job.deferred.reject(error);
+ });
+ this._deferredJobs.clear();
}
},
};
diff --git a/toolkit/components/promiseworker/tests/xpcshell/data/worker.js b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js
index 30087bdc4a..94b6dd17ad 100644
--- a/toolkit/components/promiseworker/tests/xpcshell/data/worker.js
+++ b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js
@@ -12,8 +12,9 @@ importScripts("resource://gre/modules/workers/require.js");
var PromiseWorker = require("resource://gre/modules/workers/PromiseWorker.js");
var worker = new PromiseWorker.AbstractWorker();
-worker.dispatch = function (method, args = []) {
- return Agent[method](...args);
+
+worker.dispatch = async function (method, args = []) {
+ return await Agent[method](...args);
};
worker.postMessage = function (...args) {
self.postMessage(...args);
@@ -34,6 +35,14 @@ var Agent = {
return args;
},
+ async bounceWithExtraCalls(...args) {
+ let result = await worker.callMainThread("echo", [
+ "Posting something unrelated",
+ ]);
+ args.push(result.ok);
+ return args;
+ },
+
throwError(msg, ...args) {
throw new Error(msg);
},
diff --git a/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js
index f7581b664f..f9091a2b85 100644
--- a/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js
+++ b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js
@@ -16,7 +16,22 @@ const { setTimeout } = ChromeUtils.importESModule(
var WORKER_SOURCE_URI = "chrome://promiseworker/content/worker.js";
do_load_manifest("data/chrome.manifest");
-var worker = new BasePromiseWorker(WORKER_SOURCE_URI);
+
+const UUID = crypto.randomUUID();
+
+const SOME_ARRAY = new Uint8Array(4);
+for (let i = 0; i < 4; ++i) {
+ SOME_ARRAY[i] = i;
+}
+
+async function echo(message) {
+ return new BasePromiseWorker.Meta([message, UUID, SOME_ARRAY.buffer], {
+ transfers: [SOME_ARRAY.buffer],
+ });
+}
+
+var worker = new BasePromiseWorker(WORKER_SOURCE_URI, {}, { echo });
+
worker.log = function (...args) {
info("Controller: " + args.join(" "));
};
@@ -166,3 +181,34 @@ add_task(async function test_terminate() {
"ChromeWorker instances should differ"
);
});
+
+function cloneArrayBuffer(original) {
+ const clone = new ArrayBuffer(original.byteLength);
+ const originalView = new Uint8Array(original);
+ const cloneView = new Uint8Array(clone);
+ cloneView.set(originalView);
+ return clone;
+}
+
+add_task(async function test_bidirectional() {
+ // Before we transfer the array, we clone it
+ const arrayCopy = cloneArrayBuffer(SOME_ARRAY.buffer);
+
+ let message = ["test_simple_args", Math.random()];
+
+ // Checking the array buffer size
+ Assert.equal(
+ SOME_ARRAY.buffer.byteLength,
+ 4,
+ "The buffer is not detached yet"
+ );
+ let result = await worker.post("bounceWithExtraCalls", message);
+
+ // After the post call, the array was transferred and SOME_ARRAY should be empty
+ Assert.equal(SOME_ARRAY.buffer.byteLength, 0, "The buffer has been detached");
+
+ // The echo() function in the worker adds to the message a string, an uuid and has the transferred array
+ message.push(["Posting something unrelated", UUID, arrayCopy]);
+
+ Assert.deepEqual(result, message);
+});
diff --git a/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js
index c8203db48f..9ea26df9f5 100644
--- a/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js
+++ b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js
@@ -102,19 +102,57 @@ function Meta(data, meta) {
*/
function AbstractWorker(agent) {
this._agent = agent;
+ this._deferredJobs = new Map();
+ this._deferredJobId = 0;
}
+
AbstractWorker.prototype = {
// Default logger: discard all messages
log() {},
+ _generateDeferredJobId() {
+ this._deferredJobId += 1;
+ return "WorkerToThread-" + this._deferredJobId;
+ },
+
+ /**
+ * Post and wait for an answer from the thread.
+ */
+ callMainThread(funcName, args) {
+ const messageId = this._generateDeferredJobId();
+
+ const message = {
+ id: messageId,
+ fun: funcName,
+ args,
+ };
+
+ return new Promise((resolve, reject) => {
+ this._deferredJobs.set(messageId, { resolve, reject });
+ this.postMessage(message);
+ });
+ },
+
/**
* Handle a message.
*/
async handleMessage(msg) {
let data = msg.data;
- this.log("Received message", data);
let id = data.id;
+ // if the id is found in _deferredJobs, we proceed with the message
+ if (this._deferredJobs.has(id)) {
+ const { resolve, reject } = this._deferredJobs.get(id);
+
+ if ("ok" in data) {
+ resolve(data);
+ } else if ("fail" in data) {
+ reject(data);
+ }
+ this._deferredJobs.delete(id);
+ return;
+ }
+
let start;
let options;
if (data.args) {