// META: global=window,worker,shadowrealm // META: script=../resources/rs-utils.js // META: script=../resources/test-utils.js // META: script=../resources/recording-streams.js 'use strict'; const error1 = new Error('error1'); function assert_iter_result(iterResult, value, done, message) { const prefix = message === undefined ? '' : `${message} `; assert_equals(typeof iterResult, 'object', `${prefix}type is object`); assert_equals(Object.getPrototypeOf(iterResult), Object.prototype, `${prefix}[[Prototype]]`); assert_array_equals(Object.getOwnPropertyNames(iterResult).sort(), ['done', 'value'], `${prefix}property names`); assert_equals(iterResult.value, value, `${prefix}value`); assert_equals(iterResult.done, done, `${prefix}done`); } test(() => { const s = new ReadableStream(); const it = s.values(); const proto = Object.getPrototypeOf(it); const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype); assert_equals(Object.getPrototypeOf(proto), AsyncIteratorPrototype, 'prototype should extend AsyncIteratorPrototype'); const methods = ['next', 'return'].sort(); assert_array_equals(Object.getOwnPropertyNames(proto).sort(), methods, 'should have all the correct methods'); for (const m of methods) { const propDesc = Object.getOwnPropertyDescriptor(proto, m); assert_true(propDesc.enumerable, 'method should be enumerable'); assert_true(propDesc.configurable, 'method should be configurable'); assert_true(propDesc.writable, 'method should be writable'); assert_equals(typeof it[m], 'function', 'method should be a function'); assert_equals(it[m].name, m, 'method should have the correct name'); } assert_equals(it.next.length, 0, 'next should have no parameters'); assert_equals(it.return.length, 1, 'return should have 1 parameter'); assert_equals(typeof it.throw, 'undefined', 'throw should not exist'); }, 'Async iterator instances should have the correct list of properties'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(1); c.enqueue(2); c.enqueue(3); c.close(); } }); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [1, 2, 3]); }, 'Async-iterating a push source'); promise_test(async () => { let i = 1; const s = new ReadableStream({ pull(c) { c.enqueue(i); if (i >= 3) { c.close(); } i += 1; } }); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [1, 2, 3]); }, 'Async-iterating a pull source'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(undefined); c.enqueue(undefined); c.enqueue(undefined); c.close(); } }); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [undefined, undefined, undefined]); }, 'Async-iterating a push source with undefined values'); promise_test(async () => { let i = 1; const s = new ReadableStream({ pull(c) { c.enqueue(undefined); if (i >= 3) { c.close(); } i += 1; } }); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [undefined, undefined, undefined]); }, 'Async-iterating a pull source with undefined values'); promise_test(async () => { let i = 1; const s = recordingReadableStream({ pull(c) { c.enqueue(i); if (i >= 3) { c.close(); } i += 1; }, }, new CountQueuingStrategy({ highWaterMark: 0 })); const it = s.values(); assert_array_equals(s.events, []); const read1 = await it.next(); assert_iter_result(read1, 1, false); assert_array_equals(s.events, ['pull']); const read2 = await it.next(); assert_iter_result(read2, 2, false); assert_array_equals(s.events, ['pull', 'pull']); const read3 = await it.next(); assert_iter_result(read3, 3, false); assert_array_equals(s.events, ['pull', 'pull', 'pull']); const read4 = await it.next(); assert_iter_result(read4, undefined, true); assert_array_equals(s.events, ['pull', 'pull', 'pull']); }, 'Async-iterating a pull source manually'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.error('e'); }, }); try { for await (const chunk of s) {} assert_unreached(); } catch (e) { assert_equals(e, 'e'); } }, 'Async-iterating an errored stream throws'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.close(); } }); for await (const chunk of s) { assert_unreached(); } }, 'Async-iterating a closed stream never executes the loop body, but works fine'); promise_test(async () => { const s = new ReadableStream(); const loop = async () => { for await (const chunk of s) { assert_unreached(); } assert_unreached(); }; await Promise.race([ loop(), flushAsyncEvents() ]); }, 'Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(1); c.enqueue(2); c.enqueue(3); c.close(); }, }); const reader = s.getReader(); const readResult = await reader.read(); assert_iter_result(readResult, 1, false); reader.releaseLock(); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [2, 3]); }, 'Async-iterating a partially consumed stream'); for (const type of ['throw', 'break', 'return']) { for (const preventCancel of [false, true]) { promise_test(async () => { const s = recordingReadableStream({ start(c) { c.enqueue(0); } }); // use a separate function for the loop body so return does not stop the test const loop = async () => { for await (const c of s.values({ preventCancel })) { if (type === 'throw') { throw new Error(); } else if (type === 'break') { break; } else if (type === 'return') { return; } } }; try { await loop(); } catch (e) {} if (preventCancel) { assert_array_equals(s.events, ['pull'], `cancel() should not be called`); } else { assert_array_equals(s.events, ['pull', 'cancel', undefined], `cancel() should be called`); } }, `Cancellation behavior when ${type}ing inside loop body; preventCancel = ${preventCancel}`); } } for (const preventCancel of [false, true]) { promise_test(async () => { const s = recordingReadableStream({ start(c) { c.enqueue(0); } }); const it = s.values({ preventCancel }); await it.return(); if (preventCancel) { assert_array_equals(s.events, [], `cancel() should not be called`); } else { assert_array_equals(s.events, ['cancel', undefined], `cancel() should be called`); } }, `Cancellation behavior when manually calling return(); preventCancel = ${preventCancel}`); } promise_test(async t => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResult1 = await it.next(); assert_iter_result(iterResult1, 0, false, '1st next()'); await promise_rejects_exactly(t, error1, it.next(), '2nd next()'); }, 'next() rejects if the stream errors'); promise_test(async () => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResult = await it.return('return value'); assert_iter_result(iterResult, 'return value', true); }, 'return() does not rejects if the stream has not errored yet'); promise_test(async t => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { // Do not error in start() because doing so would prevent acquiring a reader/async iterator. c.error(error1); } }); const it = s[Symbol.asyncIterator](); await flushAsyncEvents(); await promise_rejects_exactly(t, error1, it.return('return value')); }, 'return() rejects if the stream has errored'); promise_test(async t => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResult1 = await it.next(); assert_iter_result(iterResult1, 0, false, '1st next()'); await promise_rejects_exactly(t, error1, it.next(), '2nd next()'); const iterResult3 = await it.next(); assert_iter_result(iterResult3, undefined, true, '3rd next()'); }, 'next() that succeeds; next() that reports an error; next()'); promise_test(async () => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResults = await Promise.allSettled([it.next(), it.next(), it.next()]); assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status'); assert_iter_result(iterResults[0].value, 0, false, '1st next()'); assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status'); assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason'); assert_equals(iterResults[2].status, 'fulfilled', '3rd next() promise status'); assert_iter_result(iterResults[2].value, undefined, true, '3rd next()'); }, 'next() that succeeds; next() that reports an error(); next() [no awaiting]'); promise_test(async t => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResult1 = await it.next(); assert_iter_result(iterResult1, 0, false, '1st next()'); await promise_rejects_exactly(t, error1, it.next(), '2nd next()'); const iterResult3 = await it.return('return value'); assert_iter_result(iterResult3, 'return value', true, 'return()'); }, 'next() that succeeds; next() that reports an error(); return()'); promise_test(async () => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator](); const iterResults = await Promise.allSettled([it.next(), it.next(), it.return('return value')]); assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status'); assert_iter_result(iterResults[0].value, 0, false, '1st next()'); assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status'); assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason'); assert_equals(iterResults[2].status, 'fulfilled', 'return() promise status'); assert_iter_result(iterResults[2].value, 'return value', true, 'return()'); }, 'next() that succeeds; next() that reports an error(); return() [no awaiting]'); promise_test(async () => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { c.enqueue(timesPulled); ++timesPulled; } }); const it = s[Symbol.asyncIterator](); const iterResult1 = await it.next(); assert_iter_result(iterResult1, 0, false, 'next()'); const iterResult2 = await it.return('return value'); assert_iter_result(iterResult2, 'return value', true, 'return()'); assert_equals(timesPulled, 2); }, 'next() that succeeds; return()'); promise_test(async () => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { c.enqueue(timesPulled); ++timesPulled; } }); const it = s[Symbol.asyncIterator](); const iterResults = await Promise.allSettled([it.next(), it.return('return value')]); assert_equals(iterResults[0].status, 'fulfilled', 'next() promise status'); assert_iter_result(iterResults[0].value, 0, false, 'next()'); assert_equals(iterResults[1].status, 'fulfilled', 'return() promise status'); assert_iter_result(iterResults[1].value, 'return value', true, 'return()'); assert_equals(timesPulled, 2); }, 'next() that succeeds; return() [no awaiting]'); promise_test(async () => { const rs = new ReadableStream(); const it = rs.values(); const iterResult1 = await it.return('return value'); assert_iter_result(iterResult1, 'return value', true, 'return()'); const iterResult2 = await it.next(); assert_iter_result(iterResult2, undefined, true, 'next()'); }, 'return(); next()'); promise_test(async () => { const rs = new ReadableStream(); const it = rs.values(); const iterResults = await Promise.allSettled([it.return('return value'), it.next()]); assert_equals(iterResults[0].status, 'fulfilled', 'return() promise status'); assert_iter_result(iterResults[0].value, 'return value', true, 'return()'); assert_equals(iterResults[1].status, 'fulfilled', 'next() promise status'); assert_iter_result(iterResults[1].value, undefined, true, 'next()'); }, 'return(); next() [no awaiting]'); promise_test(async () => { const rs = new ReadableStream(); const it = rs.values(); const iterResult1 = await it.return('return value 1'); assert_iter_result(iterResult1, 'return value 1', true, '1st return()'); const iterResult2 = await it.return('return value 2'); assert_iter_result(iterResult2, 'return value 2', true, '1st return()'); }, 'return(); return()'); promise_test(async () => { const rs = new ReadableStream(); const it = rs.values(); const iterResults = await Promise.allSettled([it.return('return value 1'), it.return('return value 2')]); assert_equals(iterResults[0].status, 'fulfilled', '1st return() promise status'); assert_iter_result(iterResults[0].value, 'return value 1', true, '1st return()'); assert_equals(iterResults[1].status, 'fulfilled', '2nd return() promise status'); assert_iter_result(iterResults[1].value, 'return value 2', true, '1st return()'); }, 'return(); return() [no awaiting]'); test(() => { const s = new ReadableStream({ start(c) { c.enqueue(0); c.close(); }, }); s.values(); assert_throws_js(TypeError, () => s.values(), 'values() should throw'); }, 'values() throws if there\'s already a lock'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(1); c.enqueue(2); c.enqueue(3); c.close(); } }); const chunks = []; for await (const chunk of s) { chunks.push(chunk); } assert_array_equals(chunks, [1, 2, 3]); const reader = s.getReader(); await reader.closed; }, 'Acquiring a reader after exhaustively async-iterating a stream'); promise_test(async t => { let timesPulled = 0; const s = new ReadableStream({ pull(c) { if (timesPulled === 0) { c.enqueue(0); ++timesPulled; } else { c.error(error1); } } }); const it = s[Symbol.asyncIterator]({ preventCancel: true }); const iterResult1 = await it.next(); assert_iter_result(iterResult1, 0, false, '1st next()'); await promise_rejects_exactly(t, error1, it.next(), '2nd next()'); const iterResult2 = await it.return('return value'); assert_iter_result(iterResult2, 'return value', true, 'return()'); // i.e. it should not reject with a generic "this stream is locked" TypeError. const reader = s.getReader(); await promise_rejects_exactly(t, error1, reader.closed, 'closed on the new reader should reject with the error'); }, 'Acquiring a reader after return()ing from a stream that errors'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(1); c.enqueue(2); c.enqueue(3); c.close(); }, }); // read the first two chunks, then cancel const chunks = []; for await (const chunk of s) { chunks.push(chunk); if (chunk >= 2) { break; } } assert_array_equals(chunks, [1, 2]); const reader = s.getReader(); await reader.closed; }, 'Acquiring a reader after partially async-iterating a stream'); promise_test(async () => { const s = new ReadableStream({ start(c) { c.enqueue(1); c.enqueue(2); c.enqueue(3); c.close(); }, }); // read the first two chunks, then release lock const chunks = []; for await (const chunk of s.values({preventCancel: true})) { chunks.push(chunk); if (chunk >= 2) { break; } } assert_array_equals(chunks, [1, 2]); const reader = s.getReader(); const readResult = await reader.read(); assert_iter_result(readResult, 3, false); await reader.closed; }, 'Acquiring a reader and reading the remaining chunks after partially async-iterating a stream with preventCancel = true'); for (const preventCancel of [false, true]) { test(() => { const rs = new ReadableStream(); rs.values({ preventCancel }).return(); // The test passes if this line doesn't throw. rs.getReader(); }, `return() should unlock the stream synchronously when preventCancel = ${preventCancel}`); } promise_test(async () => { const rs = new ReadableStream({ async start(c) { c.enqueue('a'); c.enqueue('b'); c.enqueue('c'); await flushAsyncEvents(); // At this point, the async iterator has a read request in the stream's queue for its pending next() promise. // Closing the stream now causes two things to happen *synchronously*: // 1. ReadableStreamClose resolves reader.[[closedPromise]] with undefined. // 2. ReadableStreamClose calls the read request's close steps, which calls ReadableStreamReaderGenericRelease, // which replaces reader.[[closedPromise]] with a rejected promise. c.close(); } }); const chunks = []; for await (const chunk of rs) { chunks.push(chunk); } assert_array_equals(chunks, ['a', 'b', 'c']); }, 'close() while next() is pending');