diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:33 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 01:13:33 +0000 |
commit | 086c044dc34dfc0f74fbe41f4ecb402b2cd34884 (patch) | |
tree | a4f824bd33cb075dd5aa3eb5a0a94af221bbe83a /toolkit/components/promiseworker/PromiseWorker.sys.mjs | |
parent | Adding debian version 124.0.1-1. (diff) | |
download | firefox-086c044dc34dfc0f74fbe41f4ecb402b2cd34884.tar.xz firefox-086c044dc34dfc0f74fbe41f4ecb402b2cd34884.zip |
Merging upstream version 125.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'toolkit/components/promiseworker/PromiseWorker.sys.mjs')
-rw-r--r-- | toolkit/components/promiseworker/PromiseWorker.sys.mjs | 175 |
1 files changed, 106 insertions, 69 deletions
diff --git a/toolkit/components/promiseworker/PromiseWorker.sys.mjs b/toolkit/components/promiseworker/PromiseWorker.sys.mjs index 59721e5663..bf5ae179cb 100644 --- a/toolkit/components/promiseworker/PromiseWorker.sys.mjs +++ b/toolkit/components/promiseworker/PromiseWorker.sys.mjs @@ -16,27 +16,6 @@ */ /** - * An implementation of queues (FIFO). - * - * The current implementation uses one array, runs in O(n ^ 2), and is optimized - * for the case in which queues are generally short. - */ -function Queue() { - this._array = []; -} -Queue.prototype = { - pop: function pop() { - return this._array.shift(); - }, - push: function push(x) { - return this._array.push(x); - }, - isEmpty: function isEmpty() { - return !this._array.length; - }, -}; - -/** * Constructors for decoding standard exceptions received from the * worker. */ @@ -114,14 +93,31 @@ const EXCEPTION_CONSTRUCTORS = { * * @param {WorkerOptions} options The option parameter for ChromeWorker. * + * @param {Record<String, function>} functions Functions that the worker can call. + * + * Functions can be synchronous functions or promises and return a value + * that is sent back to the worker. The function can also send back a + * `BasePromiseWorker.Meta` object containing the data and an array of transferrable + * objects to transfer data to the worker with zero memory copy via `postMessage`. + * + * Example of sunch a function: + * + * async function threadFunction(message) { + * return new BasePromiseWorker.Meta( + * ["data1", "data2", someBuffer], + * {transfers: [someBuffer]} + * ); + * } + * * @constructor */ -export var BasePromiseWorker = function (url, options = {}) { +export var BasePromiseWorker = function (url, options = {}, functions = {}) { if (typeof url != "string") { throw new TypeError("Expecting a string"); } this._url = url; this._options = options; + this._functions = functions; /** * A set of methods, with the following @@ -137,24 +133,24 @@ export var BasePromiseWorker = function (url, options = {}) { this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS); /** - * The queue of deferred, waiting for the completion of their + * The map of deferred, waiting for the completion of their * respective job by the worker. * - * Each item in the list may contain an additional field |closure|, + * Each item in the map may contain an additional field |closure|, * used to store strong references to value that must not be * garbage-collected before the reply has been received (e.g. * arrays). * - * @type {Queue<{deferred:deferred, closure:*=}>} + * @type {Map<string, {deferred, closure, id}>} */ - this._queue = new Queue(); + this._deferredJobs = new Map(); /** * The number of the current message. * * Used for debugging purposes. */ - this._id = 0; + this._deferredJobId = 0; /** * The instant at which the worker was launched. @@ -172,6 +168,11 @@ BasePromiseWorker.prototype = { // By Default, ignore all logs. }, + _generateDeferredJobId() { + this._deferredJobId += 1; + return "ThreadToWorker-" + this._deferredJobId; + }, + /** * Instantiate the worker lazily. */ @@ -205,9 +206,15 @@ BasePromiseWorker.prototype = { error.filename, error.lineno ); + error.preventDefault(); - let { deferred } = this._queue.pop(); - deferred.reject(error); + + if (this._deferredJobs.size > 0) { + this._deferredJobs.forEach(job => { + job.deferred.reject(error); + }); + this._deferredJobs.clear(); + } }; /** @@ -218,41 +225,74 @@ BasePromiseWorker.prototype = { * - {fail: some_error} in case of error, where * some_error is an instance of |PromiseWorker.WorkerError| * - * Messages may also contain a field |id| to help - * with debugging. - * - * Messages may also optionally contain a field |durationMs|, holding - * the duration of the function call in milliseconds. + * Messages also contains the following fields: + * - |id| an integer matching the deferred function to resolve (mandatory) + * - |fun| a string matching a function to call (optional) + * - |durationMs| holding the duration of the function call in milliseconds. (optional) * * @param {*} msg The message received from the worker. */ worker.onmessage = msg => { - this.log("Received message from worker", msg.data); - let handler = this._queue.pop(); - let deferred = handler.deferred; let data = msg.data; - if (data.id != handler.id) { - throw new Error( - "Internal error: expecting msg " + - handler.id + - ", " + - " got " + - data.id + - ": " + - JSON.stringify(msg.data) - ); - } + let messageId = data.id; + + this.log(`Received message ${messageId} from worker`); + if ("timeStamps" in data) { this.workerTimeStamps = data.timeStamps; } - if ("ok" in data) { - // Pass the data to the listeners. - deferred.resolve(data); - } else if ("fail" in data) { - // We have received an error that was serialized by the - // worker. - deferred.reject(new WorkerError(data.fail)); + + // If fun is provided by the worker, we look into the functions + if ("fun" in data) { + if (data.fun in this._functions) { + Promise.resolve(this._functions[data.fun](...data.args)).then( + ok => { + if (ok instanceof BasePromiseWorker.Meta) { + if ("transfers" in ok.meta) { + worker.postMessage( + { ok: ok.data, id: messageId }, + ok.meta.transfers + ); + } else { + worker.postMessage({ ok: ok.data, id: messageId }); + } + } else { + worker.postMessage({ id: messageId, ok }); + } + }, + fail => { + worker.postMessage({ id: messageId, fail }); + } + ); + } else { + worker.postMessage({ + id: messageId, + fail: `function ${data.fun} not found`, + }); + } + return; + } + + // If the message id matches one of the promise that waits, we resolve/reject with the data + if (this._deferredJobs.has(messageId)) { + let handler = this._deferredJobs.get(messageId); + let deferred = handler.deferred; + + if ("ok" in data) { + // Pass the data to the listeners. + deferred.resolve(data); + } else if ("fail" in data) { + // We have received an error that was serialized by the + // worker. + deferred.reject(new WorkerError(data.fail)); + } + return; } + + // in any other case, this is an unexpected message from the worker. + throw new Error( + `Unexpected message id ${messageId}, data: ${JSON.stringify(data)} ` + ); }; return worker; }, @@ -303,9 +343,9 @@ BasePromiseWorker.prototype = { }); } - let id = ++this._id; + let id = this._generateDeferredJobId(); let message = { fun, args, id }; - this.log("Posting message", message); + this.log("Posting message", JSON.stringify(message)); try { this._worker.postMessage(message, ...[transfers]); } catch (ex) { @@ -320,15 +360,14 @@ BasePromiseWorker.prototype = { } let deferred = Promise.withResolvers(); - this._queue.push({ deferred, closure, id }); - this.log("Message posted"); + this._deferredJobs.set(id, { deferred, closure, id }); let reply; try { this.log("Expecting reply"); reply = await deferred.promise; } catch (error) { - this.log("Got error", error); + this.log("Got error", JSON.stringify(error)); reply = error; if (error instanceof WorkerError) { @@ -389,7 +428,7 @@ BasePromiseWorker.prototype = { /** * Terminate the worker, if it has been created at all, and set things up to * be instantiated lazily again on the next `post()`. - * If there are pending Promises in the queue, we'll reject them and clear it. + * If there are pending Promises in the jobs, we'll reject them and clear it. */ terminate() { if (!this.__worker) { @@ -404,14 +443,12 @@ BasePromiseWorker.prototype = { this.log("Error whilst terminating ChromeWorker: " + ex.message); } - let error; - while (!this._queue.isEmpty()) { - if (!error) { - // We create this lazily, because error objects are not cheap. - error = new Error("Internal error: worker terminated"); - } - let { deferred } = this._queue.pop(); - deferred.reject(error); + if (this._deferredJobs.size) { + let error = new Error("Internal error: worker terminated"); + this._deferredJobs.forEach(job => { + job.deferred.reject(error); + }); + this._deferredJobs.clear(); } }, }; |