1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
<!DOCTYPE html>
<meta charset="utf-8">
<script src="/resources/testharness.js"></script>
<script src="/resources/testharnessreport.js"></script>
<script src="resources/helpers.js"></script>
<script src="../resources/recording-streams.js"></script>
<script src="../resources/test-utils.js"></script>
<script>
'use strict';
// Enqueues a single chunk and closes inside start(), then checks that the
// stream returned by createTransferredReadableStream() yields that chunk
// followed by the end-of-stream signal.
promise_test(async () => {
  const underlyingSource = {
    start(c) {
      c.enqueue('a');
      c.close();
    }
  };
  const stream = await createTransferredReadableStream(underlyingSource);
  const reader = stream.getReader();

  // First read: the only chunk.
  const firstResult = await reader.read();
  assert_false(firstResult.done, 'should not be done yet');
  assert_equals(firstResult.value, 'a', 'first chunk should be a');

  // Second read: the stream has been closed.
  const secondResult = await reader.read();
  assert_true(secondResult.done, 'should be done now');
}, 'sending one chunk through a transferred stream should work');
// Enqueues ten chunks and closes the source before any reader is acquired,
// then checks that the chunks are read back in order followed by done.
promise_test(async () => {
  let sourceController;
  const stream = await createTransferredReadableStream({
    start(c) {
      // Keep the controller so chunks can be enqueued from outside start().
      sourceController = c;
    }
  });

  let next = 0;
  while (next < 10) {
    sourceController.enqueue(next);
    next += 1;
  }
  sourceController.close();

  const reader = stream.getReader();
  let expected = 0;
  while (expected < 10) {
    const result = await reader.read();
    assert_false(result.done, 'should not be done yet');
    assert_equals(result.value, expected, 'chunk content should match index');
    expected += 1;
  }

  const finalResult = await reader.read();
  assert_true(finalResult.done, 'should be done now');
}, 'sending ten chunks through a transferred stream should work');
// Interleaves enqueue() and read(): each chunk is enqueued and then read back
// before the next one is enqueued. The source is closed only after all ten
// chunks have been consumed.
promise_test(async () => {
  let sourceController;
  const stream = await createTransferredReadableStream({
    start(c) {
      // Keep the controller so chunks can be enqueued from outside start().
      sourceController = c;
    }
  });
  const reader = stream.getReader();

  let index = 0;
  while (index < 10) {
    sourceController.enqueue(index);
    const result = await reader.read();
    assert_false(result.done, 'should not be done yet');
    assert_equals(result.value, index, 'chunk content should match index');
    index += 1;
  }

  sourceController.close();
  const finalResult = await reader.read();
  assert_true(finalResult.done, 'should be done now');
}, 'sending ten chunks one at a time should work');
// Produces chunks lazily from pull(): each call enqueues the current counter
// value, and the source closes itself once ten chunks have been enqueued. The
// reader must then see 0..9 in order, followed by done.
promise_test(async () => {
  const rs = await createTransferredReadableStream({
    start() {
      // The counter is stored on the underlying source object so that pull()
      // can reach it through `this`.
      this.counter = 0;
    },
    pull(controller) {
      controller.enqueue(this.counter);
      ++this.counter;
      if (this.counter === 10) {
        controller.close();
      }
    }
  });
  const reader = rs.getReader();
  for (let i = 0; i < 10; ++i) {
    const {value, done} = await reader.read();
    assert_false(done, 'should not be done yet');
    assert_equals(value, i, 'chunk content should match index');
  }
  const {done} = await reader.read();
  assert_true(done, 'should be done now');
}, 'sending ten chunks on demand should work');
// Checks that posting a stream (as opposed to receiving it) is what triggers
// pull(). With a highWaterMark of 0, no 'pull' event is expected before the
// transfer; after postMessage() exactly one 'pull' event is expected.
// NOTE(review): rs.events is presumably the call log maintained by
// recordingReadableStream() from ../resources/recording-streams.js — confirm.
promise_test(async () => {
const rs = recordingReadableStream({}, { highWaterMark: 0 });
// NOTE(review): delay(0) presumably yields to the event loop so any pending
// source callbacks can run before the check — confirm against test-utils.js.
await delay(0);
assert_array_equals(rs.events, [], 'pull() should not have been called');
// Eat the message so it can't interfere with other tests.
addEventListener('message', () => {}, {once: true});
// The transfer is done manually to verify that it is posting the stream that
// relieves backpressure, not receiving it.
postMessage(rs, '*', [rs]);
await delay(0);
assert_array_equals(rs.events, ['pull'], 'pull() should have been called');
}, 'transferring a stream should relieve backpressure');
// With a highWaterMark of 2 and a pull() that enqueues one chunk per call,
// three 'pull' events are expected after the transfer: one more than the
// high water mark alone would account for.
promise_test(async () => {
  const underlyingSource = {
    pull(c) {
      c.enqueue('a');
    }
  };
  const strategy = { highWaterMark: 2 };
  const stream =
      await recordingTransferredReadableStream(underlyingSource, strategy);
  await delay(0);

  const expectedEvents = ['pull', 'pull', 'pull'];
  assert_array_equals(stream.events, expectedEvents,
                      'pull() should have been called three times');
}, 'transferring a stream should add one chunk to the queue size');
// Verifies that the extra queue space introduced by transferring is measured
// in chunks rather than in bytes. The source enqueues two 1024-byte chunks
// under a ByteLengthQueuingStrategy with a 512-byte high water mark, and the
// test observes when pull() is (and is not) recorded.
promise_test(async () => {
const rs = await recordingTransferredReadableStream({
start(controller) {
controller.enqueue(new Uint8Array(1024));
controller.enqueue(new Uint8Array(1024));
}
}, new ByteLengthQueuingStrategy({highWaterMark: 512}));
await delay(0);
// At this point the queue contains 1024/512 bytes and 1/1 chunk, so it's full
// and pull() is not called.
assert_array_equals(rs.events, [], 'pull() should not have been called');
const reader = rs.getReader();
const {value, done} = await reader.read();
assert_false(done, 'we should not be done');
assert_equals(value.byteLength, 1024, 'expected chunk should be returned');
// Now the queue contains 0/512 bytes and 1/1 chunk, so pull() is called. If
// the implementation erroneously counted the extra queue space in bytes, then
// the queue would contain 1024/513 bytes and pull() wouldn't be called.
assert_array_equals(rs.events, ['pull'], 'pull() should have been called');
}, 'the extra queue from transferring is counted in chunks');
/**
 * Builds a stream via recordingTransferredReadableStream() whose underlying
 * source signals when its cancel() method runs.
 *
 * @returns {Promise<{rs: *, cancelCalled: Promise<undefined>}>} The stream
 *     returned by recordingTransferredReadableStream(), plus a promise that
 *     fulfills once the underlying source's cancel() has been invoked.
 */
async function transferredReadableStreamWithCancelPromise() {
  // Captured resolver for cancelCalled; invoked from the source's cancel().
  let signalCancel;
  const cancelCalled = new Promise(resolve => {
    signalCancel = resolve;
  });

  const underlyingSource = {
    cancel() {
      signalCancel();
    }
  };
  const rs = await recordingTransferredReadableStream(underlyingSource);

  return { rs, cancelCalled };
}
// Cancels the stream with a reason and waits for the underlying source's
// cancel() to run, then checks the recorded events and that the stream's
// closed promise fulfills.
promise_test(async () => {
  const fixture = await transferredReadableStreamWithCancelPromise();
  const stream = fixture.rs;

  // Deliberately not awaited: the test waits on the source-side signal.
  stream.cancel('message');
  await fixture.cancelCalled;

  const expectedEvents = ['pull', 'cancel', 'message'];
  assert_array_equals(stream.events, expectedEvents,
                      'cancel() should have been called');

  // Check the stream really got closed.
  const reader = stream.getReader();
  await reader.closed;
}, 'cancel should be propagated to the original');
// Starts a read() that has nothing to return yet, then cancels through the
// reader. The pending read must settle with done, and the underlying source
// must observe the cancel with the supplied reason.
promise_test(async () => {
  const fixture = await transferredReadableStreamWithCancelPromise();
  const stream = fixture.rs;
  const reader = stream.getReader();

  const pendingRead = reader.read();
  // Deliberately not awaited: the outcome is observed through pendingRead.
  reader.cancel('done');

  const readResult = await pendingRead;
  assert_true(readResult.done, 'should be done');

  await fixture.cancelCalled;
  const expectedEvents = ['pull', 'cancel', 'done'];
  assert_array_equals(stream.events, expectedEvents, 'events should match');
}, 'cancel should abort a pending read()');
// The underlying source's cancel() is asynchronous: it sets cancelComplete
// only after awaiting flushAsyncEvents(). The promise returned by rs.cancel()
// is expected to settle before that flag is set.
promise_test(async () => {
  let cancelComplete = false;
  const rs = await createTransferredReadableStream({
    async cancel() {
      await flushAsyncEvents();
      cancelComplete = true;
    }
  });
  await rs.cancel();
  // A ReadableStream is backed by an underlying *source*, so the message
  // names the source rather than a sink.
  assert_false(cancelComplete,
               'cancel() on the underlying source should not have completed');
}, 'stream cancel should not wait for underlying source cancel');
// Enqueues an object whose getter records when it is invoked. The getter must
// not run when the chunk is enqueued, only once the chunk is actually read;
// the value that arrives exposes the getter's result under the same key.
promise_test(async () => {
  const rs = await recordingTransferredReadableStream();
  const reader = rs.getReader();
  let serializationHappened = false;
  rs.controller.enqueue({
    // Side-effecting getter used as a probe for when the chunk is serialized.
    get getter() {
      serializationHappened = true;
      return 'a';
    }
  });
  await flushAsyncEvents();
  assert_false(serializationHappened,
               'serialization should not have happened yet');
  const {value, done} = await reader.read();
  assert_false(done, 'should not be done');
  assert_equals(value.getter, 'a', 'getter should be a');
  assert_true(serializationHappened,
              'serialization should have happened');
}, 'serialization should not happen until the value is read');
// Enqueues a ReadableStream object as a chunk. The reading side is expected
// to reject read() with a DataCloneError, and the original controller is
// expected to throw TypeError on further enqueue() calls.
promise_test(async t => {
  const rs = await recordingTransferredReadableStream();
  const reader = rs.getReader();
  rs.controller.enqueue(new ReadableStream());
  // The promise under test comes from read(), so the message names read().
  await promise_rejects_dom(t, 'DataCloneError', reader.read(),
                            'read() should reject');
  assert_throws_js(TypeError, () => rs.controller.enqueue(),
                   'original stream should be errored');
}, 'transferring a non-serializable chunk should error both sides');
// Errors the source with the value 'foo' from start() and checks that read()
// on the resulting stream rejects with exactly that value.
promise_test(async t => {
  const erroringSource = {
    start(c) {
      c.error('foo');
    }
  };
  const stream = await createTransferredReadableStream(erroringSource);
  const reader = stream.getReader();
  const pendingRead = reader.read();
  return promise_rejects_exactly(t, 'foo', pendingRead,
                                 'error should be passed through');
}, 'errors should be passed through');
// Issues reader.cancel() and rs.controller.error() back to back with no await
// in between. The reader side is then expected to report done, while
// enqueue() on the original controller is expected to throw TypeError.
// NOTE(review): rs.controller is presumably exposed by
// recordingTransferredReadableStream() in resources/helpers.js — confirm.
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
// The order of these two calls is the race being tested; do not reorder.
reader.cancel();
rs.controller.error();
const {done} = await reader.read();
assert_true(done, 'should be done');
assert_throws_js(TypeError, () => rs.controller.enqueue(),
'enqueue should throw');
}, 'race between cancel() and error() should leave sides in different states');
// Issues reader.cancel() and rs.controller.close() back to back with no await
// in between, then checks that a subsequent read() reports done.
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
// The order of these two calls is the race being tested; do not reorder.
reader.cancel();
rs.controller.close();
const {done} = await reader.read();
assert_true(done, 'should be done');
}, 'race between cancel() and close() should be benign');
// Issues reader.cancel() and rs.controller.enqueue('a') back to back with no
// await in between, then checks that a subsequent read() reports done rather
// than delivering the late chunk.
promise_test(async () => {
const rs = await recordingTransferredReadableStream();
await delay(0);
const reader = rs.getReader();
// The order of these two calls is the race being tested; do not reorder.
reader.cancel();
rs.controller.enqueue('a');
const {done} = await reader.read();
assert_true(done, 'should be done');
}, 'race between cancel() and enqueue() should be benign');
</script>
|