1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
|
// META: global=window,worker
// META: script=../resources/recording-streams.js
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
'use strict';
// The size() function of the readable strategy can re-entrantly call back into the ReadableStream implementation. This
// makes it risky to cache state across the call to ReadableStreamDefaultControllerEnqueue. These tests attempt to catch
// such errors. They are separated from the other strategy tests because no real user code should ever do anything like
// this.
// Shared sentinel error. Tests below pass this exact object into the stream and then check that the very same
// object comes back out (as a rejection reason or a cancel reason).
const error1 = Object.assign(new Error('error1'), { name: 'error1' });
// Calls enqueue() re-entrantly from size(). The chunk enqueued from inside size() is expected to come out ahead of
// the chunk whose size was being computed.
promise_test(async () => {
  let ctrl;
  let sizeCalls = 0;

  const underlyingSource = {
    start(c) {
      ctrl = c;
    }
  };
  const strategy = {
    size() {
      sizeCalls += 1;
      // Re-enter only on the first call (sizing 'a'); the nested call that sizes 'b' must not enqueue again.
      if (sizeCalls === 1) {
        ctrl.enqueue('b');
      }
      return 1;
    }
  };
  const rs = new ReadableStream(underlyingSource, strategy);

  ctrl.enqueue('a');
  ctrl.close();

  const chunks = await readableStreamToArray(rs);
  assert_array_equals(chunks, ['b', 'a'], 'array should contain two chunks');
}, 'enqueue() inside size() should work');
// Calls close() from size() while the queue is still empty, then checks that the stream reads as empty.
promise_test(() => {
  let ctrl;

  const closingStrategy = {
    size() {
      // Nothing is queued yet, so this close() moves the state from "readable" directly to "closed".
      ctrl.close();
      // The chunk being sized will still be enqueued after we return, but it can never be read because the
      // state is already "closed".
      return 1;
    }
  };
  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    }
  }, closingStrategy);

  ctrl.enqueue('a');

  // The chunk 'a' is still sitting in rs's queue, but rs is closed so 'a' cannot be read.
  const expectNoChunks = array => assert_array_equals(array, [], 'array should contain no chunks');
  return readableStreamToArray(rs).then(expectNoChunks);
}, 'close() inside size() should not crash');
// Calls close() from size() while a chunk is already queued, and checks that both chunks can still be read.
promise_test(async () => {
  let ctrl;
  let sizeCalls = 0;

  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    }
  }, {
    size() {
      sizeCalls += 1;
      if (sizeCalls === 2) {
        // Second call (sizing 'b'): the queue already contains one chunk, so close() leaves the state
        // "readable" and only sets closeRequest to true.
        ctrl.close();
      }
      return 1;
    }
  });

  ctrl.enqueue('a');
  ctrl.enqueue('b');

  const chunks = await readableStreamToArray(rs);
  assert_array_equals(chunks, ['a', 'b'], 'array should contain two chunks');
}, 'close request inside size() should work');
// Errors the stream from inside size() and checks that a subsequent read() rejects with that same error object.
promise_test(t => {
  let ctrl;

  const underlyingSource = {
    start(c) {
      ctrl = c;
    }
  };
  const erroringStrategy = {
    size() {
      ctrl.error(error1);
      return 1;
    }
  };
  const rs = new ReadableStream(underlyingSource, erroringStrategy);

  ctrl.enqueue('a');

  const pendingRead = rs.getReader().read();
  return promise_rejects_exactly(t, error1, pendingRead, 'read() should reject');
}, 'error() inside size() should work');
// Reads controller.desiredSize from inside size(); with highWaterMark 1 it is expected to still be 1 at that point.
promise_test(async () => {
  let ctrl;

  const strategy = {
    highWaterMark: 1,
    size() {
      assert_equals(ctrl.desiredSize, 1, 'desiredSize should be 1');
      return 1;
    }
  };
  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    }
  }, strategy);

  ctrl.enqueue('a');
  ctrl.close();

  const chunks = await readableStreamToArray(rs);
  assert_array_equals(chunks, ['a'], 'array should contain one chunk');
}, 'desiredSize inside size() should work');
// Cancels the stream from inside size(). The underlying source's cancel() must receive error1, enqueue() must throw
// a TypeError from within that callback, and both the reader's closed promise and the cancel promise must fulfill.
promise_test(t => {
  let ctrl;
  let cancelPromise;

  // Wrapped in step_func so assertion failures inside the callback are attributed to this test.
  const onCancel = t.step_func(reason => {
    assert_equals(reason, error1, 'reason should be error1');
    assert_throws_js(TypeError, () => ctrl.enqueue(), 'enqueue() should throw');
  });

  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    },
    cancel: onCancel
  }, {
    size() {
      cancelPromise = rs.cancel(error1);
      return 1;
    },
    highWaterMark: Infinity
  });

  ctrl.enqueue('a');

  const reader = rs.getReader();
  const settled = [reader.closed, cancelPromise];
  return Promise.all(settled);
}, 'cancel() inside size() should work');
// Starts a pipe to a recording writable stream from inside size(), then checks that the enqueued chunks are written
// to the destination and that closing the source afterwards closes the destination.
promise_test(() => {
  let controller;
  let pipeToPromise;
  const ws = recordingWritableStream();
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  }, {
    size() {
      // Only start the pipe on the first size() call.
      if (!pipeToPromise) {
        pipeToPromise = rs.pipeTo(ws);
      }
      return 1;
    },
    highWaterMark: 1
  });
  controller.enqueue('a');
  // size() is invoked during enqueue(), so the pipe must already have been started by now.
  assert_not_equals(pipeToPromise, undefined, 'pipeTo() should have been called inside size()');

  // Some pipeTo() implementations need an additional chunk enqueued in order for the first one to be processed. See
  // https://github.com/whatwg/streams/issues/794 for background.
  controller.enqueue('a');

  // Give pipeTo() a chance to process the queued chunks.
  return delay(0).then(() => {
    assert_array_equals(ws.events, ['write', 'a', 'write', 'a'], 'ws should contain two chunks');
    controller.close();
    return pipeToPromise;
  }).then(() => {
    assert_array_equals(ws.events, ['write', 'a', 'write', 'a', 'close'], 'target should have been closed');
  });
}, 'pipeTo() inside size() should behave as expected');
// Issues a read() from inside size() and checks when that read resolves, which chunk it receives, and how often
// size() ends up being called.
promise_test(() => {
  let controller;
  let readPromise;
  let calls = 0;
  let readResolved = false;
  let reader;
  const rs = new ReadableStream({
    start(c) {
      controller = c;
    }
  }, {
    size() {
      // This is triggered by controller.enqueue(). The queue is empty and there are no pending reads. This read is
      // added to the list of pending reads.
      readPromise = reader.read();
      ++calls;
      return 1;
    },
    highWaterMark: 0
  });
  reader = rs.getReader();
  controller.enqueue('a');
  // Track resolution with a flag so it can be checked synchronously at specific points below.
  readPromise.then(() => {
    readResolved = true;
  });
  return flushAsyncEvents().then(() => {
    assert_false(readResolved, 'read() should not have resolved after the first enqueue()');
    controller.enqueue('b');
    assert_equals(calls, 1, 'size() should have been called once');
    return delay(0);
  }).then(() => {
    assert_true(readResolved, 'read() should have resolved after the second enqueue()');
    assert_equals(calls, 1, 'size() should only be called once');
    return readPromise;
  }).then(({ value, done }) => {
    assert_false(done, 'done should be false');
    // See https://github.com/whatwg/streams/issues/794 for why this chunk is not 'a'.
    assert_equals(value, 'b', 'chunk should have been read');
    assert_equals(calls, 1, 'calls should still be 1');
    return reader.read();
  }).then(({ value, done }) => {
    assert_false(done, 'done should be false again');
    assert_equals(value, 'a', 'chunk a should come after b');
  });
}, 'read() inside of size() should behave as expected');
// Acquires the reader from inside size() and checks that the chunk being sized can then be read through it.
promise_test(async () => {
  let ctrl;
  let reader;

  const lockingStrategy = {
    size() {
      reader = rs.getReader();
      return 1;
    }
  };
  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    }
  }, lockingStrategy);

  ctrl.enqueue('a');

  const { value, done } = await reader.read();
  assert_false(done, 'done should be false');
  assert_equals(value, 'a', 'value should be a');
}, 'getReader() inside size() should work');
// Tees the stream from inside size() and checks that the original is locked and both branches see the chunk.
promise_test(() => {
  let ctrl;
  let first;
  let second;

  const rs = new ReadableStream({
    start(c) {
      ctrl = c;
    }
  }, {
    size() {
      [first, second] = rs.tee();
      return 1;
    }
  });

  ctrl.enqueue('a');
  assert_true(rs.locked, 'rs should be locked');
  ctrl.close();

  // Drains one branch and asserts that it yielded exactly the single chunk 'a'.
  const expectSingleChunk = (branch, description) =>
    readableStreamToArray(branch).then(array => assert_array_equals(array, ['a'], description));

  return Promise.all([
    expectSingleChunk(first, 'branch1 should have one chunk'),
    expectSingleChunk(second, 'branch2 should have one chunk')
  ]);
}, 'tee() inside size() should work');
|