summaryrefslogtreecommitdiffstats
path: root/testing/web-platform/tests/streams/transform-streams/backpressure.any.js
diff options
context:
space:
mode:
Diffstat (limited to 'testing/web-platform/tests/streams/transform-streams/backpressure.any.js')
-rw-r--r--testing/web-platform/tests/streams/transform-streams/backpressure.any.js195
1 files changed, 195 insertions, 0 deletions
diff --git a/testing/web-platform/tests/streams/transform-streams/backpressure.any.js b/testing/web-platform/tests/streams/transform-streams/backpressure.any.js
new file mode 100644
index 0000000000..6befba41b7
--- /dev/null
+++ b/testing/web-platform/tests/streams/transform-streams/backpressure.any.js
@@ -0,0 +1,195 @@
+// META: global=window,worker
+// META: script=../resources/recording-streams.js
+// META: script=../resources/test-utils.js
+'use strict';
+
+const error1 = new Error('error1 message');
+error1.name = 'error1';
+
+promise_test(() => {
+ const ts = recordingTransformStream();
+ const writer = ts.writable.getWriter();
+ // This call never resolves.
+ writer.write('a');
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ts.events, [], 'transform should not be called');
+ });
+}, 'backpressure allows no transforms with a default identity transform and no reader');
+
+promise_test(() => {
+ const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 });
+ const writer = ts.writable.getWriter();
+ // This call to write() resolves asynchronously.
+ writer.write('a');
+ // This call to write() waits for backpressure that is never relieved and never calls transform().
+ writer.write('b');
+ return flushAsyncEvents().then(() => {
+ assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once');
+ });
+}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader');
+
+promise_test(() => {
+ // Without a transform() implementation, recordingTransformStream() never enqueues anything.
+ const ts = recordingTransformStream({
+ transform() {
+ // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform()
+ // keeps being called.
+ }
+ }, undefined, { highWaterMark: 1 });
+ const writer = ts.writable.getWriter();
+ const writePromises = [];
+ for (let i = 0; i < 4; ++i) {
+ writePromises.push(writer.write(i));
+ }
+ return Promise.all(writePromises).then(() => {
+ assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3],
+ 'all 4 events should be transformed');
+ });
+}, 'transform() should keep being called as long as there is no backpressure');
+
+promise_test(() => {
+ const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
+ const writer = ts.writable.getWriter();
+ const reader = ts.readable.getReader();
+ const events = [];
+ const writerPromises = [
+ writer.write('a').then(() => events.push('a')),
+ writer.write('b').then(() => events.push('b')),
+ writer.close().then(() => events.push('closed'))];
+ return delay(0).then(() => {
+ assert_array_equals(events, ['a'], 'the first write should have resolved');
+ return reader.read();
+ }).then(({ value, done }) => {
+ assert_false(done, 'done should not be true');
+ assert_equals('a', value, 'value should be "a"');
+ return delay(0);
+ }).then(() => {
+ assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved');
+ return reader.read();
+ }).then(({ value, done }) => {
+ assert_false(done, 'done should still not be true');
+ assert_equals('b', value, 'value should be "b"');
+ return reader.read();
+ }).then(({ done }) => {
+ assert_true(done, 'done should be true');
+ return writerPromises;
+ });
+}, 'writes should resolve as soon as transform completes');
+
+promise_test(() => {
+ const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 });
+ const writer = ts.writable.getWriter();
+ const reader = ts.readable.getReader();
+ const readPromise = reader.read();
+ writer.write('a');
+ return readPromise.then(({ value, done }) => {
+ assert_false(done, 'not done');
+ assert_equals(value, 'a', 'value should be "a"');
+ });
+}, 'calling pull() before the first write() with backpressure should work');
+
+promise_test(() => {
+ let reader;
+ const ts = recordingTransformStream({
+ transform(chunk, controller) {
+ controller.enqueue(chunk);
+ return reader.read();
+ }
+ }, undefined, { highWaterMark: 1 });
+ const writer = ts.writable.getWriter();
+ reader = ts.readable.getReader();
+ return writer.write('a');
+}, 'transform() should be able to read the chunk it just enqueued');
+
+promise_test(() => {
+ let resolveTransform;
+ const transformPromise = new Promise(resolve => {
+ resolveTransform = resolve;
+ });
+ const ts = recordingTransformStream({
+ transform() {
+ return transformPromise;
+ }
+ }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity }));
+ const writer = ts.writable.getWriter();
+ assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
+ return delay(0).then(() => {
+ writer.write('a');
+ assert_array_equals(ts.events, ['transform', 'a']);
+ assert_equals(writer.desiredSize, 0, 'desiredSize should be 0');
+ return flushAsyncEvents();
+ }).then(() => {
+ assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0');
+ resolveTransform();
+ return delay(0);
+ }).then(() => {
+ assert_equals(writer.desiredSize, 1, 'desiredSize should be 1');
+ });
+}, 'blocking transform() should cause backpressure');
+
+promise_test(t => {
+ const ts = new TransformStream();
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
+}, 'writer.closed should resolve after readable is canceled during start');
+
+promise_test(t => {
+ const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
+ return delay(0).then(() => {
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
+ });
+}, 'writer.closed should resolve after readable is canceled with backpressure');
+
+promise_test(t => {
+ const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
+ return delay(0).then(() => {
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject');
+ });
+}, 'writer.closed should resolve after readable is canceled with no backpressure');
+
+promise_test(() => {
+ const ts = new TransformStream({}, undefined, { highWaterMark: 1 });
+ const writer = ts.writable.getWriter();
+ return delay(0).then(() => {
+ const writePromise = writer.write('a');
+ ts.readable.cancel(error1);
+ return writePromise;
+ });
+}, 'cancelling the readable should cause a pending write to resolve');
+
+promise_test(t => {
+ const rs = new ReadableStream();
+ const ts = new TransformStream();
+ const pipePromise = rs.pipeTo(ts.writable);
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
+}, 'cancelling the readable side of a TransformStream should abort an empty pipe');
+
+promise_test(t => {
+ const rs = new ReadableStream();
+ const ts = new TransformStream();
+ const pipePromise = rs.pipeTo(ts.writable);
+ return delay(0).then(() => {
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
+ });
+}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup');
+
+promise_test(t => {
+ const rs = new ReadableStream({
+ start(controller) {
+ controller.enqueue('a');
+ controller.enqueue('b');
+ controller.enqueue('c');
+ }
+ });
+ const ts = new TransformStream();
+ const pipePromise = rs.pipeTo(ts.writable);
+ // Allow data to flow into the pipe.
+ return delay(0).then(() => {
+ ts.readable.cancel(error1);
+ return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected');
+ });
+}, 'cancelling the readable side of a TransformStream should abort a full pipe');