summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/common/dispatcher/dispatcher.js
blob: a0f9f43e622483c3ab4df2621a4e4be40692eebf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// Define a universal message passing API. It works cross-origin and across
// browsing context groups.
// Path of the server-side message queue endpoint, keyed by a `uuid` query
// parameter (see dispatcher.py).
const dispatcher_path = "/common/dispatcher/dispatcher.py";
// Absolute URL of the dispatcher endpoint, resolved against this document.
const dispatcher_url = new URL(dispatcher_path, location.href).href;

// Return a function limiting the number of concurrent accesses to a shared
// resource to |max_concurrency|.
//
// The returned function wraps an async `task`: when `max_concurrency` tasks
// are already in flight, new tasks wait until a slot frees up. The slot is
// released in a `finally` block, so it is returned even when `task` rejects —
// otherwise a single failing task would leak a slot and eventually deadlock
// all subsequent callers.
const concurrencyLimiter = (max_concurrency) => {
  let pending = 0;
  const waiting = [];
  return async (task) => {
    pending++;
    if (pending > max_concurrency)
      await new Promise(resolve => waiting.push(resolve));
    try {
      return await task();
    } finally {
      // Release the slot and wake the next waiter (if any), whether `task`
      // resolved or rejected.
      pending--;
      waiting.shift()?.();
    }
  };
};

// Wait for a random amount of time in the range [10ms,100ms].
// Used to spread out retries so concurrent clients do not hammer the
// dispatcher in lockstep.
const randomDelay = () => new Promise((resolve) => {
  const delayMs = 10 + Math.random() * 90;
  setTimeout(resolve, delayMs);
});

// Sending too many requests in parallel causes congestion. Limiting it improves
// throughput.
//
// Note: The following table has been determined on the test:
// ../cache-storage.tentative.https.html
// using Chrome with a 64 core CPU / 64GB ram, in release mode:
// ┌───────────┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬────┐
// │concurrency│ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 10│ 15│ 20│ 30│ 50│ 100│
// ├───────────┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┤
// │time (s)   │ 54│ 38│ 31│ 29│ 26│ 24│ 22│ 22│ 22│ 22│ 34│ 36 │
// └───────────┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴───┴────┘
// Shared rate limiter used by both `sendItem()` and `receive()` below.
const limiter = concurrencyLimiter(6);

// While requests to different remote contexts can go in parallel, we need to
// ensure that requests to each remote context are done in order. This maps a
// uuid to a queue of requests to send. A queue is processed until it is empty
// and then is deleted from the map.
// Shape: Map<uuid, Array<[resolver, message]>> — see `send()`.
const sendQueues = new Map();

// Sends a single item (with rate-limiting) and calls the associated resolver
// when it is successfully sent.
//
// The dispatcher answers "done" once the message is queued; anything else
// (including a network failure) is treated as a dropped request and retried
// after a randomized backoff.
const sendItem = async function (uuid, resolver, message) {
  await limiter(async () => {
    // Requests might be dropped. Retry until getting a confirmation it has
    // been processed.
    for (;;) {
      try {
        const response = await fetch(`${dispatcher_url}?uuid=${uuid}`, {
          method: 'POST',
          body: message,
        });
        const reply = await response.text();
        if (reply == "done") {
          resolver();
          return;
        }
      } catch (fetch_error) {
        // Network errors are expected here; fall through and retry.
      }
      await randomDelay();
    }
  });
};

// While the queue is non-empty, send the next item. This is async and new items
// may be added to the queue while others are being sent.
const processQueue = async function (uuid, queue) {
  // Drain items strictly in order; each send completes before the next
  // starts, preserving per-uuid message ordering.
  for (let item = queue.shift(); item !== undefined; item = queue.shift()) {
    const [resolver, message] = item;
    await sendItem(uuid, resolver, message);
  }
  // The queue is empty, delete it so `send()` knows to start a new processor.
  sendQueues.delete(uuid);
};

// Enqueue `message` toward the remote context identified by `uuid`.
// Resolves once the dispatcher has acknowledged receipt. Messages for the
// same uuid are delivered in call order.
const send = async function (uuid, message) {
  // The resolver is stored alongside the message and invoked by `sendItem()`
  // once the dispatcher confirms delivery.
  await new Promise((resolve) => {
    const item = [resolve, message];
    const existing = sendQueues.get(uuid);
    if (existing !== undefined) {
      // A processor is already draining this uuid's queue; just append.
      existing.push(item);
      return;
    }
    // First in-flight message for this uuid: create the queue and start
    // processing it (intentionally not awaited).
    const queue = [item];
    sendQueues.set(uuid, queue);
    processQueue(uuid, queue);
  });
};

// Poll the dispatcher until a message addressed to `uuid` is available, and
// return it. The server answers "not ready" when the queue is empty; network
// failures are treated the same way and retried after a randomized backoff.
const receive = async function (uuid) {
  for (;;) {
    let data = "not ready";
    try {
      data = await limiter(async () => {
        const response = await fetch(`${dispatcher_url}?uuid=${uuid}`);
        return await response.text();
      });
    } catch (fetch_error) {
      // Transient network failure: keep the "not ready" sentinel and retry.
    }

    if (data != "not ready")
      return data;

    await randomDelay();
  }
};

// Returns an URL. When called, the server sends toward the `uuid` queue the
// request headers. Useful for determining if something was requested with
// Cookies.
const showRequestHeaders = function(origin, uuid) {
  return `${origin}${dispatcher_path}?uuid=${uuid}&show-headers`;
};

// Same as above, except for the response is cacheable.
const cacheableShowRequestHeaders = function(origin, uuid) {
  return `${origin}${dispatcher_path}?uuid=${uuid}&cacheable&show-headers`;
};

// This script requires
// - `/common/utils.js` for `token()`.

// Returns the URL of a document that can be used as a `RemoteContext`.
//
// `uuid` should be a UUID uniquely identifying the given remote context.
// `options` has the following shape:
//
// {
//   host: (optional) Sets the returned URL's `host` property. Useful for
//     cross-origin executors.
//   protocol: (optional) Sets the returned URL's `protocol` property.
// }
function remoteExecutorUrl(uuid, options) {
  const url = new URL("/common/dispatcher/remote-executor.html", location);
  url.searchParams.set("uuid", uuid);

  // Apply the optional host/protocol overrides, when provided.
  const { host, protocol } = options ?? {};
  if (host) {
    url.host = host;
  }
  if (protocol) {
    url.protocol = protocol;
  }

  return url;
}

// Represents a remote executor. For more detailed explanation see `README.md`.
class RemoteContext {
  // `uuid` is a UUID string that identifies the remote context and should
  // match with the `uuid` parameter of the URL of the remote context.
  constructor(uuid) {
    this.context_id = uuid;
  }

  // Evaluates the script `expr` on the executor.
  // - If `expr` is evaluated to a Promise that is resolved with a value:
  //   `execute_script()` returns a Promise resolved with the value.
  // - If `expr` is evaluated to a non-Promise value:
  //   `execute_script()` returns a Promise resolved with the value.
  // - If `expr` throws an error or is evaluated to a Promise that is rejected:
  //   `execute_script()` returns a rejected Promise with the error's
  //   `message`.
  //   Note that currently the type of error (e.g. DOMException) is not
  //   preserved, except for `TypeError`.
  // The values should be able to be serialized by JSON.stringify().
  async execute_script(fn, args) {
    const receiver = token();
    await this.send({ receiver, fn: fn.toString(), args });
    const response = JSON.parse(await receive(receiver));
    if (response.status === 'success') {
      return response.value;
    }

    // The executor reported an exception. Rebuild an error locally: only
    // `TypeError` keeps its original type, everything else becomes `Error`.
    const ErrorCtor = response.name === 'TypeError' ? TypeError : Error;
    throw new ErrorCtor(response.value);
  }

  // Serializes `msg` and queues it toward this context's executor.
  async send(msg) {
    return await send(this.context_id, JSON.stringify(msg));
  }
}

// Runs inside an executor page: polls the dispatcher for tasks addressed to
// `uuid`, evaluates them, and sends the result back to the task's `receiver`
// queue. Counterpart of `RemoteContext.execute_script()`.
class Executor {
  constructor(uuid) {
    this.uuid = uuid;

    // If `suspend_callback` is not `null`, the executor should be suspended
    // when there are no ongoing tasks.
    this.suspend_callback = null;

    // Start the polling loop. Intentionally not awaited: it runs for the
    // lifetime of the page.
    this.execute();
  }

  // Wait until there are no ongoing tasks nor fetch requests for polling
  // tasks, and then suspend the executor and call `callback()`.
  // Navigation from the executor page should be triggered inside `callback()`,
  // to avoid conflict with in-flight fetch requests.
  suspend(callback) {
    this.suspend_callback = callback;
  }

  // No-op placeholder. While suspended, `execute()` replaces this method with
  // the resolver of its wait Promise (`this.resume = resolve` below), so
  // calling `resume()` wakes the loop up.
  resume() {
  }

  async execute() {
    while(true) {
      // Handle a pending suspension request before polling for the next task.
      if (this.suspend_callback !== null) {
        this.suspend_callback();
        // Clear first so a suspend/resume cycle can be requested again later.
        this.suspend_callback = null;
        // Wait for `resume()` to be called.
        await new Promise(resolve => this.resume = resolve);

        // Workaround for https://crbug.com/1244230.
        // Without this workaround, the executor is resumed and the fetch
        // request to poll the next task is initiated synchronously from
        // pageshow event after the page restored from BFCache, and the fetch
        // request promise is never resolved (and thus the test results in
        // timeout) due to https://crbug.com/1244230. The root cause is not yet
        // known, but setTimeout() with 0ms causes the resume triggered on
        // another task and seems to resolve the issue.
        await new Promise(resolve => setTimeout(resolve, 0));

        continue;
      }

      // Blocks until a task is available for this executor's uuid.
      const task = JSON.parse(await receive(this.uuid));

      let response;
      try {
        // `eval` is by design: the task carries the stringified function from
        // `RemoteContext.execute_script()`, i.e. trusted test-authored code.
        const value = await eval(task.fn).apply(null, task.args);
        response = JSON.stringify({
          status: 'success',
          value: value
        });
      } catch(e) {
        // Only `name` and `message` survive serialization; the caller
        // reconstructs the error type from `name` (TypeError only).
        response = JSON.stringify({
          status: 'exception',
          name: e.name,
          value: e.message
        });
      }
      await send(task.receiver, response);
    }
  }
}