969 lines
33 KiB
JavaScript
969 lines
33 KiB
JavaScript
// META: global=window,worker,shadowrealm
|
|
// META: script=../resources/rs-utils.js
|
|
// META: script=../resources/test-utils.js
|
|
// META: script=../resources/recording-streams.js
|
|
// META: script=../resources/rs-test-templates.js
|
|
'use strict';
|
|
|
|
test(() => {
|
|
|
|
const rs = new ReadableStream({ type: 'bytes' });
|
|
const result = rs.tee();
|
|
|
|
assert_true(Array.isArray(result), 'return value should be an array');
|
|
assert_equals(result.length, 2, 'array should have length 2');
|
|
assert_equals(result[0].constructor, ReadableStream, '0th element should be a ReadableStream');
|
|
assert_equals(result[1].constructor, ReadableStream, '1st element should be a ReadableStream');
|
|
|
|
}, 'ReadableStream teeing with byte source: rs.tee() returns an array of two ReadableStreams');
|
|
|
|
promise_test(async t => {
|
|
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
c.enqueue(new Uint8Array([0x01]));
|
|
c.enqueue(new Uint8Array([0x02]));
|
|
c.close();
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
|
|
reader2.closed.then(t.unreached_func('branch2 should not be closed'));
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(1));
|
|
assert_equals(result.done, false, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x01]), 'value');
|
|
}
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(1));
|
|
assert_equals(result.done, false, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x02]), 'value');
|
|
}
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(1));
|
|
assert_equals(result.done, true, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0]).subarray(0, 0), 'value');
|
|
}
|
|
|
|
{
|
|
const result = await reader2.read(new Uint8Array(1));
|
|
assert_equals(result.done, false, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x01]), 'value');
|
|
}
|
|
|
|
await reader1.closed;
|
|
|
|
}, 'ReadableStream teeing with byte source: should be able to read one branch to the end without affecting the other');
|
|
|
|
promise_test(async () => {
|
|
|
|
let pullCount = 0;
|
|
const enqueuedChunk = new Uint8Array([0x01]);
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
pull(c) {
|
|
++pullCount;
|
|
if (pullCount === 1) {
|
|
c.enqueue(enqueuedChunk);
|
|
}
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader();
|
|
const reader2 = branch2.getReader();
|
|
|
|
const [result1, result2] = await Promise.all([reader1.read(), reader2.read()]);
|
|
assert_equals(result1.done, false, 'reader1 done');
|
|
assert_equals(result2.done, false, 'reader2 done');
|
|
|
|
const view1 = result1.value;
|
|
const view2 = result2.value;
|
|
assert_typed_array_equals(view1, new Uint8Array([0x01]), 'reader1 value');
|
|
assert_typed_array_equals(view2, new Uint8Array([0x01]), 'reader2 value');
|
|
|
|
assert_not_equals(view1.buffer, view2.buffer, 'chunks should have different buffers');
|
|
assert_not_equals(enqueuedChunk.buffer, view1.buffer, 'enqueued chunk and branch1\'s chunk should have different buffers');
|
|
assert_not_equals(enqueuedChunk.buffer, view2.buffer, 'enqueued chunk and branch2\'s chunk should have different buffers');
|
|
|
|
}, 'ReadableStream teeing with byte source: chunks should be cloned for each branch');
|
|
|
|
promise_test(async () => {
|
|
|
|
let pullCount = 0;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
pull(c) {
|
|
++pullCount;
|
|
if (pullCount === 1) {
|
|
c.byobRequest.view[0] = 0x01;
|
|
c.byobRequest.respond(1);
|
|
}
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader();
|
|
const buffer = new Uint8Array([42, 42, 42]).buffer;
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(buffer, 0, 1));
|
|
assert_equals(result.done, false, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x01, 42, 42]).subarray(0, 1), 'value');
|
|
}
|
|
|
|
{
|
|
const result = await reader2.read();
|
|
assert_equals(result.done, false, 'done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x01]), 'value');
|
|
}
|
|
|
|
}, 'ReadableStream teeing with byte source: chunks for BYOB requests from branch 1 should be cloned to branch 2');
|
|
|
|
promise_test(async t => {
|
|
|
|
const theError = { name: 'boo!' };
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
c.enqueue(new Uint8Array([0x01]));
|
|
c.enqueue(new Uint8Array([0x02]));
|
|
},
|
|
pull() {
|
|
throw theError;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(1));
|
|
assert_equals(result.done, false, 'first read from branch1 should not be done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x01]), 'first read from branch1');
|
|
}
|
|
|
|
{
|
|
const result = await reader1.read(new Uint8Array(1));
|
|
assert_equals(result.done, false, 'second read from branch1 should not be done');
|
|
assert_typed_array_equals(result.value, new Uint8Array([0x02]), 'second read from branch1');
|
|
}
|
|
|
|
await promise_rejects_exactly(t, theError, reader1.read(new Uint8Array(1)));
|
|
await promise_rejects_exactly(t, theError, reader2.read(new Uint8Array(1)));
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.closed),
|
|
promise_rejects_exactly(t, theError, reader2.closed)
|
|
]);
|
|
|
|
}, 'ReadableStream teeing with byte source: errors in the source should propagate to both branches');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
c.enqueue(new Uint8Array([0x01]));
|
|
c.enqueue(new Uint8Array([0x02]));
|
|
c.close();
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
branch1.cancel();
|
|
|
|
const [chunks1, chunks2] = await Promise.all([readableStreamToArray(branch1), readableStreamToArray(branch2)]);
|
|
assert_array_equals(chunks1, [], 'branch1 should have no chunks');
|
|
assert_equals(chunks2.length, 2, 'branch2 should have two chunks');
|
|
assert_typed_array_equals(chunks2[0], new Uint8Array([0x01]), 'first chunk from branch2');
|
|
assert_typed_array_equals(chunks2[1], new Uint8Array([0x02]), 'second chunk from branch2');
|
|
|
|
}, 'ReadableStream teeing with byte source: canceling branch1 should not impact branch2');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
c.enqueue(new Uint8Array([0x01]));
|
|
c.enqueue(new Uint8Array([0x02]));
|
|
c.close();
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
branch2.cancel();
|
|
|
|
const [chunks1, chunks2] = await Promise.all([readableStreamToArray(branch1), readableStreamToArray(branch2)]);
|
|
assert_equals(chunks1.length, 2, 'branch1 should have two chunks');
|
|
assert_typed_array_equals(chunks1[0], new Uint8Array([0x01]), 'first chunk from branch1');
|
|
assert_typed_array_equals(chunks1[1], new Uint8Array([0x02]), 'second chunk from branch1');
|
|
assert_array_equals(chunks2, [], 'branch2 should have no chunks');
|
|
|
|
}, 'ReadableStream teeing with byte source: canceling branch2 should not impact branch1');
|
|
|
|
templatedRSTeeCancel('ReadableStream teeing with byte source', (extras) => {
|
|
return new ReadableStream({ type: 'bytes', ...extras });
|
|
});
|
|
|
|
promise_test(async () => {
|
|
|
|
let controller;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
|
|
const promise = Promise.all([reader1.closed, reader2.closed]);
|
|
|
|
controller.close();
|
|
|
|
// The branches are created with HWM 0, so we need to read from at least one of them
|
|
// to observe the stream becoming closed.
|
|
const read1 = await reader1.read(new Uint8Array(1));
|
|
assert_equals(read1.done, true, 'first read from branch1 should be done');
|
|
|
|
await promise;
|
|
|
|
}, 'ReadableStream teeing with byte source: closing the original should close the branches');
|
|
|
|
promise_test(async t => {
|
|
|
|
let controller;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
|
|
const theError = { name: 'boo!' };
|
|
const promise = Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.closed),
|
|
promise_rejects_exactly(t, theError, reader2.closed)
|
|
]);
|
|
|
|
controller.error(theError);
|
|
await promise;
|
|
|
|
}, 'ReadableStream teeing with byte source: erroring the original should immediately error the branches');
|
|
|
|
promise_test(async t => {
|
|
|
|
let controller;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader();
|
|
const reader2 = branch2.getReader();
|
|
|
|
const theError = { name: 'boo!' };
|
|
const promise = Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.read()),
|
|
promise_rejects_exactly(t, theError, reader2.read())
|
|
]);
|
|
|
|
controller.error(theError);
|
|
await promise;
|
|
|
|
}, 'ReadableStream teeing with byte source: erroring the original should error pending reads from default reader');
|
|
|
|
promise_test(async t => {
|
|
|
|
let controller;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
|
|
const theError = { name: 'boo!' };
|
|
const promise = Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.read(new Uint8Array(1))),
|
|
promise_rejects_exactly(t, theError, reader2.read(new Uint8Array(1)))
|
|
]);
|
|
|
|
controller.error(theError);
|
|
await promise;
|
|
|
|
}, 'ReadableStream teeing with byte source: erroring the original should error pending reads from BYOB reader');
|
|
|
|
promise_test(async () => {
|
|
|
|
let controller;
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
const cancelPromise = reader2.cancel();
|
|
|
|
controller.enqueue(new Uint8Array([0x01]));
|
|
|
|
const read1 = await reader1.read(new Uint8Array(1));
|
|
assert_equals(read1.done, false, 'first read() from branch1 should not be done');
|
|
assert_typed_array_equals(read1.value, new Uint8Array([0x01]), 'first read() from branch1');
|
|
|
|
controller.close();
|
|
|
|
const read2 = await reader1.read(new Uint8Array(1));
|
|
assert_equals(read2.done, true, 'second read() from branch1 should be done');
|
|
|
|
await Promise.all([
|
|
reader1.closed,
|
|
cancelPromise
|
|
]);
|
|
|
|
}, 'ReadableStream teeing with byte source: canceling branch1 should finish when branch2 reads until end of stream');
|
|
|
|
promise_test(async t => {
|
|
|
|
let controller;
|
|
const theError = { name: 'boo!' };
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
start(c) {
|
|
controller = c;
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
const cancelPromise = reader2.cancel();
|
|
|
|
controller.error(theError);
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.read(new Uint8Array(1))),
|
|
cancelPromise
|
|
]);
|
|
|
|
}, 'ReadableStream teeing with byte source: canceling branch1 should finish when original stream errors');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
// Create two branches, each with a HWM of 0. This should result in no chunks being pulled.
|
|
rs.tee();
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, [], 'pull should not be called');
|
|
|
|
}, 'ReadableStream teeing with byte source: should not pull any chunks if no branches are reading');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({
|
|
type: 'bytes',
|
|
pull(controller) {
|
|
controller.enqueue(new Uint8Array([0x01]));
|
|
}
|
|
});
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
await Promise.all([
|
|
reader1.read(new Uint8Array(1)),
|
|
reader2.read(new Uint8Array(1))
|
|
]);
|
|
assert_array_equals(rs.events, ['pull'], 'pull should be called once');
|
|
|
|
}, 'ReadableStream teeing with byte source: should only pull enough to fill the emptiest queue');
|
|
|
|
promise_test(async t => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const theError = { name: 'boo!' };
|
|
|
|
rs.controller.error(theError);
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, [], 'pull should not be called');
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, reader1.closed),
|
|
promise_rejects_exactly(t, theError, reader2.closed)
|
|
]);
|
|
|
|
}, 'ReadableStream teeing with byte source: should not pull when original is already errored');
|
|
|
|
for (const branch of [1, 2]) {
|
|
promise_test(async t => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const theError = { name: 'boo!' };
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, [], 'pull should not be called');
|
|
|
|
const reader = (branch === 1) ? reader1 : reader2;
|
|
const read1 = reader.read(new Uint8Array(1));
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, ['pull'], 'pull should be called once');
|
|
|
|
rs.controller.error(theError);
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, read1),
|
|
promise_rejects_exactly(t, theError, reader1.closed),
|
|
promise_rejects_exactly(t, theError, reader2.closed)
|
|
]);
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, ['pull'], 'pull should be called once');
|
|
|
|
}, `ReadableStream teeing with byte source: stops pulling when original stream errors while branch ${branch} is reading`);
|
|
}
|
|
|
|
promise_test(async t => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const theError = { name: 'boo!' };
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, [], 'pull should not be called');
|
|
|
|
const read1 = reader1.read(new Uint8Array(1));
|
|
const read2 = reader2.read(new Uint8Array(1));
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, ['pull'], 'pull should be called once');
|
|
|
|
rs.controller.error(theError);
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, read1),
|
|
promise_rejects_exactly(t, theError, read2),
|
|
promise_rejects_exactly(t, theError, reader1.closed),
|
|
promise_rejects_exactly(t, theError, reader2.closed)
|
|
]);
|
|
|
|
await flushAsyncEvents();
|
|
assert_array_equals(rs.events, ['pull'], 'pull should be called once');
|
|
|
|
}, 'ReadableStream teeing with byte source: stops pulling when original stream errors while both branches are reading');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
|
|
const cancel1 = reader1.cancel();
|
|
await flushAsyncEvents();
|
|
const cancel2 = reader2.cancel();
|
|
|
|
const result1 = await read1;
|
|
assert_object_equals(result1, { value: undefined, done: true });
|
|
const result2 = await read2;
|
|
assert_object_equals(result2, { value: undefined, done: true });
|
|
|
|
await Promise.all([cancel1, cancel2]);
|
|
|
|
}, 'ReadableStream teeing with byte source: canceling both branches in sequence with delay');
|
|
|
|
promise_test(async t => {
|
|
|
|
const theError = { name: 'boo!' };
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
cancel() {
|
|
throw theError;
|
|
}
|
|
});
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
|
|
const cancel1 = reader1.cancel();
|
|
await flushAsyncEvents();
|
|
const cancel2 = reader2.cancel();
|
|
|
|
const result1 = await read1;
|
|
assert_object_equals(result1, { value: undefined, done: true });
|
|
const result2 = await read2;
|
|
assert_object_equals(result2, { value: undefined, done: true });
|
|
|
|
await Promise.all([
|
|
promise_rejects_exactly(t, theError, cancel1),
|
|
promise_rejects_exactly(t, theError, cancel2)
|
|
]);
|
|
|
|
}, 'ReadableStream teeing with byte source: failing to cancel when canceling both branches in sequence with delay');
|
|
|
|
promise_test(async () => {
|
|
|
|
let cancelResolve;
|
|
const cancelCalled = new Promise((resolve) => {
|
|
cancelResolve = resolve;
|
|
});
|
|
const rs = recordingReadableStream({
|
|
type: 'bytes',
|
|
cancel() {
|
|
cancelResolve();
|
|
}
|
|
});
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// We are reading into branch1's buffer.
|
|
const byobRequest1 = rs.controller.byobRequest;
|
|
assert_not_equals(byobRequest1, null);
|
|
assert_typed_array_equals(byobRequest1.view, new Uint8Array([0x11]), 'byobRequest1.view');
|
|
|
|
// Cancelling branch1 should not affect the BYOB request.
|
|
const cancel1 = reader1.cancel();
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true);
|
|
assert_equals(result1.value, undefined);
|
|
await flushAsyncEvents();
|
|
const byobRequest2 = rs.controller.byobRequest;
|
|
assert_typed_array_equals(byobRequest2.view, new Uint8Array([0x11]), 'byobRequest2.view');
|
|
|
|
// Cancelling branch1 should invalidate the BYOB request.
|
|
const cancel2 = reader2.cancel();
|
|
await cancelCalled;
|
|
const byobRequest3 = rs.controller.byobRequest;
|
|
assert_equals(byobRequest3, null);
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true);
|
|
assert_equals(result2.value, undefined);
|
|
|
|
await Promise.all([cancel1, cancel2]);
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, cancel branch2');
|
|
|
|
promise_test(async () => {
|
|
|
|
let cancelResolve;
|
|
const cancelCalled = new Promise((resolve) => {
|
|
cancelResolve = resolve;
|
|
});
|
|
const rs = recordingReadableStream({
|
|
type: 'bytes',
|
|
cancel() {
|
|
cancelResolve();
|
|
}
|
|
});
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// We are reading into branch1's buffer.
|
|
const byobRequest1 = rs.controller.byobRequest;
|
|
assert_not_equals(byobRequest1, null);
|
|
assert_typed_array_equals(byobRequest1.view, new Uint8Array([0x11]), 'byobRequest1.view');
|
|
|
|
// Cancelling branch2 should not affect the BYOB request.
|
|
const cancel2 = reader2.cancel();
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true);
|
|
assert_equals(result2.value, undefined);
|
|
await flushAsyncEvents();
|
|
const byobRequest2 = rs.controller.byobRequest;
|
|
assert_typed_array_equals(byobRequest2.view, new Uint8Array([0x11]), 'byobRequest2.view');
|
|
|
|
// Cancelling branch1 should invalidate the BYOB request.
|
|
const cancel1 = reader1.cancel();
|
|
await cancelCalled;
|
|
const byobRequest3 = rs.controller.byobRequest;
|
|
assert_equals(byobRequest3, null);
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true);
|
|
assert_equals(result1.value, undefined);
|
|
|
|
await Promise.all([cancel1, cancel2]);
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, cancel branch1');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// We are reading into branch1's buffer.
|
|
assert_typed_array_equals(rs.controller.byobRequest.view, new Uint8Array([0x11]), 'first byobRequest.view');
|
|
|
|
// Cancelling branch2 should not affect the BYOB request.
|
|
reader2.cancel();
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true);
|
|
assert_equals(result2.value, undefined);
|
|
await flushAsyncEvents();
|
|
assert_typed_array_equals(rs.controller.byobRequest.view, new Uint8Array([0x11]), 'second byobRequest.view');
|
|
|
|
// Respond to the BYOB request.
|
|
rs.controller.byobRequest.view[0] = 0x33;
|
|
rs.controller.byobRequest.respond(1);
|
|
|
|
// branch1 should receive the read chunk.
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, false);
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x33]), 'first read() from branch1');
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, enqueue to branch1');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// We are reading into branch1's buffer.
|
|
assert_typed_array_equals(rs.controller.byobRequest.view, new Uint8Array([0x11]), 'first byobRequest.view');
|
|
|
|
// Cancelling branch1 should not affect the BYOB request.
|
|
reader1.cancel();
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true);
|
|
assert_equals(result1.value, undefined);
|
|
await flushAsyncEvents();
|
|
assert_typed_array_equals(rs.controller.byobRequest.view, new Uint8Array([0x11]), 'second byobRequest.view');
|
|
|
|
// Respond to the BYOB request.
|
|
rs.controller.byobRequest.view[0] = 0x33;
|
|
rs.controller.byobRequest.respond(1);
|
|
|
|
// branch2 should receive the read chunk.
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, false);
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x33]), 'first read() from branch2');
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, respond to branch2');
|
|
|
|
promise_test(async () => {
|
|
|
|
let pullCount = 0;
|
|
const byobRequestDefined = [];
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
pull(c) {
|
|
++pullCount;
|
|
byobRequestDefined.push(c.byobRequest !== null);
|
|
c.enqueue(new Uint8Array([pullCount]));
|
|
}
|
|
});
|
|
|
|
const [branch1, _] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
|
|
const result1 = await reader1.read(new Uint8Array([0x11]));
|
|
assert_equals(result1.done, false, 'first read should not be done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x1]), 'first read');
|
|
assert_equals(pullCount, 1, 'pull() should be called once');
|
|
assert_equals(byobRequestDefined[0], true, 'should have created a BYOB request for first read');
|
|
|
|
reader1.releaseLock();
|
|
const reader2 = branch1.getReader();
|
|
|
|
const result2 = await reader2.read();
|
|
assert_equals(result2.done, false, 'second read should not be done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x2]), 'second read');
|
|
assert_equals(pullCount, 2, 'pull() should be called twice');
|
|
assert_equals(byobRequestDefined[1], false, 'should not have created a BYOB request for second read');
|
|
|
|
}, 'ReadableStream teeing with byte source: pull with BYOB reader, then pull with default reader');
|
|
|
|
promise_test(async () => {
|
|
|
|
let pullCount = 0;
|
|
const byobRequestDefined = [];
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
pull(c) {
|
|
++pullCount;
|
|
byobRequestDefined.push(c.byobRequest !== null);
|
|
c.enqueue(new Uint8Array([pullCount]));
|
|
}
|
|
});
|
|
|
|
const [branch1, _] = rs.tee();
|
|
const reader1 = branch1.getReader();
|
|
|
|
const result1 = await reader1.read();
|
|
assert_equals(result1.done, false, 'first read should not be done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x1]), 'first read');
|
|
assert_equals(pullCount, 1, 'pull() should be called once');
|
|
assert_equals(byobRequestDefined[0], false, 'should not have created a BYOB request for first read');
|
|
|
|
reader1.releaseLock();
|
|
const reader2 = branch1.getReader({ mode: 'byob' });
|
|
|
|
const result2 = await reader2.read(new Uint8Array([0x22]));
|
|
assert_equals(result2.done, false, 'second read should not be done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x2]), 'second read');
|
|
assert_equals(pullCount, 2, 'pull() should be called twice');
|
|
assert_equals(byobRequestDefined[1], true, 'should have created a BYOB request for second read');
|
|
|
|
}, 'ReadableStream teeing with byte source: pull with default reader, then pull with BYOB reader');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({
|
|
type: 'bytes'
|
|
});
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
|
|
// Wait for each branch's start() promise to resolve.
|
|
await flushAsyncEvents();
|
|
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
|
|
// branch2 should provide the BYOB request.
|
|
const byobRequest = rs.controller.byobRequest;
|
|
assert_typed_array_equals(byobRequest.view, new Uint8Array([0x22]), 'first BYOB request');
|
|
byobRequest.view[0] = 0x01;
|
|
byobRequest.respond(1);
|
|
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, false, 'first read should not be done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x1]), 'first read');
|
|
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, false, 'second read should not be done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x1]), 'second read');
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch2, then read from branch1');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader();
|
|
const reader2 = branch2.getReader({ mode: 'byob' });
|
|
await flushAsyncEvents();
|
|
|
|
const read1 = reader1.read();
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// There should be no BYOB request.
|
|
assert_equals(rs.controller.byobRequest, null, 'first BYOB request');
|
|
|
|
// Close the stream.
|
|
rs.controller.close();
|
|
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true, 'read from branch1 should be done');
|
|
assert_equals(result1.value, undefined, 'read from branch1');
|
|
|
|
// branch2 should get its buffer back.
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true, 'read from branch2 should be done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x22]).subarray(0, 0), 'read from branch2');
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch1 with default reader, then close while branch2 has pending BYOB read');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader({ mode: 'byob' });
|
|
const reader2 = branch2.getReader();
|
|
await flushAsyncEvents();
|
|
|
|
const read2 = reader2.read();
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
await flushAsyncEvents();
|
|
|
|
// There should be no BYOB request.
|
|
assert_equals(rs.controller.byobRequest, null, 'first BYOB request');
|
|
|
|
// Close the stream.
|
|
rs.controller.close();
|
|
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true, 'read from branch2 should be done');
|
|
assert_equals(result2.value, undefined, 'read from branch2');
|
|
|
|
// branch1 should get its buffer back.
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true, 'read from branch1 should be done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x11]).subarray(0, 0), 'read from branch1');
|
|
|
|
}, 'ReadableStream teeing with byte source: read from branch2 with default reader, then close while branch1 has pending BYOB read');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
await flushAsyncEvents();
|
|
|
|
const read1 = reader1.read(new Uint8Array([0x11]));
|
|
const read2 = reader2.read(new Uint8Array([0x22]));
|
|
await flushAsyncEvents();
|
|
|
|
// branch1 should provide the BYOB request.
|
|
const byobRequest = rs.controller.byobRequest;
|
|
assert_typed_array_equals(byobRequest.view, new Uint8Array([0x11]), 'first BYOB request');
|
|
|
|
// Close the stream.
|
|
rs.controller.close();
|
|
byobRequest.respond(0);
|
|
|
|
// Both branches should get their buffers back.
|
|
const result1 = await read1;
|
|
assert_equals(result1.done, true, 'first read should be done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x11]).subarray(0, 0), 'first read');
|
|
|
|
const result2 = await read2;
|
|
assert_equals(result2.done, true, 'second read should be done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x22]).subarray(0, 0), 'second read');
|
|
|
|
}, 'ReadableStream teeing with byte source: close when both branches have pending BYOB reads');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader());
|
|
const branch1Reads = [reader1.read(), reader1.read()];
|
|
const branch2Reads = [reader2.read(), reader2.read()];
|
|
|
|
await flushAsyncEvents();
|
|
rs.controller.enqueue(new Uint8Array([0x11]));
|
|
rs.controller.close();
|
|
|
|
const result1 = await branch1Reads[0];
|
|
assert_equals(result1.done, false, 'first read() from branch1 should be not done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x11]), 'first chunk from branch1 should be correct');
|
|
const result2 = await branch2Reads[0];
|
|
assert_equals(result2.done, false, 'first read() from branch2 should be not done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x11]), 'first chunk from branch2 should be correct');
|
|
|
|
assert_object_equals(await branch1Reads[1], { value: undefined, done: true }, 'second read() from branch1 should be done');
|
|
assert_object_equals(await branch2Reads[1], { value: undefined, done: true }, 'second read() from branch2 should be done');
|
|
|
|
}, 'ReadableStream teeing with byte source: enqueue() and close() while both branches are pulling');
|
|
|
|
promise_test(async () => {
|
|
|
|
const rs = recordingReadableStream({ type: 'bytes' });
|
|
|
|
const [reader1, reader2] = rs.tee().map(branch => branch.getReader({ mode: 'byob' }));
|
|
const branch1Reads = [reader1.read(new Uint8Array(1)), reader1.read(new Uint8Array(1))];
|
|
const branch2Reads = [reader2.read(new Uint8Array(1)), reader2.read(new Uint8Array(1))];
|
|
|
|
await flushAsyncEvents();
|
|
rs.controller.byobRequest.view[0] = 0x11;
|
|
rs.controller.byobRequest.respond(1);
|
|
rs.controller.close();
|
|
|
|
const result1 = await branch1Reads[0];
|
|
assert_equals(result1.done, false, 'first read() from branch1 should be not done');
|
|
assert_typed_array_equals(result1.value, new Uint8Array([0x11]), 'first chunk from branch1 should be correct');
|
|
const result2 = await branch2Reads[0];
|
|
assert_equals(result2.done, false, 'first read() from branch2 should be not done');
|
|
assert_typed_array_equals(result2.value, new Uint8Array([0x11]), 'first chunk from branch2 should be correct');
|
|
|
|
const result3 = await branch1Reads[1];
|
|
assert_equals(result3.done, true, 'second read() from branch1 should be done');
|
|
assert_typed_array_equals(result3.value, new Uint8Array([0]).subarray(0, 0), 'second chunk from branch1 should be correct');
|
|
const result4 = await branch2Reads[1];
|
|
assert_equals(result4.done, true, 'second read() from branch2 should be done');
|
|
assert_typed_array_equals(result4.value, new Uint8Array([0]).subarray(0, 0), 'second chunk from branch2 should be correct');
|
|
|
|
}, 'ReadableStream teeing with byte source: respond() and close() while both branches are pulling');
|
|
|
|
promise_test(async t => {
|
|
let pullCount = 0;
|
|
const arrayBuffer = new Uint8Array([0x01, 0x02, 0x03]).buffer;
|
|
const enqueuedChunk = new Uint8Array(arrayBuffer, 2);
|
|
assert_equals(enqueuedChunk.length, 1);
|
|
assert_equals(enqueuedChunk.byteOffset, 2);
|
|
const rs = new ReadableStream({
|
|
type: 'bytes',
|
|
pull(c) {
|
|
++pullCount;
|
|
if (pullCount === 1) {
|
|
c.enqueue(enqueuedChunk);
|
|
}
|
|
}
|
|
});
|
|
|
|
const [branch1, branch2] = rs.tee();
|
|
const reader1 = branch1.getReader();
|
|
const reader2 = branch2.getReader();
|
|
|
|
const [result1, result2] = await Promise.all([reader1.read(), reader2.read()]);
|
|
assert_equals(result1.done, false, 'reader1 done');
|
|
assert_equals(result2.done, false, 'reader2 done');
|
|
|
|
const view1 = result1.value;
|
|
const view2 = result2.value;
|
|
// The first stream has the transferred buffer, but the second stream has the
|
|
// cloned buffer.
|
|
const underlying = new Uint8Array([0x01, 0x02, 0x03]).buffer;
|
|
assert_typed_array_equals(view1, new Uint8Array(underlying, 2), 'reader1 value');
|
|
assert_typed_array_equals(view2, new Uint8Array([0x03]), 'reader2 value');
|
|
}, 'ReadableStream teeing with byte source: reading an array with a byte offset should clone correctly');
|