summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/streams/transform-streams/reentrant-strategies.any.js
blob: fc2f91886659f620867058306481b402970ef76d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// META: global=window,worker
// META: script=../resources/recording-streams.js
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
'use strict';

// The size() function of readableStrategy can re-entrantly call back into the TransformStream implementation. This
// makes it risky to cache state across the call to ReadableStreamDefaultControllerEnqueue. These tests attempt to catch
// such errors. They are separated from the other strategy tests because no real user code should ever do anything like
// this.
//
// There is no such issue with writableStrategy size() because it is never called from within TransformStream
// algorithms.

// Shared sentinel error used by the tests below; its name is overridden so that it is easy to recognise in failure
// output and can be matched exactly with promise_rejects_exactly().
const error1 = Object.assign(new Error('error1'), { name: 'error1' });

// The readable-side size() callback enqueues an extra chunk ('b') while the enqueue() of the written chunk ('a') is
// still in progress. The test expects the nested chunk to come out first.
promise_test(async () => {
  let sizeCalls = 0;
  let tsController;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      sizeCalls += 1;
      // Re-enter only on the first invocation; the nested enqueue() triggers size() a second time.
      if (sizeCalls === 1) {
        tsController.enqueue('b');
      }
      return 1;
    },
    highWaterMark: Infinity
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  const writer = ts.writable.getWriter();

  await Promise.all([writer.write('a'), writer.close()]);
  const chunks = await readableStreamToArray(ts.readable);
  assert_array_equals(chunks, ['b', 'a'], 'array should contain two chunks');
}, 'enqueue() inside size() should work');

// The readable-side size() callback terminates the stream while the written chunk is being enqueued. The test expects
// the write to complete and the readable side to yield no chunks at all.
promise_test(async () => {
  let tsController;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      // Nothing has been queued on the readable side at this point.
      tsController.terminate();
      // terminate() moved the readable side from "readable" to "closed". The chunk being sized is still enqueued once
      // this function returns, but it can never be read because the stream is already closed.
      return 1;
    },
    highWaterMark: Infinity
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  const writer = ts.writable.getWriter();

  await writer.write('a');
  const chunks = await readableStreamToArray(ts.readable);
  // 'a' is left sitting in the readable queue and is unreachable, hence the empty result.
  // NOTE(review): the original author also states that the writable side is still writable with an empty queue at
  // this point - confirm against the current Streams specification.
  assert_array_equals(chunks, [], 'array should contain no chunks');
}, 'terminate() inside size() should work');

// The readable-side size() callback errors the stream with error1 while the written chunk is being enqueued. The test
// expects the write itself to complete and a subsequent read() to reject with exactly error1.
promise_test(async t => {
  let tsController;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      tsController.error(error1);
      return 1;
    },
    highWaterMark: Infinity
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  const writer = ts.writable.getWriter();

  await writer.write('a');
  await promise_rejects_exactly(t, error1, ts.readable.getReader().read(), 'read() should reject');
}, 'error() inside size() should work');

// Reads controller.desiredSize from within the readable-side size() callback. With a highWaterMark of 1 the test
// expects the value 1 (presumably because the chunk being sized has not been added to the queue yet), and expects the
// chunk to pass through the stream normally afterwards.
promise_test(async () => {
  let tsController;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      assert_equals(tsController.desiredSize, 1, 'desiredSize should be 1');
      return 1;
    },
    highWaterMark: 1
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  const writer = ts.writable.getWriter();

  await Promise.all([writer.write('a'), writer.close()]);
  const chunks = await readableStreamToArray(ts.readable);
  assert_array_equals(chunks, ['a'], 'array should contain one chunk');
}, 'desiredSize inside size() should work');

// The readable-side size() callback cancels the readable side with error1 while the written chunk is being enqueued.
// The test expects the write to complete, writer.closed to reject with exactly error1, and the promise returned by
// cancel() to fulfill.
promise_test(t => {
  let cancelPromise;
  const ts = new TransformStream({}, undefined, {
    size() {
      cancelPromise = ts.readable.cancel(error1);
      return 1;
    },
    highWaterMark: Infinity
  });
  const writer = ts.writable.getWriter();
  return writer.write('a')
      // Both promises are part of the returned chain so that a failure of the writer.closed assertion fails this test
      // instead of being dropped as an unobserved rejection.
      .then(() => Promise.all([
        promise_rejects_exactly(t, error1, writer.closed, 'writer.closed should reject'),
        cancelPromise
      ]));
}, 'readable cancel() inside size() should work');

// Starts piping the readable side into a recording writable stream from within the readable-side size() callback, then
// checks that the chunks enqueued on the controller, and finally the close, reach the pipe destination.
//
// The assertions depend on the exact ordering of the synchronous enqueue() calls and the delay(0) waits, so the
// statement order here must not be changed.
promise_test(() => {
  let controller;
  let pipeToPromise;
  // Destination of the pipe; its `events` array is what the assertions below inspect.
  // NOTE(review): recordingWritableStream() is defined in ../resources/recording-streams.js; presumably it appends the
  // name and argument of each sink call to `events` in order - confirm there.
  const ws = recordingWritableStream();
  const ts = new TransformStream({
    start(c) {
      controller = c;
    }
  }, undefined, {
    size() {
      // Only start the pipe on the first size() call; later calls must not call pipeTo() again.
      if (!pipeToPromise) {
        pipeToPromise = ts.readable.pipeTo(ws);
      }
      return 1;
    },
    highWaterMark: 1
  });
  // Allow promise returned by start() to resolve so that enqueue() will happen synchronously.
  return delay(0).then(() => {
    controller.enqueue('a');
    // size() must have run synchronously inside enqueue() and started the pipe.
    assert_not_equals(pipeToPromise, undefined);

    // Some pipeTo() implementations need an additional chunk enqueued in order for the first one to be processed. See
    // https://github.com/whatwg/streams/issues/794 for background.
    controller.enqueue('a');

    // Give pipeTo() a chance to process the queued chunks.
    return delay(0);
  }).then(() => {
    assert_array_equals(ws.events, ['write', 'a', 'write', 'a'], 'ws should contain two chunks');
    // Terminate the stream so that the pipe can finish; pipeToPromise is then awaited before the final check.
    controller.terminate();
    return pipeToPromise;
  }).then(() => {
    assert_array_equals(ws.events, ['write', 'a', 'write', 'a', 'close'], 'target should have been closed');
  });
}, 'pipeTo() inside size() should work');

// Issues a read() on the readable side from within the readable-side size() callback. A write of 'b' is left pending
// first; the test then enqueues 'a' directly on the controller and expects size() to run exactly once, the pending
// write to complete, and the read started inside size() to yield 'b'.
//
// The assertions depend on the exact ordering of the flushAsyncEvents()/delay(0) waits relative to the synchronous
// enqueue() call, so the statement order here must not be changed.
promise_test(() => {
  let controller;
  let readPromise;
  let calls = 0;
  // Assigned after the stream is constructed (see below) because size() needs it but the stream must exist first.
  let reader;
  const ts = new TransformStream({
    start(c) {
      controller = c;
    }
  }, undefined, {
    size() {
      // This is triggered by controller.enqueue(). The queue is empty and there are no pending reads. pull() is called
      // synchronously, allowing transform() to proceed asynchronously. This results in a second call to enqueue(),
      // which resolves this pending read() without calling size() again.
      readPromise = reader.read();
      ++calls;
      return 1;
    },
    highWaterMark: 0
  });
  reader = ts.readable.getReader();
  const writer = ts.writable.getWriter();
  // Tracks whether the write of 'b' has completed; the test asserts that it is still pending before enqueue('a') and
  // complete afterwards (presumably it is held back by backpressure from the highWaterMark of 0 - TODO confirm).
  let writeResolved = false;
  const writePromise = writer.write('b').then(() => {
    writeResolved = true;
  });
  return flushAsyncEvents().then(() => {
    assert_false(writeResolved);
    controller.enqueue('a');
    // size() ran synchronously inside enqueue(), which also started the read stored in readPromise.
    assert_equals(calls, 1, 'size() should have been called once');
    return delay(0);
  }).then(() => {
    assert_true(writeResolved);
    assert_equals(calls, 1, 'size() should only be called once');
    return readPromise;
  }).then(({ value, done }) => {
    assert_false(done, 'done should be false');
    // See https://github.com/whatwg/streams/issues/794 for why this chunk is not 'a'.
    assert_equals(value, 'b', 'chunk should have been read');
    assert_equals(calls, 1, 'calls should still be 1');
    return writePromise;
  });
}, 'read() inside of size() should work');

// The readable-side size() callback issues a second write ('a') while the first written chunk ('b') is being
// enqueued. The test expects size() to have run once synchronously, twice overall, and the chunks to be read back in
// the order they were written.
promise_test(async () => {
  let sizeCalls = 0;
  let writer;
  let nestedWrite;
  const readableStrategy = {
    size() {
      sizeCalls += 1;
      // Only the first invocation writes again; the nested write leads to the second size() call later on.
      if (sizeCalls === 1) {
        nestedWrite = writer.write('a');
      }
      return 1;
    },
    highWaterMark: Infinity
  };
  const ts = new TransformStream({}, undefined, readableStrategy);
  writer = ts.writable.getWriter();

  // Give pull() a chance to be called.
  await delay(0);

  // This write results in a synchronous call to transform(), enqueue(), and size().
  const outerWrite = writer.write('b');
  assert_equals(sizeCalls, 1, 'size() should have been called once');

  await Promise.all([nestedWrite, outerWrite, writer.close()]);
  assert_equals(sizeCalls, 2, 'size() should have been called twice');

  const chunks = await readableStreamToArray(ts.readable);
  assert_array_equals(chunks, ['b', 'a'], 'both chunks should have been enqueued');
  assert_equals(sizeCalls, 2, 'calls should still be 2');
}, 'writer.write() inside size() should work');

// Variant of the previous test in which the outer chunk ('b') is enqueued directly on the controller rather than
// written. The write of 'a' issued from size() is then processed synchronously, so size() has already run twice by
// the time the outer enqueue() returns.
promise_test(async () => {
  let sizeCalls = 0;
  let tsController;
  let writer;
  let nestedWrite;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      sizeCalls += 1;
      // Only the first invocation writes; the nested enqueue() it causes is the second invocation.
      if (sizeCalls === 1) {
        nestedWrite = writer.write('a');
      }
      return 1;
    },
    highWaterMark: Infinity
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  writer = ts.writable.getWriter();

  // Give pull() a chance to be called.
  await delay(0);

  // This enqueue results in synchronous calls to size(), write(), transform() and enqueue().
  tsController.enqueue('b');
  assert_equals(sizeCalls, 2, 'size() should have been called twice');

  await Promise.all([nestedWrite, writer.close()]);
  const chunks = await readableStreamToArray(ts.readable);

  // The enqueue() of 'a' is nested inside the enqueue() of 'b' and therefore completes first, so the chunks come out
  // in the reverse of the order in which the enqueue() calls were started.
  assert_array_equals(chunks, ['a', 'b'], 'both chunks should have been enqueued');
  assert_equals(sizeCalls, 2, 'calls should still be 2');
}, 'synchronous writer.write() inside size() should work');

// The readable-side size() callback closes the writer while a chunk is being enqueued on the controller. The test
// expects the chunk to still be readable, the readable side to then report done, and the close() promise to fulfill.
promise_test(async () => {
  let tsController;
  let writer;
  let closePromise;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      closePromise = writer.close();
      return 1;
    },
    highWaterMark: 1
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();

  // Wait for the promise returned by start() to be resolved so that the call to close() will result in a synchronous
  // call to TransformStreamDefaultSink.
  await delay(0);

  tsController.enqueue('a');
  const firstRead = await reader.read();
  assert_false(firstRead.done, 'done should be false');
  assert_equals(firstRead.value, 'a', 'value should be correct');

  const secondRead = await reader.read();
  assert_true(secondRead.done, 'done should be true');

  await closePromise;
}, 'writer.close() inside size() should work');

// The readable-side size() callback aborts the writable side with error1 while a chunk is being enqueued on the
// controller. The test expects a read() to reject with exactly error1 and the abort() promise to fulfill.
promise_test(async t => {
  let tsController;
  let abortPromise;
  const transformer = {
    start(c) {
      tsController = c;
    }
  };
  const readableStrategy = {
    size() {
      abortPromise = ts.writable.abort(error1);
      return 1;
    },
    highWaterMark: 1
  };
  const ts = new TransformStream(transformer, undefined, readableStrategy);
  const reader = ts.readable.getReader();

  // Wait for the promise returned by start() to be resolved so that the call to abort() will result in a synchronous
  // call to TransformStreamDefaultSink.
  await delay(0);

  tsController.enqueue('a');
  // abortPromise is only read after enqueue() has run size(), so it is guaranteed to be assigned here.
  const readRejection = promise_rejects_exactly(t, error1, reader.read(), 'read() should reject');
  await Promise.all([readRejection, abortPromise]);
}, 'writer.abort() inside size() should work');