diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-21 20:56:19 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-21 20:56:19 +0000 |
commit | 0b6210cd37b68b94252cb798598b12974a20e1c1 (patch) | |
tree | e371686554a877842d95aa94f100bee552ff2a8e /test/wpt/tests/common/dispatcher/dispatcher.js | |
parent | Initial commit. (diff) | |
download | node-undici-upstream.tar.xz node-undici-upstream.zip |
Adding upstream version 5.28.2+dfsg1+~cs23.11.12.3.upstream/5.28.2+dfsg1+_cs23.11.12.3upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'test/wpt/tests/common/dispatcher/dispatcher.js')
-rw-r--r-- | test/wpt/tests/common/dispatcher/dispatcher.js | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/test/wpt/tests/common/dispatcher/dispatcher.js b/test/wpt/tests/common/dispatcher/dispatcher.js new file mode 100644 index 0000000..a0f9f43 --- /dev/null +++ b/test/wpt/tests/common/dispatcher/dispatcher.js @@ -0,0 +1,256 @@ +// Define a universal message passing API. It works cross-origin and across +// browsing context groups. +const dispatcher_path = "/common/dispatcher/dispatcher.py"; +const dispatcher_url = new URL(dispatcher_path, location.href).href; + +// Return a promise, limiting the number of concurrent accesses to a shared +// resources to |max_concurrent_access|. +const concurrencyLimiter = (max_concurrency) => { + let pending = 0; + let waiting = []; + return async (task) => { + pending++; + if (pending > max_concurrency) + await new Promise(resolve => waiting.push(resolve)); + let result = await task(); + pending--; + waiting.shift()?.(); + return result; + }; +} + +// Wait for a random amount of time in the range [10ms,100ms]. +const randomDelay = () => { + return new Promise(resolve => setTimeout(resolve, 10 + 90*Math.random())); +} + +// Sending too many requests in parallel causes congestion. Limiting it improves +// throughput. +// +// Note: The following table has been determined on the test: +// ../cache-storage.tentative.https.html +// using Chrome with a 64 core CPU / 64GB ram, in release mode: +// ┌───────────┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬────┐ +// │concurrency│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 10│ 15│ 20│ 30│ 50│ 100│ +// ├───────────┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┤ +// │time (s) │ 54│ 38│ 31│ 29│ 26│ 24│ 22│ 22│ 22│ 22│ 34│ 36 │ +// └───────────┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴────┘ +const limiter = concurrencyLimiter(6); + +// While requests to different remote contexts can go in parallel, we need to +// ensure that requests to each remote context are done in order. This maps a +// uuid to a queue of requests to send. A queue is processed until it is empty +// and then is deleted from the map. +const sendQueues = new Map(); + +// Sends a single item (with rate-limiting) and calls the associated resolver +// when it is successfully sent. +const sendItem = async function (uuid, resolver, message) { + await limiter(async () => { + // Requests might be dropped. Retry until getting a confirmation it has been + // processed. + while(1) { + try { + let response = await fetch(dispatcher_url + `?uuid=${uuid}`, { + method: 'POST', + body: message + }) + if (await response.text() == "done") { + resolver(); + return; + } + } catch (fetch_error) {} + await randomDelay(); + }; + }); +} + +// While the queue is non-empty, send the next item. This is async and new items +// may be added to the queue while others are being sent. +const processQueue = async function (uuid, queue) { + while (queue.length) { + const [resolver, message] = queue.shift(); + await sendItem(uuid, resolver, message); + } + // The queue is empty, delete it. + sendQueues.delete(uuid); +} + +const send = async function (uuid, message) { + const itemSentPromise = new Promise((resolve) => { + const item = [resolve, message]; + if (sendQueues.has(uuid)) { + // There is already a queue for `uuid`, just add to it and it will be processed. + sendQueues.get(uuid).push(item); + } else { + // There is no queue for `uuid`, create it and start processing. + const queue = [item]; + sendQueues.set(uuid, queue); + processQueue(uuid, queue); + } + }); + // Wait until the item has been successfully sent. + await itemSentPromise; +} + +const receive = async function (uuid) { + while(1) { + let data = "not ready"; + try { + data = await limiter(async () => { + let response = await fetch(dispatcher_url + `?uuid=${uuid}`); + return await response.text(); + }); + } catch (fetch_error) {} + + if (data == "not ready") { + await randomDelay(); + continue; + } + + return data; + } +} + +// Returns an URL. When called, the server sends toward the `uuid` queue the +// request headers. Useful for determining if something was requested with +// Cookies. +const showRequestHeaders = function(origin, uuid) { + return origin + dispatcher_path + `?uuid=${uuid}&show-headers`; +} + +// Same as above, except for the response is cacheable. +const cacheableShowRequestHeaders = function(origin, uuid) { + return origin + dispatcher_path + `?uuid=${uuid}&cacheable&show-headers`; +} + +// This script requires +// - `/common/utils.js` for `token()`. + +// Returns the URL of a document that can be used as a `RemoteContext`. +// +// `uuid` should be a UUID uniquely identifying the given remote context. +// `options` has the following shape: +// +// { +// host: (optional) Sets the returned URL's `host` property. Useful for +// cross-origin executors. +// protocol: (optional) Sets the returned URL's `protocol` property. +// } +function remoteExecutorUrl(uuid, options) { + const url = new URL("/common/dispatcher/remote-executor.html", location); + url.searchParams.set("uuid", uuid); + + if (options?.host) { + url.host = options.host; + } + + if (options?.protocol) { + url.protocol = options.protocol; + } + + return url; +} + +// Represents a remote executor. For more detailed explanation see `README.md`. +class RemoteContext { + // `uuid` is a UUID string that identifies the remote context and should + // match with the `uuid` parameter of the URL of the remote context. + constructor(uuid) { + this.context_id = uuid; + } + + // Evaluates the script `expr` on the executor. + // - If `expr` is evaluated to a Promise that is resolved with a value: + // `execute_script()` returns a Promise resolved with the value. + // - If `expr` is evaluated to a non-Promise value: + // `execute_script()` returns a Promise resolved with the value. + // - If `expr` throws an error or is evaluated to a Promise that is rejected: + // `execute_script()` returns a rejected Promise with the error's + // `message`. + // Note that currently the type of error (e.g. DOMException) is not + // preserved, except for `TypeError`. + // The values should be able to be serialized by JSON.stringify(). + async execute_script(fn, args) { + const receiver = token(); + await this.send({receiver: receiver, fn: fn.toString(), args: args}); + const response = JSON.parse(await receive(receiver)); + if (response.status === 'success') { + return response.value; + } + + // exception + if (response.name === 'TypeError') { + throw new TypeError(response.value); + } + throw new Error(response.value); + } + + async send(msg) { + return await send(this.context_id, JSON.stringify(msg)); + } +}; + +class Executor { + constructor(uuid) { + this.uuid = uuid; + + // If `suspend_callback` is not `null`, the executor should be suspended + // when there are no ongoing tasks. + this.suspend_callback = null; + + this.execute(); + } + + // Wait until there are no ongoing tasks nor fetch requests for polling + // tasks, and then suspend the executor and call `callback()`. + // Navigation from the executor page should be triggered inside `callback()`, + // to avoid conflict with in-flight fetch requests. + suspend(callback) { + this.suspend_callback = callback; + } + + resume() { + } + + async execute() { + while(true) { + if (this.suspend_callback !== null) { + this.suspend_callback(); + this.suspend_callback = null; + // Wait for `resume()` to be called. + await new Promise(resolve => this.resume = resolve); + + // Workaround for https://crbug.com/1244230. + // Without this workaround, the executor is resumed and the fetch + // request to poll the next task is initiated synchronously from + // pageshow event after the page restored from BFCache, and the fetch + // request promise is never resolved (and thus the test results in + // timeout) due to https://crbug.com/1244230. The root cause is not yet + // known, but setTimeout() with 0ms causes the resume triggered on + // another task and seems to resolve the issue. + await new Promise(resolve => setTimeout(resolve, 0)); + + continue; + } + + const task = JSON.parse(await receive(this.uuid)); + + let response; + try { + const value = await eval(task.fn).apply(null, task.args); + response = JSON.stringify({ + status: 'success', + value: value + }); + } catch(e) { + response = JSON.stringify({ + status: 'exception', + name: e.name, + value: e.message + }); + } + await send(task.receiver, response); + } + } +} |