summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/streams/piping
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /testing/web-platform/tests/streams/piping
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/web-platform/tests/streams/piping')
-rw-r--r--testing/web-platform/tests/streams/piping/abort.any.js408
-rw-r--r--testing/web-platform/tests/streams/piping/close-propagation-backward.any.js153
-rw-r--r--testing/web-platform/tests/streams/piping/close-propagation-forward.any.js589
-rw-r--r--testing/web-platform/tests/streams/piping/error-propagation-backward.any.js630
-rw-r--r--testing/web-platform/tests/streams/piping/error-propagation-forward.any.js569
-rw-r--r--testing/web-platform/tests/streams/piping/flow-control.any.js297
-rw-r--r--testing/web-platform/tests/streams/piping/general.any.js211
-rw-r--r--testing/web-platform/tests/streams/piping/multiple-propagation.any.js227
-rw-r--r--testing/web-platform/tests/streams/piping/pipe-through.any.js331
-rw-r--r--testing/web-platform/tests/streams/piping/then-interception.any.js68
-rw-r--r--testing/web-platform/tests/streams/piping/throwing-options.any.js65
-rw-r--r--testing/web-platform/tests/streams/piping/transform-streams.any.js22
12 files changed, 3570 insertions, 0 deletions
diff --git a/testing/web-platform/tests/streams/piping/abort.any.js b/testing/web-platform/tests/streams/piping/abort.any.js
new file mode 100644
index 0000000000..503de9dcaf
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/abort.any.js
@@ -0,0 +1,408 @@
+// META: global=window,worker
+// META: script=../resources/recording-streams.js
+// META: script=../resources/test-utils.js
+'use strict';
+
+// Tests for the use of pipeTo with AbortSignal.
+// There is some extra complexity to avoid timeouts in environments where abort is not implemented.
+
+const error1 = new Error('error1');
+error1.name = 'error1';
+const error2 = new Error('error2');
+error2.name = 'error2';
+
+const errorOnPull = {
+ pull(controller) {
+ // This will cause the test to error if pipeTo abort is not implemented.
+ controller.error('failed to abort');
+ }
+};
+
+// To stop pull() being called immediately when the stream is created, we need to set highWaterMark to 0.
+const hwm0 = { highWaterMark: 0 };
+
+for (const invalidSignal of [null, 'AbortSignal', true, -1, Object.create(AbortSignal.prototype)]) {
+ promise_test(t => {
+ const rs = recordingReadableStream(errorOnPull, hwm0);
+ const ws = recordingWritableStream();
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { signal: invalidSignal }), 'pipeTo should reject')
+ .then(() => {
+ assert_equals(rs.events.length, 0, 'no ReadableStream methods should have been called');
+ assert_equals(ws.events.length, 0, 'no WritableStream methods should have been called');
+ });
+ }, `a signal argument '${invalidSignal}' should cause pipeTo() to reject`);
+}
+
+promise_test(t => {
+ const rs = recordingReadableStream(errorOnPull, hwm0);
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject')
+ .then(() => Promise.all([
+ rs.getReader().closed,
+ promise_rejects_dom(t, 'AbortError', ws.getWriter().closed, 'writer.closed should reject')
+ ]))
+ .then(() => {
+ assert_equals(rs.events.length, 2, 'cancel should have been called');
+ assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
+ assert_equals(rs.events[1].name, 'AbortError', 'the argument to cancel should be an AbortError');
+ assert_equals(rs.events[1].constructor.name, 'DOMException',
+ 'the argument to cancel should be a DOMException');
+ });
+}, 'an aborted signal should cause the writable stream to reject with an AbortError');
+
+for (const reason of [null, undefined, error1]) {
+ promise_test(async t => {
+ const rs = recordingReadableStream(errorOnPull, hwm0);
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort(reason);
+ const pipeToPromise = rs.pipeTo(ws, { signal });
+ if (reason !== undefined) {
+ await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
+ } else {
+ await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
+ }
+ const error = await pipeToPromise.catch(e => e);
+ await rs.getReader().closed;
+ await promise_rejects_exactly(t, error, ws.getWriter().closed, 'the writable should be errored with the same object');
+ assert_equals(signal.reason, error, 'signal.reason should be error'),
+ assert_equals(rs.events.length, 2, 'cancel should have been called');
+ assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
+ assert_equals(rs.events[1], error, 'the readable should be canceled with the same object');
+ }, `(reason: '${reason}') all the error objects should be the same object`);
+}
+
+promise_test(t => {
+ const rs = recordingReadableStream(errorOnPull, hwm0);
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true }), 'pipeTo should reject')
+ .then(() => assert_equals(rs.events.length, 0, 'cancel should not be called'));
+}, 'preventCancel should prevent canceling the readable');
+
+promise_test(t => {
+ const rs = new ReadableStream(errorOnPull, hwm0);
+ const ws = recordingWritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventAbort: true }), 'pipeTo should reject')
+ .then(() => {
+ assert_equals(ws.events.length, 0, 'writable should not have been aborted');
+ return ws.getWriter().ready;
+ });
+}, 'preventAbort should prevent aborting the readable');
+
+promise_test(t => {
+ const rs = recordingReadableStream(errorOnPull, hwm0);
+ const ws = recordingWritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }),
+ 'pipeTo should reject')
+ .then(() => {
+ assert_equals(rs.events.length, 0, 'cancel should not be called');
+ assert_equals(ws.events.length, 0, 'writable should not have been aborted');
+ return ws.getWriter().ready;
+ });
+}, 'preventCancel and preventAbort should prevent canceling the readable and aborting the readable');
+
+for (const reason of [null, undefined, error1]) {
+ promise_test(async t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.close();
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ const ws = recordingWritableStream({
+ write() {
+ abortController.abort(reason);
+ }
+ });
+ const pipeToPromise = rs.pipeTo(ws, { signal });
+ if (reason !== undefined) {
+ await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
+ } else {
+ await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
+ }
+ const error = await pipeToPromise.catch(e => e);
+ assert_equals(signal.reason, error, 'signal.reason should be error');
+ assert_equals(ws.events.length, 4, 'only chunk "a" should have been written');
+ assert_array_equals(ws.events.slice(0, 3), ['write', 'a', 'abort'], 'events should match');
+ assert_equals(ws.events[3], error, 'abort reason should be error');
+ }, `(reason: '${reason}') abort should prevent further reads`);
+}
+
+for (const reason of [null, undefined, error1]) {
+ promise_test(async t => {
+ let readController;
+ const rs = new ReadableStream({
+ start(c) {
+ readController = c;
+ c.enqueue('a');
+ c.enqueue('b');
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ let resolveWrite;
+ const writePromise = new Promise(resolve => {
+ resolveWrite = resolve;
+ });
+ const ws = recordingWritableStream({
+ write() {
+ return writePromise;
+ }
+ }, new CountQueuingStrategy({ highWaterMark: Infinity }));
+ const pipeToPromise = rs.pipeTo(ws, { signal });
+ await delay(0);
+ await abortController.abort(reason);
+ await readController.close(); // Make sure the test terminates when signal is not implemented.
+ await resolveWrite();
+ if (reason !== undefined) {
+ await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
+ } else {
+ await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
+ }
+ const error = await pipeToPromise.catch(e => e);
+ assert_equals(signal.reason, error, 'signal.reason should be error');
+ assert_equals(ws.events.length, 6, 'chunks "a" and "b" should have been written');
+ assert_array_equals(ws.events.slice(0, 5), ['write', 'a', 'write', 'b', 'abort'], 'events should match');
+ assert_equals(ws.events[5], error, 'abort reason should be error');
+ }, `(reason: '${reason}') all pending writes should complete on abort`);
+}
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.error('failed to abort');
+ },
+ cancel() {
+ return Promise.reject(error1);
+ }
+ }, hwm0);
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+}, 'a rejection from underlyingSource.cancel() should be returned by pipeTo()');
+
+promise_test(t => {
+ const rs = new ReadableStream(errorOnPull, hwm0);
+ const ws = new WritableStream({
+ abort() {
+ return Promise.reject(error1);
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+}, 'a rejection from underlyingSink.abort() should be returned by pipeTo()');
+
+promise_test(t => {
+ const events = [];
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.error('failed to abort');
+ },
+ cancel() {
+ events.push('cancel');
+ return Promise.reject(error1);
+ }
+ }, hwm0);
+ const ws = new WritableStream({
+ abort() {
+ events.push('abort');
+ return Promise.reject(error2);
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws, { signal }), 'pipeTo should reject')
+ .then(() => assert_array_equals(events, ['abort', 'cancel'], 'abort() should be called before cancel()'));
+}, 'a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+}, 'abort signal takes priority over closed readable');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.error(error1);
+ }
+ });
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+}, 'abort signal takes priority over errored readable');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.error('failed to abort');
+ }
+ }, hwm0);
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ const writer = ws.getWriter();
+ return writer.close().then(() => {
+ writer.releaseLock();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+ });
+}, 'abort signal takes priority over closed writable');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.error('failed to abort');
+ }
+ }, hwm0);
+ const ws = new WritableStream({
+ start(controller) {
+ controller.error(error1);
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ abortController.abort();
+ return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
+}, 'abort signal takes priority over errored writable');
+
+promise_test(() => {
+ let readController;
+ const rs = new ReadableStream({
+ start(c) {
+ readController = c;
+ }
+ });
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ const pipeToPromise = rs.pipeTo(ws, { signal, preventClose: true });
+ readController.close();
+ return Promise.resolve().then(() => {
+ abortController.abort();
+ return pipeToPromise;
+ }).then(() => ws.getWriter().write('this should succeed'));
+}, 'abort should do nothing after the readable is closed');
+
+promise_test(t => {
+ let readController;
+ const rs = new ReadableStream({
+ start(c) {
+ readController = c;
+ }
+ });
+ const ws = new WritableStream();
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
+ readController.error(error1);
+ return Promise.resolve().then(() => {
+ abortController.abort();
+ return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
+ }).then(() => ws.getWriter().write('this should succeed'));
+}, 'abort should do nothing after the readable is errored');
+
+promise_test(t => {
+ let readController;
+ const rs = new ReadableStream({
+ start(c) {
+ readController = c;
+ }
+ });
+ let resolveWrite;
+ const writePromise = new Promise(resolve => {
+ resolveWrite = resolve;
+ });
+ const ws = new WritableStream({
+ write() {
+ readController.error(error1);
+ return writePromise;
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
+ readController.enqueue('a');
+ return delay(0).then(() => {
+ abortController.abort();
+ resolveWrite();
+ return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
+ }).then(() => ws.getWriter().write('this should succeed'));
+}, 'abort should do nothing after the readable is errored, even with pending writes');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ pull(controller) {
+ return delay(0).then(() => controller.close());
+ }
+ });
+ let writeController;
+ const ws = new WritableStream({
+ start(c) {
+ writeController = c;
+ }
+ });
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+ const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true });
+ return Promise.resolve().then(() => {
+ writeController.error(error1);
+ return Promise.resolve();
+ }).then(() => {
+ abortController.abort();
+ return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
+ }).then(() => {
+ assert_array_equals(rs.events, ['pull'], 'cancel should not have been called');
+ });
+}, 'abort should do nothing after the writable is errored');
+
+promise_test(async t => {
+ const rs = new ReadableStream({
+ pull(c) {
+ c.enqueue(new Uint8Array([]));
+ },
+ type: "bytes",
+ });
+ const ws = new WritableStream();
+ const [first, second] = rs.tee();
+
+ let aborted = false;
+ first.pipeTo(ws, { signal: AbortSignal.abort() }).catch(() => {
+ aborted = true;
+ });
+ await delay(0);
+ assert_true(!aborted, "pipeTo should not resolve yet");
+ await second.cancel();
+ await delay(0);
+ assert_true(aborted, "pipeTo should be aborted now");
+}, "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted");
diff --git a/testing/web-platform/tests/streams/piping/close-propagation-backward.any.js b/testing/web-platform/tests/streams/piping/close-propagation-backward.any.js
new file mode 100644
index 0000000000..5ea47ab85c
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/close-propagation-backward.any.js
@@ -0,0 +1,153 @@
+// META: global=window,worker
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return rs.pipeTo(ws).then(
+ () => assert_unreached('the promise must not fulfill'),
+ err => {
+ assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError');
+
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ }
+ );
+
+}, 'Closing must be propagated backward: starts closed; preventCancel omitted; fulfilled cancel promise');
+
+promise_test(t => {
+
+ // Our recording streams do not deal well with errors generated by the system, so give them some help
+ let recordedError;
+ const rs = recordingReadableStream({
+ cancel(cancelErr) {
+ recordedError = cancelErr;
+ throw error1;
+ }
+ });
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
+ assert_equals(recordedError.name, 'TypeError', 'the cancel reason must be a TypeError');
+
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', recordedError]);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated backward: starts closed; preventCancel omitted; rejected cancel promise');
+
+for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
+ const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
+
+ promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return rs.pipeTo(ws, { preventCancel: falsy }).then(
+ () => assert_unreached('the promise must not fulfill'),
+ err => {
+ assert_equals(err.name, 'TypeError', 'the promise must reject with a TypeError');
+
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ }
+ );
+
+ }, `Closing must be propagated backward: starts closed; preventCancel = ${stringVersion} (falsy); fulfilled cancel ` +
+ `promise`);
+}
+
+for (const truthy of [true, 'a', 1, Symbol(), { }]) {
+ promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: truthy })).then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return ws.getWriter().closed;
+ });
+
+ }, `Closing must be propagated backward: starts closed; preventCancel = ${String(truthy)} (truthy)`);
+}
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { preventCancel: true, preventAbort: true }))
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return ws.getWriter().closed;
+ });
+
+}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return promise_rejects_js(t, TypeError,
+ rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true }))
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return ws.getWriter().closed;
+ });
+
+}, 'Closing must be propagated backward: starts closed; preventCancel = true, preventAbort = true, preventClose ' +
+ '= true');
diff --git a/testing/web-platform/tests/streams/piping/close-propagation-forward.any.js b/testing/web-platform/tests/streams/piping/close-propagation-forward.any.js
new file mode 100644
index 0000000000..71b6e26284
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/close-propagation-forward.any.js
@@ -0,0 +1,589 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return rs.pipeTo(ws).then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated forward: starts closed; preventClose omitted; fulfilled close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream({
+ close() {
+ throw error1;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed)
+ ]);
+ });
+
+}, 'Closing must be propagated forward: starts closed; preventClose omitted; rejected close promise');
+
+for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
+ const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
+
+ promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return rs.pipeTo(ws, { preventClose: falsy }).then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+ }, `Closing must be propagated forward: starts closed; preventClose = ${stringVersion} (falsy); fulfilled close ` +
+ `promise`);
+}
+
+for (const truthy of [true, 'a', 1, Symbol(), { }]) {
+ promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return rs.pipeTo(ws, { preventClose: truthy }).then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return rs.getReader().closed;
+ });
+
+ }, `Closing must be propagated forward: starts closed; preventClose = ${String(truthy)} (truthy)`);
+}
+
+promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return rs.pipeTo(ws, { preventClose: true, preventAbort: true }).then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return rs.pipeTo(ws, { preventClose: true, preventAbort: true, preventCancel: true }).then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Closing must be propagated forward: starts closed; preventClose = true, preventAbort = true, preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = rs.pipeTo(ws);
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; fulfilled close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ close() {
+ throw error1;
+ }
+ });
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed)
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose omitted; rejected close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = rs.pipeTo(ws, { preventClose: true });
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; preventClose = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = rs.pipeTo(ws);
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
+ 'preventClose omitted; fulfilled close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ close() {
+ throw error1;
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed)
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
+ 'preventClose omitted; rejected close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = rs.pipeTo(ws, { preventClose: true });
+
+ t.step_timeout(() => rs.controller.close());
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Closing must be propagated forward: becomes closed asynchronously; dest never desires chunks; ' +
+ 'preventClose = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = rs.pipeTo(ws);
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.close());
+ }, 10);
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello', 'close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; fulfilled close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ close() {
+ throw error1;
+ }
+ });
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.close());
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello', 'close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed)
+ ]);
+ });
+
+}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose omitted; rejected close promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = rs.pipeTo(ws, { preventClose: true });
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.close());
+ }, 10);
+
+ return pipePromise.then(value => {
+ assert_equals(value, undefined, 'the promise must fulfill with undefined');
+ })
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Closing must be propagated forward: becomes closed after one chunk; preventClose = true');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ });
+
+ let pipeComplete = false;
+ const pipePromise = rs.pipeTo(ws).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.close();
+
+ // Flush async events and verify that no shutdown occurs.
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ws.events, ['write', 'a']); // no 'close'
+ assert_equals(pipeComplete, false, 'the pipe must not be complete');
+
+ resolveWritePromise();
+
+ return pipePromise.then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'close']);
+ });
+ });
+
+}, 'Closing must be propagated forward: shutdown must not occur until the final write completes');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ });
+
+ let pipeComplete = false;
+ const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.close();
+
+ // Flush async events and verify that no shutdown occurs.
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the chunk must have been written, but close must not have happened');
+ assert_equals(pipeComplete, false, 'the pipe must not be complete');
+
+ resolveWritePromise();
+
+ return pipePromise;
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the chunk must have been written, but close must not have happened');
+ });
+
+}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; preventClose = true');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 2 }));
+
+ let pipeComplete = false;
+ const pipePromise = rs.pipeTo(ws).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.enqueue('b');
+
+ return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the first chunk must have been written, but close must not have happened yet');
+ assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
+
+ rs.controller.close();
+ resolveWritePromise();
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'the second chunk must have been written, but close must not have happened yet');
+ assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
+
+ resolveWritePromise();
+ return pipePromise;
+ }).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close'],
+ 'all chunks must have been written and close must have happened');
+ });
+
+}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 2 }));
+
+ let pipeComplete = false;
+ const pipePromise = rs.pipeTo(ws, { preventClose: true }).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.enqueue('b');
+
+ return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the first chunk must have been written, but close must not have happened');
+ assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
+
+ rs.controller.close();
+ resolveWritePromise();
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'the second chunk must have been written, but close must not have happened');
+ assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
+
+ resolveWritePromise();
+ return pipePromise;
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'all chunks must have been written, but close must not have happened');
+ });
+
+}, 'Closing must be propagated forward: shutdown must not occur until the final write completes; becomes closed after first write; preventClose = true');
+
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.enqueue('a');
+ c.enqueue('b');
+ c.close();
+ }
+ });
+ let rejectWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ return new Promise((resolve, reject) => {
+ rejectWritePromise = reject;
+ });
+ }
+ }, { highWaterMark: 3 });
+ const pipeToPromise = rs.pipeTo(ws);
+ return delay(0).then(() => {
+ rejectWritePromise(error1);
+ return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
+ }).then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['write', 'a']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed, 'ws should be errored')
+ ]);
+ });
+}, 'Closing must be propagated forward: erroring the writable while flushing pending writes should error pipeTo');
diff --git a/testing/web-platform/tests/streams/piping/error-propagation-backward.any.js b/testing/web-platform/tests/streams/piping/error-propagation-backward.any.js
new file mode 100644
index 0000000000..ec74592f86
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/error-propagation-backward.any.js
@@ -0,0 +1,630 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+const error2 = new Error('error2!');
+error2.name = 'error2';
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: starts errored; preventCancel omitted; fulfilled cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; ' +
+ 'fulfilled cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel omitted; rejected ' +
+ 'cancel promise');
+
+for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
+ const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
+
+ promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: falsy }),
+ 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+ }, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` +
+ `${stringVersion} (falsy); fulfilled cancel promise`);
+}
+
+for (const truthy of [true, 'a', 1, Symbol(), { }]) {
+ promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: truthy }),
+ 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+ }, `Errors must be propagated backward: becomes errored before piping due to write; preventCancel = ` +
+ `${String(truthy)} (truthy)`);
+}
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true }),
+ 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping due to write, preventCancel = true; ' +
+ 'preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ write() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const writer = ws.getWriter();
+
+ return promise_rejects_exactly(t, error1, writer.write('Hello'), 'writer.write() must reject with the write error')
+ .then(() => promise_rejects_exactly(t, error1, writer.closed, 'writer.closed must reject with the write error'))
+ .then(() => {
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true, preventAbort: true, preventClose: true }),
+ 'pipeTo must reject with the write error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping due to write; preventCancel = true, ' +
+ 'preventAbort = true, preventClose = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('Hello');
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ throw error1;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; fulfilled ' +
+ 'cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('Hello');
+ },
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ throw error1;
+ }
+ });
+
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel omitted; rejected ' +
+ 'cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('Hello');
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ throw error1;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write; preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ if (ws.events.length > 2) {
+ return delay(0).then(() => {
+ throw error1;
+ });
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' +
+ 'false; fulfilled cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ },
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ if (ws.events.length > 2) {
+ return delay(0).then(() => {
+ throw error1;
+ });
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error').then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = ' +
+ 'false; rejected cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ if (ws.events.length > 2) {
+ return delay(0).then(() => {
+ throw error1;
+ });
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored during piping due to write, but async; preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; fulfilled cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; preventCancel omitted; rejected cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write(chunk) {
+ if (chunk === 'c') {
+ return Promise.reject(error1);
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error').then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' +
+ 'preventCancel omitted (but cancel is never called)');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream({
+ write(chunk) {
+ if (chunk === 'c') {
+ return Promise.reject(error1);
+ }
+ return undefined;
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c']);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping due to last write; source is closed; ' +
+ 'preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
+ 'false; fulfilled cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
+ 'false; rejected cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => ws.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated backward: becomes errored after piping; dest never desires chunks; preventCancel = ' +
+ 'true');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ ws.abort(error1);
+
+ return rs.pipeTo(ws).then(
+ () => assert_unreached('the promise must not fulfill'),
+ err => {
+ assert_equals(err, error1, 'the promise must reject with error1');
+
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
+ assert_array_equals(ws.events, ['abort', error1]);
+ }
+ );
+
+}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; fulfilled ' +
+ 'cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ cancel() {
+ throw error2;
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ ws.abort(error1);
+
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the cancel error')
+ .then(() => {
+ return ws.getWriter().closed.then(
+ () => assert_unreached('the promise must not fulfill'),
+ err => {
+ assert_equals(err, error1, 'the promise must reject with error1');
+
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', err]);
+ assert_array_equals(ws.events, ['abort', error1]);
+ }
+ );
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel omitted; rejected ' +
+ 'cancel promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ ws.abort(error1);
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventCancel: true })).then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated backward: becomes errored before piping via abort; preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+ return flushAsyncEvents();
+ }
+ });
+
+ const pipePromise = rs.pipeTo(ws);
+
+ rs.controller.enqueue('a');
+
+ return writeCalledPromise.then(() => {
+ ws.controller.error(error1);
+
+ return promise_rejects_exactly(t, error1, pipePromise);
+ }).then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
+ assert_array_equals(ws.events, ['write', 'a']);
+ });
+
+}, 'Errors must be propagated backward: erroring via the controller errors once pending write completes');
diff --git a/testing/web-platform/tests/streams/piping/error-propagation-forward.any.js b/testing/web-platform/tests/streams/piping/error-propagation-forward.any.js
new file mode 100644
index 0000000000..482da2f8a8
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/error-propagation-forward.any.js
@@ -0,0 +1,569 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+const error2 = new Error('error2!');
+error2.name = 'error2';
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: starts errored; preventAbort = false; fulfilled abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream({
+ abort() {
+ throw error2;
+ }
+ });
+
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: starts errored; preventAbort = false; rejected abort promise');
+
+for (const falsy of [undefined, null, false, +0, -0, NaN, '']) {
+ const stringVersion = Object.is(falsy, -0) ? '-0' : String(falsy);
+
+ promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: falsy }), 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+ }, `Errors must be propagated forward: starts errored; preventAbort = ${stringVersion} (falsy); fulfilled abort ` +
+ `promise`);
+}
+
+for (const truthy of [true, 'a', 1, Symbol(), { }]) {
+ promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: truthy }),
+ 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+ });
+
+ }, `Errors must be propagated forward: starts errored; preventAbort = ${String(truthy)} (truthy)`);
+}
+
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true }),
+ 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start() {
+ return Promise.reject(error1);
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true, preventCancel: true, preventClose: true }),
+ 'pipeTo must reject with the same error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated forward: starts errored; preventAbort = true, preventCancel = true, preventClose = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; fulfilled abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ abort() {
+ throw error2;
+ }
+ });
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = false; rejected abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
+ 'preventAbort = false; fulfilled abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ abort() {
+ throw error2;
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
+ 'preventAbort = false; rejected abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => rs.controller.error(error1), 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated forward: becomes errored while empty; dest never desires chunks; ' +
+ 'preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; fulfilled abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ abort() {
+ throw error2;
+ }
+ });
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello', 'abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = false; rejected abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'Hello']);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
+ 'preventAbort = false; fulfilled abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream({
+ abort() {
+ throw error2;
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the abort error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
+ 'preventAbort = false; rejected abort promise');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the same error');
+
+ t.step_timeout(() => {
+ rs.controller.enqueue('Hello');
+ t.step_timeout(() => rs.controller.error(error1), 10);
+ }, 10);
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ });
+
+}, 'Errors must be propagated forward: becomes errored after one chunk; dest never desires chunks; ' +
+ 'preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ });
+
+ let pipeComplete = false;
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+
+ return writeCalledPromise.then(() => {
+ rs.controller.error(error1);
+
+ // Flush async events and verify that no shutdown occurs.
+ return flushAsyncEvents();
+ }).then(() => {
+ assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
+ assert_equals(pipeComplete, false, 'the pipe must not be complete');
+
+ resolveWritePromise();
+
+ return pipePromise.then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'abort', error1]);
+ });
+ });
+
+}, 'Errors must be propagated forward: shutdown must not occur until the final write completes');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ });
+
+ let pipeComplete = false;
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+
+ return writeCalledPromise.then(() => {
+ rs.controller.error(error1);
+
+ // Flush async events and verify that no shutdown occurs.
+ return flushAsyncEvents();
+ }).then(() => {
+ assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
+ assert_equals(pipeComplete, false, 'the pipe must not be complete');
+
+ resolveWritePromise();
+ return pipePromise;
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a']); // no 'abort'
+ });
+
+}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; preventAbort = true');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 2 }));
+
+ let pipeComplete = false;
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws)).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.enqueue('b');
+
+ return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the first chunk must have been written, but abort must not have happened yet');
+ assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
+
+ rs.controller.error(error1);
+ resolveWritePromise();
+ return flushAsyncEvents();
+ }).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'the second chunk must have been written, but abort must not have happened yet');
+ assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
+
+ resolveWritePromise();
+ return pipePromise;
+ }).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'abort', error1],
+ 'all chunks must have been written and abort must have happened');
+ });
+
+}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write');
+
+promise_test(t => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWriteCalled;
+ const writeCalledPromise = new Promise(resolve => {
+ resolveWriteCalled = resolve;
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ resolveWriteCalled();
+
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 2 }));
+
+ let pipeComplete = false;
+ const pipePromise = promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true })).then(() => {
+ pipeComplete = true;
+ });
+
+ rs.controller.enqueue('a');
+ rs.controller.enqueue('b');
+
+ return writeCalledPromise.then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a'],
+ 'the first chunk must have been written, but abort must not have happened');
+ assert_false(pipeComplete, 'the pipe should not complete while the first write is pending');
+
+ rs.controller.error(error1);
+ resolveWritePromise();
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'the second chunk must have been written, but abort must not have happened');
+ assert_false(pipeComplete, 'the pipe should not complete while the second write is pending');
+
+ resolveWritePromise();
+ return pipePromise;
+ }).then(() => flushAsyncEvents()).then(() => {
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'all chunks must have been written, but abort must not have happened');
+ });
+
+}, 'Errors must be propagated forward: shutdown must not occur until the final write completes; becomes errored after first write; preventAbort = true');
diff --git a/testing/web-platform/tests/streams/piping/flow-control.any.js b/testing/web-platform/tests/streams/piping/flow-control.any.js
new file mode 100644
index 0000000000..09c4420f87
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/flow-control.any.js
@@ -0,0 +1,297 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/rs-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+promise_test(t => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.close();
+ }
+ });
+
+ const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ const pipePromise = rs.pipeTo(ws, { preventCancel: true });
+
+ // Wait and make sure it doesn't do any reading.
+ return flushAsyncEvents().then(() => {
+ ws.controller.error(error1);
+ })
+ .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, []);
+ })
+ .then(() => readableStreamToArray(rs))
+ .then(chunksNotPreviouslyRead => {
+ assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
+ });
+
+}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream({
+ start(controller) {
+ controller.enqueue('b');
+ controller.close();
+ }
+ });
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ if (!resolveWritePromise) {
+ // first write
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ return undefined;
+ }
+ });
+
+ const writer = ws.getWriter();
+ const firstWritePromise = writer.write('a');
+ assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
+ writer.releaseLock();
+
+ // firstWritePromise won't settle until we call resolveWritePromise.
+
+ const pipePromise = rs.pipeTo(ws);
+
+ return flushAsyncEvents().then(() => resolveWritePromise())
+ .then(() => Promise.all([firstWritePromise, pipePromise]))
+ .then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
+ });
+
+}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');
+
+promise_test(() => {
+
+ const rs = recordingReadableStream();
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ if (!resolveWritePromise) {
+ // first write
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ return undefined;
+ }
+ });
+
+ const writer = ws.getWriter();
+ writer.write('a');
+
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ws.events, ['write', 'a']);
+ assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
+ writer.releaseLock();
+
+ const pipePromise = rs.pipeTo(ws);
+
+ rs.controller.enqueue('b');
+ resolveWritePromise();
+ rs.controller.close();
+
+ return pipePromise.then(() => {
+ assert_array_equals(rs.eventsWithoutPulls, []);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
+ });
+ });
+
+}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
+ 'stream becomes non-empty and the writable stream starts desiring chunks');
+
+promise_test(() => {
+ const unreadChunks = ['b', 'c', 'd'];
+
+ const rs = recordingReadableStream({
+ pull(controller) {
+ controller.enqueue(unreadChunks.shift());
+ if (unreadChunks.length === 0) {
+ controller.close();
+ }
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 0 }));
+
+ let resolveWritePromise;
+ const ws = recordingWritableStream({
+ write() {
+ if (!resolveWritePromise) {
+ // first write
+ return new Promise(resolve => {
+ resolveWritePromise = resolve;
+ });
+ }
+ return undefined;
+ }
+ }, new CountQueuingStrategy({ highWaterMark: 3 }));
+
+ const writer = ws.getWriter();
+ const firstWritePromise = writer.write('a');
+ assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
+ writer.releaseLock();
+
+ // firstWritePromise won't settle until we call resolveWritePromise.
+
+ const pipePromise = rs.pipeTo(ws);
+
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ws.events, ['write', 'a']);
+ assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
+ }).then(() => resolveWritePromise())
+ .then(() => Promise.all([firstWritePromise, pipePromise]))
+ .then(() => {
+ assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']);
+ });
+
+}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');
+
+class StepTracker {
+ constructor() {
+ this.waiters = [];
+ this.wakers = [];
+ }
+
+ // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
+ // promise is resolved.
+ waitThenAdvance(n) {
+ if (this.waiters[n] === undefined) {
+ this.waiters[n] = new Promise(resolve => {
+ this.wakers[n] = resolve;
+ });
+ this.waiters[n]
+ .then(() => flushAsyncEvents())
+ .then(() => {
+ if (this.wakers[n + 1] !== undefined) {
+ this.wakers[n + 1]();
+ }
+ });
+ }
+ if (n == 0) {
+ this.wakers[0]();
+ }
+ return this.waiters[n];
+ }
+}
+
+promise_test(() => {
+ const steps = new StepTracker();
+ const desiredSizes = [];
+ const rs = recordingReadableStream({
+ start(controller) {
+ steps.waitThenAdvance(1).then(() => enqueue('a'));
+ steps.waitThenAdvance(3).then(() => enqueue('b'));
+ steps.waitThenAdvance(5).then(() => enqueue('c'));
+ steps.waitThenAdvance(7).then(() => enqueue('d'));
+ steps.waitThenAdvance(11).then(() => controller.close());
+
+ function enqueue(chunk) {
+ controller.enqueue(chunk);
+ desiredSizes.push(controller.desiredSize);
+ }
+ }
+ });
+
+ const chunksFinishedWriting = [];
+ const writableStartPromise = Promise.resolve();
+ let writeCalled = false;
+ const ws = recordingWritableStream({
+ start() {
+ return writableStartPromise;
+ },
+ write(chunk) {
+ const waitForStep = writeCalled ? 12 : 9;
+ writeCalled = true;
+ return steps.waitThenAdvance(waitForStep).then(() => {
+ chunksFinishedWriting.push(chunk);
+ });
+ }
+ });
+
+ return writableStartPromise.then(() => {
+ const pipePromise = rs.pipeTo(ws);
+ steps.waitThenAdvance(0);
+
+ return Promise.all([
+ steps.waitThenAdvance(2).then(() => {
+ assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
+ assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');
+
+ // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
+ // promise, leaving the queue empty.
+ assert_array_equals(desiredSizes, [1],
+ 'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
+ assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
+ }),
+
+ steps.waitThenAdvance(4).then(() => {
+ assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
+ assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');
+
+ // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
+ // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
+ // had size 1 (thus desiredSize of 0).
+ assert_array_equals(desiredSizes, [1, 0],
+ 'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
+ assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
+ }),
+
+ steps.waitThenAdvance(6).then(() => {
+ assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
+ assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');
+
+ // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
+ // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
+ // -1.
+ assert_array_equals(desiredSizes, [1, 0, -1],
+ 'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
+ assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
+ }),
+
+ steps.waitThenAdvance(8).then(() => {
+ assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
+ assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');
+
+ // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
+ // and 'd'.
+ assert_array_equals(desiredSizes, [1, 0, -1, -2],
+ 'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
+ assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
+ }),
+
+ steps.waitThenAdvance(10).then(() => {
+ assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
+ 'at step 10, two chunks must have been written');
+
+ assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
+ }),
+
+ pipePromise.then(() => {
+ assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
+ assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');
+
+ assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
+ assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
+ 'all chunks were written (and the WritableStream closed)');
+ })
+ ]);
+ });
+}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');
diff --git a/testing/web-platform/tests/streams/piping/general.any.js b/testing/web-platform/tests/streams/piping/general.any.js
new file mode 100644
index 0000000000..bec3480f65
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/general.any.js
@@ -0,0 +1,211 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+test(() => {
+
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ assert_false(rs.locked, 'sanity check: the ReadableStream must not start locked');
+ assert_false(ws.locked, 'sanity check: the WritableStream must not start locked');
+
+ rs.pipeTo(ws);
+
+ assert_true(rs.locked, 'the ReadableStream must become locked');
+ assert_true(ws.locked, 'the WritableStream must become locked');
+
+}, 'Piping must lock both the ReadableStream and WritableStream');
+
+promise_test(() => {
+
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+ const ws = new WritableStream();
+
+ return rs.pipeTo(ws).then(() => {
+ assert_false(rs.locked, 'the ReadableStream must become unlocked');
+ assert_false(ws.locked, 'the WritableStream must become unlocked');
+ });
+
+}, 'Piping finishing must unlock both the ReadableStream and WritableStream');
+
+promise_test(t => {
+
+ const fakeRS = Object.create(ReadableStream.prototype);
+ const ws = new WritableStream();
+
+ return methodRejects(t, ReadableStream.prototype, 'pipeTo', fakeRS, [ws]);
+
+}, 'pipeTo must check the brand of its ReadableStream this value');
+
+promise_test(t => {
+
+ const rs = new ReadableStream();
+ const fakeWS = Object.create(WritableStream.prototype);
+
+ return methodRejects(t, ReadableStream.prototype, 'pipeTo', rs, [fakeWS]);
+
+}, 'pipeTo must check the brand of its WritableStream argument');
+
+promise_test(t => {
+
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ rs.getReader();
+
+ assert_true(rs.locked, 'sanity check: the ReadableStream starts locked');
+ assert_false(ws.locked, 'sanity check: the WritableStream does not start locked');
+
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => {
+ assert_false(ws.locked, 'the WritableStream must still be unlocked');
+ });
+
+}, 'pipeTo must fail if the ReadableStream is locked, and not lock the WritableStream');
+
+promise_test(t => {
+
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+
+ ws.getWriter();
+
+ assert_false(rs.locked, 'sanity check: the ReadableStream does not start locked');
+ assert_true(ws.locked, 'sanity check: the WritableStream starts locked');
+
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws)).then(() => {
+ assert_false(rs.locked, 'the ReadableStream must still be unlocked');
+ });
+
+}, 'pipeTo must fail if the WritableStream is locked, and not lock the ReadableStream');
+
+promise_test(() => {
+
+ const CHUNKS = 10;
+
+ const rs = new ReadableStream({
+ start(c) {
+ for (let i = 0; i < CHUNKS; ++i) {
+ c.enqueue(i);
+ }
+ c.close();
+ }
+ });
+
+ const written = [];
+ const ws = new WritableStream({
+ write(chunk) {
+ written.push(chunk);
+ },
+ close() {
+ written.push('closed');
+ }
+ }, new CountQueuingStrategy({ highWaterMark: CHUNKS }));
+
+ return rs.pipeTo(ws).then(() => {
+ const targetValues = [];
+ for (let i = 0; i < CHUNKS; ++i) {
+ targetValues.push(i);
+ }
+ targetValues.push('closed');
+
+ assert_array_equals(written, targetValues, 'the correct values must be written');
+
+ // Ensure both readable and writable are closed by the time the pipe finishes.
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+ // NOTE: no requirement on *when* the pipe finishes; that is left to implementations.
+
+}, 'Piping from a ReadableStream from which lots of chunks are synchronously readable');
+
+promise_test(t => {
+
+ let controller;
+ const rs = recordingReadableStream({
+ start(c) {
+ controller = c;
+ }
+ });
+
+ const ws = recordingWritableStream();
+
+ const pipePromise = rs.pipeTo(ws).then(() => {
+ assert_array_equals(ws.events, ['write', 'Hello', 'close']);
+ });
+
+ t.step_timeout(() => {
+ controller.enqueue('Hello');
+ t.step_timeout(() => controller.close(), 10);
+ }, 10);
+
+ return pipePromise;
+
+}, 'Piping from a ReadableStream for which a chunk becomes asynchronously readable after the pipeTo');
+
+for (const preventAbort of [true, false]) {
+ promise_test(() => {
+
+ const rs = new ReadableStream({
+ pull() {
+ return Promise.reject(undefined);
+ }
+ });
+
+ return rs.pipeTo(new WritableStream(), { preventAbort }).then(
+ () => assert_unreached('pipeTo promise should be rejected'),
+ value => assert_equals(value, undefined, 'rejection value should be undefined'));
+
+ }, `an undefined rejection from pull should cause pipeTo() to reject when preventAbort is ${preventAbort}`);
+}
+
+for (const preventCancel of [true, false]) {
+ promise_test(() => {
+
+ const rs = new ReadableStream({
+ pull(controller) {
+ controller.enqueue(0);
+ }
+ });
+
+ const ws = new WritableStream({
+ write() {
+ return Promise.reject(undefined);
+ }
+ });
+
+ return rs.pipeTo(ws, { preventCancel }).then(
+ () => assert_unreached('pipeTo promise should be rejected'),
+ value => assert_equals(value, undefined, 'rejection value should be undefined'));
+
+ }, `an undefined rejection from write should cause pipeTo() to reject when preventCancel is ${preventCancel}`);
+}
+
+promise_test(t => {
+ const rs = new ReadableStream();
+ const ws = new WritableStream();
+ return promise_rejects_js(t, TypeError, rs.pipeTo(ws, {
+ get preventAbort() {
+ ws.getWriter();
+ }
+ }), 'pipeTo should reject');
+}, 'pipeTo() should reject if an option getter grabs a writer');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+ const ws = new WritableStream();
+
+ return rs.pipeTo(ws, null);
+}, 'pipeTo() promise should resolve if null is passed');
diff --git a/testing/web-platform/tests/streams/piping/multiple-propagation.any.js b/testing/web-platform/tests/streams/piping/multiple-propagation.any.js
new file mode 100644
index 0000000000..a78652fc06
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/multiple-propagation.any.js
@@ -0,0 +1,227 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+const error1 = new Error('error1!');
+error1.name = 'error1';
+
+const error2 = new Error('error2!');
+error2.name = 'error2';
+
+function createErroredWritableStream(t) {
+ return Promise.resolve().then(() => {
+ const ws = recordingWritableStream({
+ start(c) {
+ c.error(error2);
+ }
+ });
+
+ const writer = ws.getWriter();
+ return promise_rejects_exactly(t, error2, writer.closed, 'the writable stream must be errored with error2')
+ .then(() => {
+ writer.releaseLock();
+ assert_array_equals(ws.events, []);
+ return ws;
+ });
+ });
+}
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+ const ws = recordingWritableStream({
+ start(c) {
+ c.error(error2);
+ }
+ });
+
+ // Trying to abort a stream that is erroring will give the writable's error
+ return promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return Promise.all([
+ promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
+ promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2')
+ ]);
+ });
+
+}, 'Piping from an errored readable stream to an erroring writable stream');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+
+ return createErroredWritableStream(t)
+ .then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error'))
+ .then(() => {
+ assert_array_equals(rs.events, []);
+
+ return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1');
+ });
+}, 'Piping from an errored readable stream to an errored writable stream');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+ const ws = recordingWritableStream({
+ start(c) {
+ c.error(error2);
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the readable stream\'s error')
+ .then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return Promise.all([
+ promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
+ promise_rejects_exactly(t, error2, ws.getWriter().closed, 'the writable stream must be errored with error2')
+ ]);
+ });
+
+}, 'Piping from an errored readable stream to an erroring writable stream; preventAbort = true');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+ return createErroredWritableStream(t)
+ .then(ws => promise_rejects_exactly(t, error1, rs.pipeTo(ws, { preventAbort: true }),
+ 'pipeTo must reject with the readable stream\'s error'))
+ .then(() => {
+ assert_array_equals(rs.events, []);
+
+ return promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1');
+ });
+
+}, 'Piping from an errored readable stream to an errored writable stream; preventAbort = true');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ const closePromise = writer.close();
+ writer.releaseLock();
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['abort', error1]);
+
+ return Promise.all([
+ promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
+ promise_rejects_exactly(t, error1, ws.getWriter().closed,
+ 'closed must reject with error1'),
+ promise_rejects_exactly(t, error1, closePromise,
+ 'close() must reject with error1')
+ ]);
+ });
+
+}, 'Piping from an errored readable stream to a closing writable stream');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ const closePromise = writer.close();
+ writer.releaseLock();
+
+ return flushAsyncEvents().then(() => {
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the readable stream\'s error').then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ promise_rejects_exactly(t, error1, rs.getReader().closed, 'the readable stream must be errored with error1'),
+ ws.getWriter().closed,
+ closePromise
+ ]);
+ });
+ });
+
+}, 'Piping from an errored readable stream to a closed writable stream');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.close();
+ }
+ });
+ const ws = recordingWritableStream({
+ start(c) {
+ c.error(error1);
+ }
+ });
+
+ return promise_rejects_exactly(t, error1, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error').then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, []);
+
+ return Promise.all([
+ rs.getReader().closed,
+ promise_rejects_exactly(t, error1, ws.getWriter().closed, 'the writable stream must be errored with error1')
+ ]);
+ });
+
+}, 'Piping from a closed readable stream to an erroring writable stream');
+
+promise_test(t => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.close();
+ }
+ });
+ return createErroredWritableStream(t)
+ .then(ws => promise_rejects_exactly(t, error2, rs.pipeTo(ws), 'pipeTo must reject with the writable stream\'s error'))
+ .then(() => {
+ assert_array_equals(rs.events, []);
+
+ return rs.getReader().closed;
+ });
+
+}, 'Piping from a closed readable stream to an errored writable stream');
+
+promise_test(() => {
+ const rs = recordingReadableStream({
+ start(c) {
+ c.close();
+ }
+ });
+ const ws = recordingWritableStream();
+ const writer = ws.getWriter();
+ writer.close();
+ writer.releaseLock();
+
+ return rs.pipeTo(ws).then(() => {
+ assert_array_equals(rs.events, []);
+ assert_array_equals(ws.events, ['close']);
+
+ return Promise.all([
+ rs.getReader().closed,
+ ws.getWriter().closed
+ ]);
+ });
+
+}, 'Piping from a closed readable stream to a closed writable stream');
diff --git a/testing/web-platform/tests/streams/piping/pipe-through.any.js b/testing/web-platform/tests/streams/piping/pipe-through.any.js
new file mode 100644
index 0000000000..26b1cd26a3
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/pipe-through.any.js
@@ -0,0 +1,331 @@
+// META: global=window,worker
+// META: script=../resources/rs-utils.js
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+function duckTypedPassThroughTransform() {
+ let enqueueInReadable;
+ let closeReadable;
+
+ return {
+ writable: new WritableStream({
+ write(chunk) {
+ enqueueInReadable(chunk);
+ },
+
+ close() {
+ closeReadable();
+ }
+ }),
+
+ readable: new ReadableStream({
+ start(c) {
+ enqueueInReadable = c.enqueue.bind(c);
+ closeReadable = c.close.bind(c);
+ }
+ })
+ };
+}
+
+function uninterestingReadableWritablePair() {
+ return { writable: new WritableStream(), readable: new ReadableStream() };
+}
+
+promise_test(() => {
+ const readableEnd = sequentialReadableStream(5).pipeThrough(duckTypedPassThroughTransform());
+
+ return readableStreamToArray(readableEnd).then(chunks =>
+ assert_array_equals(chunks, [1, 2, 3, 4, 5]), 'chunks should match');
+}, 'Piping through a duck-typed pass-through transform stream should work');
+
+promise_test(() => {
+ const transform = {
+ writable: new WritableStream({
+ start(c) {
+ c.error(new Error('this rejection should not be reported as unhandled'));
+ }
+ }),
+ readable: new ReadableStream()
+ };
+
+ sequentialReadableStream(5).pipeThrough(transform);
+
+ // The test harness should complain about unhandled rejections by then.
+ return flushAsyncEvents();
+
+}, 'Piping through a transform errored on the writable end does not cause an unhandled promise rejection');
+
+test(() => {
+ let calledPipeTo = false;
+ class BadReadableStream extends ReadableStream {
+ pipeTo() {
+ calledPipeTo = true;
+ }
+ }
+
+ const brs = new BadReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+ const readable = new ReadableStream();
+ const writable = new WritableStream();
+ const result = brs.pipeThrough({ readable, writable });
+
+ assert_false(calledPipeTo, 'the overridden pipeTo should not have been called');
+ assert_equals(result, readable, 'return value should be the passed readable property');
+}, 'pipeThrough should not call pipeTo on this');
+
+test(t => {
+ let calledFakePipeTo = false;
+ const realPipeTo = ReadableStream.prototype.pipeTo;
+ t.add_cleanup(() => {
+ ReadableStream.prototype.pipeTo = realPipeTo;
+ });
+ ReadableStream.prototype.pipeTo = () => {
+ calledFakePipeTo = true;
+ };
+ const rs = new ReadableStream();
+ const readable = new ReadableStream();
+ const writable = new WritableStream();
+ const result = rs.pipeThrough({ readable, writable });
+
+ assert_false(calledFakePipeTo, 'the monkey-patched pipeTo should not have been called');
+ assert_equals(result, readable, 'return value should be the passed readable property');
+
+}, 'pipeThrough should not call pipeTo on the ReadableStream prototype');
+
+const badReadables = [null, undefined, 0, NaN, true, 'ReadableStream', Object.create(ReadableStream.prototype)];
+for (const readable of badReadables) {
+ test(() => {
+ assert_throws_js(TypeError,
+ ReadableStream.prototype.pipeThrough.bind(readable, uninterestingReadableWritablePair()),
+ 'pipeThrough should throw');
+ }, `pipeThrough should brand-check this and not allow '${readable}'`);
+
+ test(() => {
+ const rs = new ReadableStream();
+ let writableGetterCalled = false;
+ assert_throws_js(
+ TypeError,
+ () => rs.pipeThrough({
+ get writable() {
+ writableGetterCalled = true;
+ return new WritableStream();
+ },
+ readable
+ }),
+ 'pipeThrough should brand-check readable'
+ );
+ assert_false(writableGetterCalled, 'writable should not have been accessed');
+ }, `pipeThrough should brand-check readable and not allow '${readable}'`);
+}
+
+const badWritables = [null, undefined, 0, NaN, true, 'WritableStream', Object.create(WritableStream.prototype)];
+for (const writable of badWritables) {
+ test(() => {
+ const rs = new ReadableStream({
+ start(c) {
+ c.close();
+ }
+ });
+ let readableGetterCalled = false;
+ assert_throws_js(TypeError, () => rs.pipeThrough({
+ get readable() {
+ readableGetterCalled = true;
+ return new ReadableStream();
+ },
+ writable
+ }),
+ 'pipeThrough should brand-check writable');
+ assert_true(readableGetterCalled, 'readable should have been accessed');
+ }, `pipeThrough should brand-check writable and not allow '${writable}'`);
+}
+
+test(t => {
+ const error = new Error();
+ error.name = 'custom';
+
+ const rs = new ReadableStream({
+ pull: t.unreached_func('pull should not be called')
+ }, { highWaterMark: 0 });
+
+ const throwingWritable = {
+ readable: rs,
+ get writable() {
+ throw error;
+ }
+ };
+ assert_throws_exactly(error,
+ () => ReadableStream.prototype.pipeThrough.call(rs, throwingWritable, {}),
+ 'pipeThrough should rethrow the error thrown by the writable getter');
+
+ const throwingReadable = {
+ get readable() {
+ throw error;
+ },
+ writable: {}
+ };
+ assert_throws_exactly(error,
+ () => ReadableStream.prototype.pipeThrough.call(rs, throwingReadable, {}),
+ 'pipeThrough should rethrow the error thrown by the readable getter');
+
+}, 'pipeThrough should rethrow errors from accessing readable or writable');
+
+const badSignals = [null, 0, NaN, true, 'AbortSignal', Object.create(AbortSignal.prototype)];
+for (const signal of badSignals) {
+ test(() => {
+ const rs = new ReadableStream();
+ assert_throws_js(TypeError, () => rs.pipeThrough(uninterestingReadableWritablePair(), { signal }),
+ 'pipeThrough should throw');
+ }, `invalid values of signal should throw; specifically '${signal}'`);
+}
+
+test(() => {
+ const rs = new ReadableStream();
+ const controller = new AbortController();
+ const signal = controller.signal;
+ rs.pipeThrough(uninterestingReadableWritablePair(), { signal });
+}, 'pipeThrough should accept a real AbortSignal');
+
+test(() => {
+ const rs = new ReadableStream();
+ rs.getReader();
+ assert_throws_js(TypeError, () => rs.pipeThrough(uninterestingReadableWritablePair()),
+ 'pipeThrough should throw');
+}, 'pipeThrough should throw if this is locked');
+
+test(() => {
+ const rs = new ReadableStream();
+ const writable = new WritableStream();
+ const readable = new ReadableStream();
+ writable.getWriter();
+ assert_throws_js(TypeError, () => rs.pipeThrough({writable, readable}),
+ 'pipeThrough should throw');
+}, 'pipeThrough should throw if writable is locked');
+
+test(() => {
+ const rs = new ReadableStream();
+ const writable = new WritableStream();
+ const readable = new ReadableStream();
+ readable.getReader();
+ assert_equals(rs.pipeThrough({ writable, readable }), readable,
+ 'pipeThrough should not throw');
+}, 'pipeThrough should not care if readable is locked');
+
+promise_test(() => {
+ const rs = recordingReadableStream();
+ const writable = new WritableStream({
+ start(controller) {
+ controller.error();
+ }
+ });
+ const readable = new ReadableStream();
+ rs.pipeThrough({ writable, readable }, { preventCancel: true });
+ return flushAsyncEvents(0).then(() => {
+ assert_array_equals(rs.events, ['pull'], 'cancel should not have been called');
+ });
+}, 'preventCancel should work');
+
+promise_test(() => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.close();
+ }
+ });
+ const writable = recordingWritableStream();
+ const readable = new ReadableStream();
+ rs.pipeThrough({ writable, readable }, { preventClose: true });
+ return flushAsyncEvents(0).then(() => {
+ assert_array_equals(writable.events, [], 'writable should not be closed');
+ });
+}, 'preventClose should work');
+
+promise_test(() => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.error();
+ }
+ });
+ const writable = recordingWritableStream();
+ const readable = new ReadableStream();
+ rs.pipeThrough({ writable, readable }, { preventAbort: true });
+ return flushAsyncEvents(0).then(() => {
+ assert_array_equals(writable.events, [], 'writable should not be aborted');
+ });
+}, 'preventAbort should work');
+
+test(() => {
+ const rs = new ReadableStream();
+ const readable = new ReadableStream();
+ const writable = new WritableStream();
+ assert_throws_js(TypeError, () => rs.pipeThrough({readable, writable}, {
+ get preventAbort() {
+ writable.getWriter();
+ }
+ }), 'pipeThrough should throw');
+}, 'pipeThrough() should throw if an option getter grabs a writer');
+
+test(() => {
+ const rs = new ReadableStream();
+ const readable = new ReadableStream();
+ const writable = new WritableStream();
+ rs.pipeThrough({readable, writable}, null);
+}, 'pipeThrough() should not throw if option is null');
+
+test(() => {
+ const rs = new ReadableStream();
+ const readable = new ReadableStream();
+ const writable = new WritableStream();
+ rs.pipeThrough({readable, writable}, {signal:undefined});
+}, 'pipeThrough() should not throw if signal is undefined');
+
+function tryPipeThrough(pair, options)
+{
+ const rs = new ReadableStream();
+ if (!pair)
+ pair = {readable:new ReadableStream(), writable:new WritableStream()};
+ try {
+ rs.pipeThrough(pair, options)
+ } catch (e) {
+ return e;
+ }
+}
+
+test(() => {
+ let result = tryPipeThrough({
+ get readable() {
+ return new ReadableStream();
+ },
+ get writable() {
+ throw "writable threw";
+ }
+ }, { });
+ assert_equals(result, "writable threw");
+
+ result = tryPipeThrough({
+ get readable() {
+ throw "readable threw";
+ },
+ get writable() {
+ throw "writable threw";
+ }
+ }, { });
+ assert_equals(result, "readable threw");
+
+ result = tryPipeThrough({
+ get readable() {
+ throw "readable threw";
+ },
+ get writable() {
+ throw "writable threw";
+ }
+ }, {
+ get preventAbort() {
+ throw "preventAbort threw";
+ }
+ });
+ assert_equals(result, "readable threw");
+
+}, 'pipeThrough() should throw if readable/writable getters throw');
diff --git a/testing/web-platform/tests/streams/piping/then-interception.any.js b/testing/web-platform/tests/streams/piping/then-interception.any.js
new file mode 100644
index 0000000000..543f916d94
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/then-interception.any.js
@@ -0,0 +1,68 @@
+// META: global=window,worker
+// META: script=../resources/test-utils.js
+// META: script=../resources/recording-streams.js
+'use strict';
+
+function interceptThen() {
+ const intercepted = [];
+ let callCount = 0;
+ Object.prototype.then = function(resolver) {
+ if (!this.done) {
+ intercepted.push(this.value);
+ }
+ const retval = Object.create(null);
+ retval.done = ++callCount === 3;
+ retval.value = callCount;
+ resolver(retval);
+ if (retval.done) {
+ delete Object.prototype.then;
+ }
+ }
+ return intercepted;
+}
+
+promise_test(async t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.close();
+ }
+ });
+ const ws = recordingWritableStream();
+
+ const intercepted = interceptThen();
+ t.add_cleanup(() => {
+ delete Object.prototype.then;
+ });
+
+ await rs.pipeTo(ws);
+ delete Object.prototype.then;
+
+
+ assert_array_equals(intercepted, [], 'nothing should have been intercepted');
+ assert_array_equals(ws.events, ['write', 'a', 'close'], 'written chunk should be "a"');
+}, 'piping should not be observable');
+
+promise_test(async t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.close();
+ }
+ });
+ const ws = recordingWritableStream();
+
+ const [ branch1, branch2 ] = rs.tee();
+
+ const intercepted = interceptThen();
+ t.add_cleanup(() => {
+ delete Object.prototype.then;
+ });
+
+ await branch1.pipeTo(ws);
+ delete Object.prototype.then;
+ branch2.cancel();
+
+ assert_array_equals(intercepted, [], 'nothing should have been intercepted');
+ assert_array_equals(ws.events, ['write', 'a', 'close'], 'written chunk should be "a"');
+}, 'tee should not be observable');
diff --git a/testing/web-platform/tests/streams/piping/throwing-options.any.js b/testing/web-platform/tests/streams/piping/throwing-options.any.js
new file mode 100644
index 0000000000..b9f906778f
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/throwing-options.any.js
@@ -0,0 +1,65 @@
+// META: global=window,worker
+'use strict';
+
+class ThrowingOptions {
+ constructor(whatShouldThrow) {
+ this.whatShouldThrow = whatShouldThrow;
+ this.touched = [];
+ }
+
+ get preventClose() {
+ this.maybeThrow('preventClose');
+ return false;
+ }
+
+ get preventAbort() {
+ this.maybeThrow('preventAbort');
+ return false;
+ }
+
+ get preventCancel() {
+ this.maybeThrow('preventCancel');
+ return false;
+ }
+
+ get signal() {
+ this.maybeThrow('signal');
+ return undefined;
+ }
+
+ maybeThrow(forWhat) {
+ this.touched.push(forWhat);
+ if (this.whatShouldThrow === forWhat) {
+ throw new Error(this.whatShouldThrow);
+ }
+ }
+}
+
+const checkOrder = ['preventAbort', 'preventCancel', 'preventClose', 'signal'];
+
+for (let i = 0; i < checkOrder.length; ++i) {
+ const whatShouldThrow = checkOrder[i];
+ const whatShouldBeTouched = checkOrder.slice(0, i + 1);
+
+ promise_test(t => {
+ const options = new ThrowingOptions(whatShouldThrow);
+ return promise_rejects_js(
+ t, Error,
+ new ReadableStream().pipeTo(new WritableStream(), options),
+ 'pipeTo should reject')
+ .then(() => assert_array_equals(
+ options.touched, whatShouldBeTouched,
+ 'options should be touched in the right order'));
+ }, `pipeTo should stop after getting ${whatShouldThrow} throws`);
+
+ test(() => {
+ const options = new ThrowingOptions(whatShouldThrow);
+ assert_throws_js(
+ Error,
+ () => new ReadableStream().pipeThrough(new TransformStream(), options),
+ 'pipeThrough should throw');
+ assert_array_equals(
+ options.touched, whatShouldBeTouched,
+ 'options should be touched in the right order');
+ }, `pipeThrough should stop after getting ${whatShouldThrow} throws`);
+}
diff --git a/testing/web-platform/tests/streams/piping/transform-streams.any.js b/testing/web-platform/tests/streams/piping/transform-streams.any.js
new file mode 100644
index 0000000000..caae9fbad8
--- /dev/null
+++ b/testing/web-platform/tests/streams/piping/transform-streams.any.js
@@ -0,0 +1,22 @@
+// META: global=window,worker
+'use strict';
+
+promise_test(() => {
+ const rs = new ReadableStream({
+ start(c) {
+ c.enqueue('a');
+ c.enqueue('b');
+ c.enqueue('c');
+ c.close();
+ }
+ });
+
+ const ts = new TransformStream();
+
+ const ws = new WritableStream();
+
+ return rs.pipeThrough(ts).pipeTo(ws).then(() => {
+ const writer = ws.getWriter();
+ return writer.closed;
+ });
+}, 'Piping through an identity transform stream should close the destination when the source closes');