From fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 03:14:29 +0200 Subject: Merging upstream version 125.0.1. Signed-off-by: Daniel Baumann --- .../components/promiseworker/PromiseWorker.sys.mjs | 175 +++++++++++++-------- .../promiseworker/tests/xpcshell/data/worker.js | 13 +- .../promiseworker/tests/xpcshell/test_Promise.js | 48 +++++- .../worker/PromiseWorker.template.worker.js | 40 ++++- 4 files changed, 203 insertions(+), 73 deletions(-) (limited to 'toolkit/components/promiseworker') diff --git a/toolkit/components/promiseworker/PromiseWorker.sys.mjs b/toolkit/components/promiseworker/PromiseWorker.sys.mjs index 59721e5663..bf5ae179cb 100644 --- a/toolkit/components/promiseworker/PromiseWorker.sys.mjs +++ b/toolkit/components/promiseworker/PromiseWorker.sys.mjs @@ -15,27 +15,6 @@ * counterpart PromiseWorker.js. */ -/** - * An implementation of queues (FIFO). - * - * The current implementation uses one array, runs in O(n ^ 2), and is optimized - * for the case in which queues are generally short. - */ -function Queue() { - this._array = []; -} -Queue.prototype = { - pop: function pop() { - return this._array.shift(); - }, - push: function push(x) { - return this._array.push(x); - }, - isEmpty: function isEmpty() { - return !this._array.length; - }, -}; - /** * Constructors for decoding standard exceptions received from the * worker. @@ -114,14 +93,31 @@ const EXCEPTION_CONSTRUCTORS = { * * @param {WorkerOptions} options The option parameter for ChromeWorker. * + * @param {Record} functions Functions that the worker can call. + * + * Functions can be synchronous functions or promises and return a value + * that is sent back to the worker. The function can also send back a + * `BasePromiseWorker.Meta` object containing the data and an array of transferrable + * objects to transfer data to the worker with zero memory copy via `postMessage`. + * + * Example of sunch a function: + * + * async function threadFunction(message) { + * return new BasePromiseWorker.Meta( + * ["data1", "data2", someBuffer], + * {transfers: [someBuffer]} + * ); + * } + * * @constructor */ -export var BasePromiseWorker = function (url, options = {}) { +export var BasePromiseWorker = function (url, options = {}, functions = {}) { if (typeof url != "string") { throw new TypeError("Expecting a string"); } this._url = url; this._options = options; + this._functions = functions; /** * A set of methods, with the following @@ -137,24 +133,24 @@ export var BasePromiseWorker = function (url, options = {}) { this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS); /** - * The queue of deferred, waiting for the completion of their + * The map of deferred, waiting for the completion of their * respective job by the worker. * - * Each item in the list may contain an additional field |closure|, + * Each item in the map may contain an additional field |closure|, * used to store strong references to value that must not be * garbage-collected before the reply has been received (e.g. * arrays). * - * @type {Queue<{deferred:deferred, closure:*=}>} + * @type {Map} */ - this._queue = new Queue(); + this._deferredJobs = new Map(); /** * The number of the current message. * * Used for debugging purposes. */ - this._id = 0; + this._deferredJobId = 0; /** * The instant at which the worker was launched. @@ -172,6 +168,11 @@ BasePromiseWorker.prototype = { // By Default, ignore all logs. }, + _generateDeferredJobId() { + this._deferredJobId += 1; + return "ThreadToWorker-" + this._deferredJobId; + }, + /** * Instantiate the worker lazily. */ @@ -205,9 +206,15 @@ BasePromiseWorker.prototype = { error.filename, error.lineno ); + error.preventDefault(); - let { deferred } = this._queue.pop(); - deferred.reject(error); + + if (this._deferredJobs.size > 0) { + this._deferredJobs.forEach(job => { + job.deferred.reject(error); + }); + this._deferredJobs.clear(); + } }; /** @@ -218,41 +225,74 @@ BasePromiseWorker.prototype = { * - {fail: some_error} in case of error, where * some_error is an instance of |PromiseWorker.WorkerError| * - * Messages may also contain a field |id| to help - * with debugging. - * - * Messages may also optionally contain a field |durationMs|, holding - * the duration of the function call in milliseconds. + * Messages also contains the following fields: + * - |id| an integer matching the deferred function to resolve (mandatory) + * - |fun| a string matching a function to call (optional) + * - |durationMs| holding the duration of the function call in milliseconds. (optional) * * @param {*} msg The message received from the worker. */ worker.onmessage = msg => { - this.log("Received message from worker", msg.data); - let handler = this._queue.pop(); - let deferred = handler.deferred; let data = msg.data; - if (data.id != handler.id) { - throw new Error( - "Internal error: expecting msg " + - handler.id + - ", " + - " got " + - data.id + - ": " + - JSON.stringify(msg.data) - ); - } + let messageId = data.id; + + this.log(`Received message ${messageId} from worker`); + if ("timeStamps" in data) { this.workerTimeStamps = data.timeStamps; } - if ("ok" in data) { - // Pass the data to the listeners. - deferred.resolve(data); - } else if ("fail" in data) { - // We have received an error that was serialized by the - // worker. - deferred.reject(new WorkerError(data.fail)); + + // If fun is provided by the worker, we look into the functions + if ("fun" in data) { + if (data.fun in this._functions) { + Promise.resolve(this._functions[data.fun](...data.args)).then( + ok => { + if (ok instanceof BasePromiseWorker.Meta) { + if ("transfers" in ok.meta) { + worker.postMessage( + { ok: ok.data, id: messageId }, + ok.meta.transfers + ); + } else { + worker.postMessage({ ok: ok.data, id: messageId }); + } + } else { + worker.postMessage({ id: messageId, ok }); + } + }, + fail => { + worker.postMessage({ id: messageId, fail }); + } + ); + } else { + worker.postMessage({ + id: messageId, + fail: `function ${data.fun} not found`, + }); + } + return; + } + + // If the message id matches one of the promise that waits, we resolve/reject with the data + if (this._deferredJobs.has(messageId)) { + let handler = this._deferredJobs.get(messageId); + let deferred = handler.deferred; + + if ("ok" in data) { + // Pass the data to the listeners. + deferred.resolve(data); + } else if ("fail" in data) { + // We have received an error that was serialized by the + // worker. + deferred.reject(new WorkerError(data.fail)); + } + return; } + + // in any other case, this is an unexpected message from the worker. + throw new Error( + `Unexpected message id ${messageId}, data: ${JSON.stringify(data)} ` + ); }; return worker; }, @@ -303,9 +343,9 @@ BasePromiseWorker.prototype = { }); } - let id = ++this._id; + let id = this._generateDeferredJobId(); let message = { fun, args, id }; - this.log("Posting message", message); + this.log("Posting message", JSON.stringify(message)); try { this._worker.postMessage(message, ...[transfers]); } catch (ex) { @@ -320,15 +360,14 @@ BasePromiseWorker.prototype = { } let deferred = Promise.withResolvers(); - this._queue.push({ deferred, closure, id }); - this.log("Message posted"); + this._deferredJobs.set(id, { deferred, closure, id }); let reply; try { this.log("Expecting reply"); reply = await deferred.promise; } catch (error) { - this.log("Got error", error); + this.log("Got error", JSON.stringify(error)); reply = error; if (error instanceof WorkerError) { @@ -389,7 +428,7 @@ BasePromiseWorker.prototype = { /** * Terminate the worker, if it has been created at all, and set things up to * be instantiated lazily again on the next `post()`. - * If there are pending Promises in the queue, we'll reject them and clear it. + * If there are pending Promises in the jobs, we'll reject them and clear it. */ terminate() { if (!this.__worker) { @@ -404,14 +443,12 @@ BasePromiseWorker.prototype = { this.log("Error whilst terminating ChromeWorker: " + ex.message); } - let error; - while (!this._queue.isEmpty()) { - if (!error) { - // We create this lazily, because error objects are not cheap. - error = new Error("Internal error: worker terminated"); - } - let { deferred } = this._queue.pop(); - deferred.reject(error); + if (this._deferredJobs.size) { + let error = new Error("Internal error: worker terminated"); + this._deferredJobs.forEach(job => { + job.deferred.reject(error); + }); + this._deferredJobs.clear(); } }, }; diff --git a/toolkit/components/promiseworker/tests/xpcshell/data/worker.js b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js index 30087bdc4a..94b6dd17ad 100644 --- a/toolkit/components/promiseworker/tests/xpcshell/data/worker.js +++ b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js @@ -12,8 +12,9 @@ importScripts("resource://gre/modules/workers/require.js"); var PromiseWorker = require("resource://gre/modules/workers/PromiseWorker.js"); var worker = new PromiseWorker.AbstractWorker(); -worker.dispatch = function (method, args = []) { - return Agent[method](...args); + +worker.dispatch = async function (method, args = []) { + return await Agent[method](...args); }; worker.postMessage = function (...args) { self.postMessage(...args); @@ -34,6 +35,14 @@ var Agent = { return args; }, + async bounceWithExtraCalls(...args) { + let result = await worker.callMainThread("echo", [ + "Posting something unrelated", + ]); + args.push(result.ok); + return args; + }, + throwError(msg, ...args) { throw new Error(msg); }, diff --git a/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js index f7581b664f..f9091a2b85 100644 --- a/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js +++ b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js @@ -16,7 +16,22 @@ const { setTimeout } = ChromeUtils.importESModule( var WORKER_SOURCE_URI = "chrome://promiseworker/content/worker.js"; do_load_manifest("data/chrome.manifest"); -var worker = new BasePromiseWorker(WORKER_SOURCE_URI); + +const UUID = crypto.randomUUID(); + +const SOME_ARRAY = new Uint8Array(4); +for (let i = 0; i < 4; ++i) { + SOME_ARRAY[i] = i; +} + +async function echo(message) { + return new BasePromiseWorker.Meta([message, UUID, SOME_ARRAY.buffer], { + transfers: [SOME_ARRAY.buffer], + }); +} + +var worker = new BasePromiseWorker(WORKER_SOURCE_URI, {}, { echo }); + worker.log = function (...args) { info("Controller: " + args.join(" ")); }; @@ -166,3 +181,34 @@ add_task(async function test_terminate() { "ChromeWorker instances should differ" ); }); + +function cloneArrayBuffer(original) { + const clone = new ArrayBuffer(original.byteLength); + const originalView = new Uint8Array(original); + const cloneView = new Uint8Array(clone); + cloneView.set(originalView); + return clone; +} + +add_task(async function test_bidirectional() { + // Before we transfer the array, we clone it + const arrayCopy = cloneArrayBuffer(SOME_ARRAY.buffer); + + let message = ["test_simple_args", Math.random()]; + + // Checking the array buffer size + Assert.equal( + SOME_ARRAY.buffer.byteLength, + 4, + "The buffer is not detached yet" + ); + let result = await worker.post("bounceWithExtraCalls", message); + + // After the post call, the array was transferred and SOME_ARRAY should be empty + Assert.equal(SOME_ARRAY.buffer.byteLength, 0, "The buffer has been detached"); + + // The echo() function in the worker adds to the message a string, an uuid and has the transferred array + message.push(["Posting something unrelated", UUID, arrayCopy]); + + Assert.deepEqual(result, message); +}); diff --git a/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js index c8203db48f..9ea26df9f5 100644 --- a/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js +++ b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js @@ -102,19 +102,57 @@ function Meta(data, meta) { */ function AbstractWorker(agent) { this._agent = agent; + this._deferredJobs = new Map(); + this._deferredJobId = 0; } + AbstractWorker.prototype = { // Default logger: discard all messages log() {}, + _generateDeferredJobId() { + this._deferredJobId += 1; + return "WorkerToThread-" + this._deferredJobId; + }, + + /** + * Post and wait for an answer from the thread. + */ + callMainThread(funcName, args) { + const messageId = this._generateDeferredJobId(); + + const message = { + id: messageId, + fun: funcName, + args, + }; + + return new Promise((resolve, reject) => { + this._deferredJobs.set(messageId, { resolve, reject }); + this.postMessage(message); + }); + }, + /** * Handle a message. */ async handleMessage(msg) { let data = msg.data; - this.log("Received message", data); let id = data.id; + // if the id is found in _deferredJobs, we proceed with the message + if (this._deferredJobs.has(id)) { + const { resolve, reject } = this._deferredJobs.get(id); + + if ("ok" in data) { + resolve(data); + } else if ("fail" in data) { + reject(data); + } + this._deferredJobs.delete(id); + return; + } + let start; let options; if (data.args) { -- cgit v1.2.3