diff options
Diffstat (limited to 'toolkit/components/promiseworker')
11 files changed, 1074 insertions, 0 deletions
diff --git a/toolkit/components/promiseworker/PromiseWorker.sys.mjs b/toolkit/components/promiseworker/PromiseWorker.sys.mjs new file mode 100644 index 0000000000..59721e5663 --- /dev/null +++ b/toolkit/components/promiseworker/PromiseWorker.sys.mjs @@ -0,0 +1,444 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * A wrapper around ChromeWorker with extended capabilities designed + * to simplify main thread-to-worker thread asynchronous function calls. + * + * This wrapper: + * - groups requests and responses as a method `post` that returns a `Promise`; + * - ensures that exceptions thrown on the worker thread are correctly deserialized; + * - provides some utilities for benchmarking various operations. + * + * Generally, you should use PromiseWorker.sys.mjs along with its worker-side + * counterpart PromiseWorker.js. + */ + +/** + * An implementation of queues (FIFO). + * + * The current implementation uses one array, runs in O(n ^ 2), and is optimized + * for the case in which queues are generally short. + */ +function Queue() { + this._array = []; +} +Queue.prototype = { + pop: function pop() { + return this._array.shift(); + }, + push: function push(x) { + return this._array.push(x); + }, + isEmpty: function isEmpty() { + return !this._array.length; + }, +}; + +/** + * Constructors for decoding standard exceptions received from the + * worker. + */ +const EXCEPTION_CONSTRUCTORS = { + EvalError(error) { + let result = new EvalError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + InternalError(error) { + let result = new InternalError( + error.message, + error.fileName, + error.lineNumber + ); + result.stack = error.stack; + return result; + }, + RangeError(error) { + let result = new RangeError( + error.message, + error.fileName, + error.lineNumber + ); + result.stack = error.stack; + return result; + }, + ReferenceError(error) { + let result = new ReferenceError( + error.message, + error.fileName, + error.lineNumber + ); + result.stack = error.stack; + return result; + }, + SyntaxError(error) { + let result = new SyntaxError( + error.message, + error.fileName, + error.lineNumber + ); + result.stack = error.stack; + return result; + }, + TypeError(error) { + let result = new TypeError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + URIError(error) { + let result = new URIError(error.message, error.fileName, error.lineNumber); + result.stack = error.stack; + return result; + }, + DOMException(error) { + let result = new DOMException(error.message, error.name); + return result; + }, +}; + +/** + * An object responsible for dispatching messages to a chrome worker + * and routing the responses. + * + * Instances of this constructor who need logging may provide a method + * `log: function(...args) { ... }` in charge of printing out (or + * discarding) logs. + * + * Instances of this constructor may add exception handlers to + * `this.ExceptionHandlers`, if they need to handle custom exceptions. + * + * @param {string} url The url containing the source code for this worker, + * as in constructor ChromeWorker. + * + * @param {WorkerOptions} options The option parameter for ChromeWorker. + * + * @constructor + */ +export var BasePromiseWorker = function (url, options = {}) { + if (typeof url != "string") { + throw new TypeError("Expecting a string"); + } + this._url = url; + this._options = options; + + /** + * A set of methods, with the following + * + * ConstructorName: function({message, fileName, lineNumber}) { + * // Construct a new instance of ConstructorName based on + * // `message`, `fileName`, `lineNumber` + * } + * + * By default, this covers EvalError, InternalError, RangeError, + * ReferenceError, SyntaxError, TypeError, URIError. + */ + this.ExceptionHandlers = Object.create(EXCEPTION_CONSTRUCTORS); + + /** + * The queue of deferred, waiting for the completion of their + * respective job by the worker. + * + * Each item in the list may contain an additional field |closure|, + * used to store strong references to value that must not be + * garbage-collected before the reply has been received (e.g. + * arrays). + * + * @type {Queue<{deferred:deferred, closure:*=}>} + */ + this._queue = new Queue(); + + /** + * The number of the current message. + * + * Used for debugging purposes. + */ + this._id = 0; + + /** + * The instant at which the worker was launched. + */ + this.launchTimeStamp = null; + + /** + * Timestamps provided by the worker for statistics purposes. + */ + this.workerTimeStamps = null; +}; + +BasePromiseWorker.prototype = { + log() { + // By Default, ignore all logs. + }, + + /** + * Instantiate the worker lazily. + */ + get _worker() { + if (this.__worker) { + return this.__worker; + } + + let worker = (this.__worker = new ChromeWorker(this._url, this._options)); + + // We assume that we call to _worker for the purpose of calling + // postMessage(). + this.launchTimeStamp = Date.now(); + + /** + * Receive errors that have been serialized by the built-in mechanism + * of DOM/Chrome Workers. + * + * PromiseWorker.js knows how to serialize a number of errors + * without losing information. These are treated by + * |worker.onmessage|. However, for other errors, we rely on + * DOM's mechanism for serializing errors, which transmits these + * errors through |worker.onerror|. + * + * @param {Error} error Some JS error. + */ + worker.onerror = error => { + this.log( + "Received uncaught error from worker", + error.message, + error.filename, + error.lineno + ); + error.preventDefault(); + let { deferred } = this._queue.pop(); + deferred.reject(error); + }; + + /** + * Receive messages from the worker, propagate them to the listeners. + * + * Messages must have one of the following shapes: + * - {ok: some_value} in case of success + * - {fail: some_error} in case of error, where + * some_error is an instance of |PromiseWorker.WorkerError| + * + * Messages may also contain a field |id| to help + * with debugging. + * + * Messages may also optionally contain a field |durationMs|, holding + * the duration of the function call in milliseconds. + * + * @param {*} msg The message received from the worker. + */ + worker.onmessage = msg => { + this.log("Received message from worker", msg.data); + let handler = this._queue.pop(); + let deferred = handler.deferred; + let data = msg.data; + if (data.id != handler.id) { + throw new Error( + "Internal error: expecting msg " + + handler.id + + ", " + + " got " + + data.id + + ": " + + JSON.stringify(msg.data) + ); + } + if ("timeStamps" in data) { + this.workerTimeStamps = data.timeStamps; + } + if ("ok" in data) { + // Pass the data to the listeners. + deferred.resolve(data); + } else if ("fail" in data) { + // We have received an error that was serialized by the + // worker. + deferred.reject(new WorkerError(data.fail)); + } + }; + return worker; + }, + + /** + * Post a message to a worker. + * + * @param {string} fun The name of the function to call. + * @param {Array} args The arguments to pass to `fun`. If any + * of the arguments is a Promise, it is resolved before posting the + * message. If any of the arguments needs to be transfered instead + * of copied, this may be specified by making the argument an instance + * of `BasePromiseWorker.Meta` or by using the `transfers` argument. + * By convention, the last argument may be an object `options` + * with some of the following fields: + * - {number|null} outExecutionDuration A parameter to be filled with the + * duration of the off main thread execution for this call. + * @param {*=} closure An object holding references that should not be + * garbage-collected before the message treatment is complete. + * @param {Array=} transfers An array of objects that should be transfered + * to the worker instead of being copied. If any of the objects is a Promise, + * it is resolved before posting the message. + * + * @return {promise} + */ + post(fun, args, closure, transfers) { + return async function postMessage() { + // Normalize in case any of the arguments is a promise + if (args) { + args = await Promise.resolve(Promise.all(args)); + } + if (transfers) { + transfers = await Promise.resolve(Promise.all(transfers)); + } else { + transfers = []; + } + + if (args) { + // Extract `Meta` data + args = args.map(arg => { + if (arg instanceof BasePromiseWorker.Meta) { + if (arg.meta && "transfers" in arg.meta) { + transfers.push(...arg.meta.transfers); + } + return arg.data; + } + return arg; + }); + } + + let id = ++this._id; + let message = { fun, args, id }; + this.log("Posting message", message); + try { + this._worker.postMessage(message, ...[transfers]); + } catch (ex) { + if (typeof ex == "number") { + this.log("Could not post message", message, "due to xpcom error", ex); + // handle raw xpcom errors (see eg bug 961317) + throw new Components.Exception("Error in postMessage", ex); + } + + this.log("Could not post message", message, "due to error", ex); + throw ex; + } + + let deferred = Promise.withResolvers(); + this._queue.push({ deferred, closure, id }); + this.log("Message posted"); + + let reply; + try { + this.log("Expecting reply"); + reply = await deferred.promise; + } catch (error) { + this.log("Got error", error); + reply = error; + + if (error instanceof WorkerError) { + // We know how to deserialize most well-known errors + throw this.ExceptionHandlers[error.data.exn](error.data); + } + + if (ErrorEvent.isInstance(error)) { + // Other errors get propagated as instances of ErrorEvent + this.log( + "Error serialized by DOM", + error.message, + error.filename, + error.lineno + ); + throw new Error(error.message, error.filename, error.lineno); + } + + // We don't know about this kind of error + throw error; + } + + // By convention, the last argument may be an object `options`. + let options = null; + if (args) { + options = args[args.length - 1]; + } + + // Check for duration and return result. + if ( + !options || + typeof options !== "object" || + !("outExecutionDuration" in options) + ) { + return reply.ok; + } + // If reply.durationMs is not present, just return the result, + // without updating durations (there was an error in the method + // dispatch). + if (!("durationMs" in reply)) { + return reply.ok; + } + // Bug 874425 demonstrates that two successive calls to Date.now() + // can actually produce an interval with negative duration. + // We assume that this is due to an operation that is so short + // that Date.now() is not monotonic, so we round this up to 0. + let durationMs = Math.max(0, reply.durationMs); + // Accumulate (or initialize) outExecutionDuration + if (typeof options.outExecutionDuration == "number") { + options.outExecutionDuration += durationMs; + } else { + options.outExecutionDuration = durationMs; + } + return reply.ok; + }.bind(this)(); + }, + + /** + * Terminate the worker, if it has been created at all, and set things up to + * be instantiated lazily again on the next `post()`. + * If there are pending Promises in the queue, we'll reject them and clear it. + */ + terminate() { + if (!this.__worker) { + return; + } + + try { + this.__worker.terminate(); + delete this.__worker; + } catch (ex) { + // Ignore exceptions, only log them. + this.log("Error whilst terminating ChromeWorker: " + ex.message); + } + + let error; + while (!this._queue.isEmpty()) { + if (!error) { + // We create this lazily, because error objects are not cheap. + error = new Error("Internal error: worker terminated"); + } + let { deferred } = this._queue.pop(); + deferred.reject(error); + } + }, +}; + +/** + * An error that has been serialized by the worker. + * + * @constructor + */ +function WorkerError(data) { + this.data = data; +} + +/** + * A constructor used to send data to the worker thread while + * with special treatment (e.g. transmitting data instead of + * copying it). + * + * @param {object=} data The data to send to the caller thread. + * @param {object=} meta Additional instructions, as an object + * that may contain the following fields: + * - {Array} transfers An array of objects that should be transferred + * instead of being copied. + * + * @constructor + */ +BasePromiseWorker.Meta = function (data, meta) { + this.data = data; + this.meta = meta; +}; diff --git a/toolkit/components/promiseworker/moz.build b/toolkit/components/promiseworker/moz.build new file mode 100644 index 0000000000..3be16b9bfd --- /dev/null +++ b/toolkit/components/promiseworker/moz.build @@ -0,0 +1,16 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +DIRS += ["worker"] + +EXTRA_JS_MODULES += [ + "PromiseWorker.sys.mjs", +] + +XPCSHELL_TESTS_MANIFESTS += ["tests/xpcshell/xpcshell.toml"] + +with Files("**"): + BUG_COMPONENT = ("Toolkit", "Async Tooling") diff --git a/toolkit/components/promiseworker/tests/xpcshell/data/chrome.manifest b/toolkit/components/promiseworker/tests/xpcshell/data/chrome.manifest new file mode 100644 index 0000000000..9e5dd29b22 --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/data/chrome.manifest @@ -0,0 +1 @@ +content promiseworker ./ diff --git a/toolkit/components/promiseworker/tests/xpcshell/data/worker.js b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js new file mode 100644 index 0000000000..30087bdc4a --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/data/worker.js @@ -0,0 +1,40 @@ +/* Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ */ + +/* eslint-env worker */ + +"use strict"; + +// Trivial worker definition + +/* import-globals-from /toolkit/components/workerloader/require.js */ +importScripts("resource://gre/modules/workers/require.js"); +var PromiseWorker = require("resource://gre/modules/workers/PromiseWorker.js"); + +var worker = new PromiseWorker.AbstractWorker(); +worker.dispatch = function (method, args = []) { + return Agent[method](...args); +}; +worker.postMessage = function (...args) { + self.postMessage(...args); +}; +worker.close = function () { + self.close(); +}; +worker.log = function (...args) { + dump("Worker: " + args.join(" ") + "\n"); +}; +self.addEventListener("message", msg => worker.handleMessage(msg)); +self.addEventListener("unhandledrejection", function (error) { + throw error.reason; +}); + +var Agent = { + bounce(...args) { + return args; + }, + + throwError(msg, ...args) { + throw new Error(msg); + }, +}; diff --git a/toolkit/components/promiseworker/tests/xpcshell/data/worker.mjs b/toolkit/components/promiseworker/tests/xpcshell/data/worker.mjs new file mode 100644 index 0000000000..98a4b0ee5b --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/data/worker.mjs @@ -0,0 +1,36 @@ +/* Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ */ + +/* eslint-env mozilla/worker */ + +// Trivial worker definition + +import { PromiseWorker } from "resource://gre/modules/workers/PromiseWorker.mjs"; + +var worker = new PromiseWorker.AbstractWorker(); +worker.dispatch = function (method, args = []) { + return Agent[method](...args); +}; +worker.postMessage = function (...args) { + self.postMessage(...args); +}; +worker.close = function () { + self.close(); +}; +worker.log = function (...args) { + dump("Worker: " + args.join(" ") + "\n"); +}; +self.addEventListener("message", msg => worker.handleMessage(msg)); +self.addEventListener("unhandledrejection", function (error) { + throw error.reason; +}); + +var Agent = { + bounce(...args) { + return args; + }, + + throwError(msg, ...args) { + throw new Error(msg); + }, +}; diff --git a/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js new file mode 100644 index 0000000000..f7581b664f --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/test_Promise.js @@ -0,0 +1,168 @@ +/* Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ */ +/* eslint-disable mozilla/no-arbitrary-setTimeout */ + +"use strict"; + +const { BasePromiseWorker } = ChromeUtils.importESModule( + "resource://gre/modules/PromiseWorker.sys.mjs" +); +const { setTimeout } = ChromeUtils.importESModule( + "resource://gre/modules/Timer.sys.mjs" +); + +// Worker must be loaded from a chrome:// uri, not a file:// +// uri, so we first need to load it. + +var WORKER_SOURCE_URI = "chrome://promiseworker/content/worker.js"; +do_load_manifest("data/chrome.manifest"); +var worker = new BasePromiseWorker(WORKER_SOURCE_URI); +worker.log = function (...args) { + info("Controller: " + args.join(" ")); +}; + +// Test that simple messages work +add_task(async function test_simple_args() { + let message = ["test_simple_args", Math.random()]; + let result = await worker.post("bounce", message); + Assert.equal(JSON.stringify(result), JSON.stringify(message)); +}); + +// Test that it works when we don't provide a message +add_task(async function test_no_args() { + let result = await worker.post("bounce"); + Assert.equal(JSON.stringify(result), JSON.stringify([])); +}); + +// Test that messages with promise work +add_task(async function test_promise_args() { + let message = ["test_promise_args", Promise.resolve(Math.random())]; + let stringified = JSON.stringify(await Promise.resolve(Promise.all(message))); + let result = await worker.post("bounce", message); + Assert.equal(JSON.stringify(result), stringified); +}); + +// Test that messages with delayed promise work +add_task(async function test_delayed_promise_args() { + let promise = new Promise(resolve => + setTimeout(() => resolve(Math.random()), 10) + ); + let message = ["test_delayed_promise_args", promise]; + let stringified = JSON.stringify(await Promise.resolve(Promise.all(message))); + let result = await worker.post("bounce", message); + Assert.equal(JSON.stringify(result), stringified); +}); + +// Test that messages with rejected promise cause appropriate errors +add_task(async function test_rejected_promise_args() { + let error = new Error(); + let message = ["test_promise_args", Promise.reject(error)]; + try { + await worker.post("bounce", message); + do_throw("I shound have thrown an error by now"); + } catch (ex) { + if (ex != error) { + throw ex; + } + info("I threw the right error"); + } +}); + +// Test that we can transfer to the worker using argument `transfer` +add_task(async function test_transfer_args() { + let array = new Uint8Array(4); + for (let i = 0; i < 4; ++i) { + array[i] = i; + } + Assert.equal(array.buffer.byteLength, 4, "The buffer is not detached yet"); + + let result = ( + await worker.post("bounce", [array.buffer], [], [array.buffer]) + )[0]; + + // Check that the buffer has been sent + Assert.equal(array.buffer.byteLength, 0, "The buffer has been detached"); + + // Check that the result is correct + Assert.equal(result.byteLength, 4, "The result has the right size"); + let array2 = new Uint8Array(result); + for (let i = 0; i < 4; ++i) { + Assert.equal(array2[i], i); + } +}); + +// Test that we can transfer to the worker using an instance of `Meta` +add_task(async function test_transfer_with_meta() { + let array = new Uint8Array(4); + for (let i = 0; i < 4; ++i) { + array[i] = i; + } + Assert.equal(array.buffer.byteLength, 4, "The buffer is not detached yet"); + + let message = new BasePromiseWorker.Meta(array, { + transfers: [array.buffer], + }); + let result = (await worker.post("bounce", [message]))[0]; + + // Check that the buffer has been sent + Assert.equal(array.buffer.byteLength, 0, "The buffer has been detached"); + + // Check that the result is correct + Assert.equal( + Object.prototype.toString.call(result), + "[object Uint8Array]", + "The result appears to be a Typed Array" + ); + Assert.equal(result.byteLength, 4, "The result has the right size"); + + for (let i = 0; i < 4; ++i) { + Assert.equal(result[i], i); + } +}); + +add_task(async function test_throw_error() { + try { + await worker.post("throwError", ["error message"]); + Assert.ok(false, "should have thrown"); + } catch (ex) { + Assert.equal(ex.message, "Error: error message"); + } +}); + +add_task(async function test_terminate() { + let previousWorker = worker._worker; + + // Send two messages that we'll expect to be rejected. + let message = ["test_simple_args", Math.random()]; + let promise1 = worker.post("bounce", message); + let promise2 = worker.post("throwError", ["error message"]); + // Skip a few beats so we can be sure that the two messages are in the queue. + await Promise.resolve(); + await Promise.resolve(); + + worker.terminate(); + + await Assert.rejects( + promise1, + /worker terminated/, + "Pending promise should be rejected" + ); + await Assert.rejects( + promise2, + /worker terminated/, + "Pending promise should be rejected" + ); + + // Unfortunately, there's no real way to check whether a terminate worked from + // the JS API. We'll just have to assume it worked. + + // Post and test a simple message to ensure that the worker has been re-instantiated. + message = ["test_simple_args", Math.random()]; + let result = await worker.post("bounce", message); + Assert.equal(JSON.stringify(result), JSON.stringify(message)); + Assert.notEqual( + worker._worker, + previousWorker, + "ChromeWorker instances should differ" + ); +}); diff --git a/toolkit/components/promiseworker/tests/xpcshell/test_PromiseESM.js b/toolkit/components/promiseworker/tests/xpcshell/test_PromiseESM.js new file mode 100644 index 0000000000..71eb046f6c --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/test_PromiseESM.js @@ -0,0 +1,21 @@ +/* Any copyright is dedicated to the Public Domain. + * http://creativecommons.org/publicdomain/zero/1.0/ */ + +"use strict"; + +const { BasePromiseWorker } = ChromeUtils.importESModule( + "resource://gre/modules/PromiseWorker.sys.mjs" +); + +// Worker must be loaded from a chrome:// uri, not a file:// +// uri, so we first need to load it. + +var WORKER_SOURCE_URI = "chrome://promiseworker/content/worker.mjs"; +do_load_manifest("data/chrome.manifest"); +var worker = new BasePromiseWorker(WORKER_SOURCE_URI, { type: "module" }); + +add_task(async function test_simple_args() { + let message = ["test_simple_args", Math.random()]; + let result = await worker.post("bounce", message); + Assert.equal(JSON.stringify(result), JSON.stringify(message)); +}); diff --git a/toolkit/components/promiseworker/tests/xpcshell/xpcshell.toml b/toolkit/components/promiseworker/tests/xpcshell/xpcshell.toml new file mode 100644 index 0000000000..b87eac707f --- /dev/null +++ b/toolkit/components/promiseworker/tests/xpcshell/xpcshell.toml @@ -0,0 +1,12 @@ +[DEFAULT] +head = "" +skip-if = ["os == 'android'"] +support-files = [ + "data/worker.js", + "data/worker.mjs", + "data/chrome.manifest", +] + +["test_Promise.js"] + +["test_PromiseESM.js"] diff --git a/toolkit/components/promiseworker/worker/GeneratePromiseWorkerScript.py b/toolkit/components/promiseworker/worker/GeneratePromiseWorkerScript.py new file mode 100644 index 0000000000..49ffe4aa05 --- /dev/null +++ b/toolkit/components/promiseworker/worker/GeneratePromiseWorkerScript.py @@ -0,0 +1,69 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +import re + +begin_script_only_re = re.compile("^// #BEGIN_SCRIPT_ONLY") +end_script_only_re = re.compile("^// #END_SCRIPT_ONLY") +export_re = re.compile("^// #EXPORT (.+)") + + +def process_file(template_js, kind): + lines = [] + is_script_only = False + exports = [] + + with open(template_js, "r") as f: + for line in f: + if kind == "module": + if is_script_only: + m = end_script_only_re.match(line) + if m: + is_script_only = False + + # NOTE: Put an empty line to keep the line number same. + lines.append("\n") + continue + else: + m = begin_script_only_re.match(line) + if m: + is_script_only = True + lines.append("\n") + continue + else: + m = end_script_only_re.match(line) + if m: + lines.append("\n") + continue + + m = begin_script_only_re.match(line) + if m: + lines.append("\n") + continue + + m = export_re.match(line) + if m: + name = m.group(1) + + if kind == "script": + lines.append(f"exports.{name} = {name};\n") + else: + exports.append(name) + lines.append("\n") + continue + + lines.append(line) + + if kind == "module": + lines.append("export const PromiseWorker = { " + ", ".join(exports) + " };\n") + + return "".join(lines) + + +def generate_script(output, template_js): + output.write(process_file(template_js, "script")) + + +def generate_module(output, template_js): + output.write(process_file(template_js, "module")) diff --git a/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js new file mode 100644 index 0000000000..c8203db48f --- /dev/null +++ b/toolkit/components/promiseworker/worker/PromiseWorker.template.worker.js @@ -0,0 +1,243 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/* eslint-env commonjs */ + +/** + * A wrapper around `self` with extended capabilities designed + * to simplify main thread-to-worker thread asynchronous function calls. + * + * This wrapper: + * - groups requests and responses as a method `post` that returns a `Promise`; + * - ensures that exceptions thrown on the worker thread are correctly serialized; + * - provides some utilities for benchmarking various operations. + * + * Generally, you should use PromiseWorker.js or PromiseWorker.mjs along with + * its main thread-side counterpart PromiseWorker.sys.mjs. + */ + +// #BEGIN_SCRIPT_ONLY +"use strict"; +// #END_SCRIPT_ONLY + +if (typeof Components != "undefined") { + throw new Error("This module is meant to be used from the worker thread"); +} +// #BEGIN_SCRIPT_ONLY +if (typeof require == "undefined" || typeof module == "undefined") { + throw new Error( + "this module is meant to be imported using the implementation of require() at resource://gre/modules/workers/require.js" + ); +} + +/* import-globals-from /toolkit/components/workerloader/require.js */ +importScripts("resource://gre/modules/workers/require.js"); +// #END_SCRIPT_ONLY + +/** + * Built-in JavaScript exceptions that may be serialized without + * loss of information. + */ +const EXCEPTION_NAMES = { + EvalError: "EvalError", + InternalError: "InternalError", + RangeError: "RangeError", + ReferenceError: "ReferenceError", + SyntaxError: "SyntaxError", + TypeError: "TypeError", + URIError: "URIError", +}; + +/** + * A constructor used to return data to the caller thread while + * also executing some specific treatment (e.g. shutting down + * the current thread, transmitting data instead of copying it). + * + * @param {object=} data The data to return to the caller thread. + * @param {object=} meta Additional instructions, as an object + * that may contain the following fields: + * - {bool} shutdown If |true|, shut down the current thread after + * having sent the result. + * - {Array} transfers An array of objects that should be transferred + * instead of being copied. + * + * @constructor + */ +function Meta(data, meta) { + this.data = data; + this.meta = meta; +} +// #EXPORT Meta + +/** + * Base class for a worker. + * + * Derived classes are expected to provide the following methods: + * { + * dispatch: function(method, args) { + * // Dispatch a call to method `method` with args `args` + * }, + * log: function(...msg) { + * // Log (or discard) messages (optional) + * }, + * postMessage: function(message, ...transfers) { + * // Post a message to the main thread + * }, + * close: function() { + * // Close the worker + * } + * } + * + * By default, the AbstractWorker is not connected to a message port, + * hence will not receive anything. + * + * To connect it, use `onmessage`, as follows: + * self.addEventListener("message", msg => myWorkerInstance.handleMessage(msg)); + * To handle rejected promises we receive from handleMessage, we must connect it to + * the onError handler as follows: + * self.addEventListener("unhandledrejection", function(error) { + * throw error.reason; + * }); + */ +function AbstractWorker(agent) { + this._agent = agent; +} +AbstractWorker.prototype = { + // Default logger: discard all messages + log() {}, + + /** + * Handle a message. + */ + async handleMessage(msg) { + let data = msg.data; + this.log("Received message", data); + let id = data.id; + + let start; + let options; + if (data.args) { + options = data.args[data.args.length - 1]; + } + // If |outExecutionDuration| option was supplied, start measuring the + // duration of the operation. + if ( + options && + typeof options === "object" && + "outExecutionDuration" in options + ) { + start = Date.now(); + } + + let result; + let exn; + let durationMs; + let method = data.fun; + try { + this.log("Calling method", method); + result = await this.dispatch(method, data.args); + this.log("Method", method, "succeeded"); + } catch (ex) { + exn = ex; + this.log( + "Error while calling agent method", + method, + exn, + exn.moduleStack || exn.stack || "" + ); + } + + if (start) { + // Record duration + durationMs = Date.now() - start; + this.log("Method took", durationMs, "ms"); + } + + // Now, post a reply, possibly as an uncaught error. + // We post this message from outside the |try ... catch| block + // to avoid capturing errors that take place during |postMessage| and + // built-in serialization. + if (!exn) { + this.log("Sending positive reply", result, "id is", id); + if (result instanceof Meta) { + if ("transfers" in result.meta) { + // Take advantage of zero-copy transfers + this.postMessage( + { ok: result.data, id, durationMs }, + result.meta.transfers + ); + } else { + this.postMessage({ ok: result.data, id, durationMs }); + } + if (result.meta.shutdown || false) { + // Time to close the worker + this.close(); + } + } else { + this.postMessage({ ok: result, id, durationMs }); + } + } else if (exn.constructor.name == "DOMException") { + // We can receive instances of DOMExceptions with file I/O. + // DOMExceptions are not yet serializable (Bug 1561357) and must be + // handled differently, as they only have a name and message + this.log("Sending back DOM exception", exn.constructor.name); + let error = { + exn: exn.constructor.name, + message: exn.message, + }; + this.postMessage({ fail: error, id, durationMs }); + } else if (exn.constructor.name in EXCEPTION_NAMES) { + // Rather than letting the DOM mechanism [de]serialize built-in + // JS errors, which loses lots of information (in particular, + // the constructor name, the moduleName and the moduleStack), + // we [de]serialize them manually with a little more care. + this.log("Sending back exception", exn.constructor.name, "id is", id); + let error = { + exn: exn.constructor.name, + message: exn.message, + fileName: exn.moduleName || exn.fileName, + lineNumber: exn.lineNumber, + stack: exn.moduleStack, + }; + this.postMessage({ fail: error, id, durationMs }); + } else if ("toMsg" in exn) { + // Extension mechanism for exception [de]serialization. We + // assume that any exception with a method `toMsg()` knows how + // to serialize itself. The other side is expected to have + // registered a deserializer using the `ExceptionHandlers` + // object. + this.log( + "Sending back an error that knows how to serialize itself", + exn, + "id is", + id + ); + let msg = exn.toMsg(); + this.postMessage({ fail: msg, id, durationMs }); + } else { + // If we encounter an exception for which we have no + // serialization mechanism in place, we have no choice but to + // let the DOM handle said [de]serialization. We can just + // attempt to mitigate the data loss by injecting `moduleName` and + // `moduleStack`. + this.log( + "Sending back regular error", + exn, + exn.moduleStack || exn.stack, + "id is", + id + ); + + try { + // Attempt to introduce human-readable filename and stack + exn.filename = exn.moduleName; + exn.stack = exn.moduleStack; + } catch (_) { + // Nothing we can do + } + throw exn; + } + }, +}; +// #EXPORT AbstractWorker diff --git a/toolkit/components/promiseworker/worker/moz.build b/toolkit/components/promiseworker/worker/moz.build new file mode 100644 index 0000000000..2aaa6d0175 --- /dev/null +++ b/toolkit/components/promiseworker/worker/moz.build @@ -0,0 +1,24 @@ +# -*- Mode: python; indent-tabs-mode: nil; tab-width: 40 -*- +# vim: set filetype=python: +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +EXTRA_JS_MODULES.workers = [ + "!PromiseWorker.js", + "!PromiseWorker.mjs", +] + +GeneratedFile( + "PromiseWorker.js", + inputs=["PromiseWorker.template.worker.js"], + script="GeneratePromiseWorkerScript.py", + entry_point="generate_script", +) + +GeneratedFile( + "PromiseWorker.mjs", + inputs=["PromiseWorker.template.worker.js"], + script="GeneratePromiseWorkerScript.py", + entry_point="generate_module", +) |