summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/streams/transform-streams/backpressure.any.js
blob: 47a21fb7e71fed837832dd1b91b5100a3513131d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// META: global=window,worker,shadowrealm
// META: script=../resources/recording-streams.js
// META: script=../resources/test-utils.js
'use strict';

const error1 = new Error('error1 message');
error1.name = 'error1';

promise_test(() => {
  const ts = recordingTransformStream();
  const writer = ts.writable.getWriter();
  // This call never resolves.
  writer.write('a');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, [], 'transform should not be called');
  });
}, 'backpressure allows no transforms with a default identity transform and no reader');

promise_test(() => {
  const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  // This call to write() resolves asynchronously.
  writer.write('a');
  // This call to write() waits for backpressure that is never relieved and never calls transform().
  writer.write('b');
  return flushAsyncEvents().then(() => {
    assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once');
  });
}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');

promise_test(() => {
  // Without a transform() implementation, recordingTransformStream() never enqueues anything.
  const ts = recordingTransformStream({
    transform() {
      // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform()
      // keeps being called.
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const writePromises = [];
  for (let i = 0; i < 4; ++i) {
    writePromises.push(writer.write(i));
  }
  return Promise.all(writePromises).then(() => {
    assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3],
                        'all 4 events should be transformed');
  });
}, 'transform() should keep being called as long as there is no backpressure');

promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const events = [];
  const writerPromises = [
    writer.write('a').then(() => events.push('a')),
    writer.write('b').then(() => events.push('b')),
    writer.close().then(() => events.push('closed'))];
  return delay(0).then(() => {
    assert_array_equals(events, ['a'], 'the first write should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should not be true');
    assert_equals('a', value, 'value should be "a"');
    return delay(0);
  }).then(() => {
    assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should still not be true');
    assert_equals('b', value, 'value should be "b"');
    return reader.read();
  }).then(({ done }) => {
    assert_true(done, 'done should be true');
    return writerPromises;
  });
}, 'writes should resolve as soon as transform completes');

promise_test(() => {
  const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 });
  const writer = ts.writable.getWriter();
  const reader = ts.readable.getReader();
  const readPromise = reader.read();
  writer.write('a');
  return readPromise.then(({ value, done }) => {
    assert_false(done, 'not done');
    assert_equals(value, 'a', 'value should be "a"');
  });
}, 'calling pull() before the first write() with backpressure should work');

promise_test(() => {
  let reader;
  const ts = recordingTransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk);
      return reader.read();
    }
  }, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  reader = ts.readable.getReader();
  return writer.write('a');
}, 'transform() should be able to read the chunk it just enqueued');

promise_test(() => {
  let resolveTransform;
  const transformPromise = new Promise(resolve => {
    resolveTransform = resolve;
  });
  const ts = recordingTransformStream({
    transform() {
      return transformPromise;
    }
  }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity }));
  const writer = ts.writable.getWriter();
  assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  return delay(0).then(() => {
    writer.write('a');
    assert_array_equals(ts.events, ['transform', 'a']);
    assert_equals(writer.desiredSize, 0, 'desiredSize should be 0');
    return flushAsyncEvents();
  }).then(() => {
    assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0');
    resolveTransform();
    return delay(0);
  }).then(() => {
    assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
  });
}, 'blocking transform() should cause backpressure');

promise_test(t => {
  const ts = new TransformStream();
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
}, 'writer.closed should resolve after readable is canceled during start');

promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with backpressure');

promise_test(t => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
  });
}, 'writer.closed should resolve after readable is canceled with no backpressure');

promise_test(() => {
  const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
  const writer = ts.writable.getWriter();
  return delay(0).then(() => {
    const writePromise = writer.write('a');
    ts.readable.cancel(error1);
    return writePromise;
  });
}, 'cancelling the readable should cause a pending write to resolve');

promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  ts.readable.cancel(error1);
  return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
}, 'cancelling the readable side of a TransformStream should abort an empty pipe');

promise_test(t => {
  const rs = new ReadableStream();
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup');

promise_test(t => {
  const rs = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.enqueue('c');
    }
  });
  const ts = new TransformStream();
  const pipePromise = rs.pipeTo(ts.writable);
  // Allow data to flow into the pipe.
  return delay(0).then(() => {
    ts.readable.cancel(error1);
    return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
  });
}, 'cancelling the readable side of a TransformStream should abort a full pipe');