diff options
Diffstat (limited to 'testing/web-platform/tests/streams/transform-streams/backpressure.any.js')
-rw-r--r-- | testing/web-platform/tests/streams/transform-streams/backpressure.any.js | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/testing/web-platform/tests/streams/transform-streams/backpressure.any.js b/testing/web-platform/tests/streams/transform-streams/backpressure.any.js new file mode 100644 index 0000000000..6befba41b7 --- /dev/null +++ b/testing/web-platform/tests/streams/transform-streams/backpressure.any.js @@ -0,0 +1,195 @@ +// META: global=window,worker +// META: script=../resources/recording-streams.js +// META: script=../resources/test-utils.js +'use strict'; + +const error1 = new Error('error1 message'); +error1.name = 'error1'; + +promise_test(() => { + const ts = recordingTransformStream(); + const writer = ts.writable.getWriter(); + // This call never resolves. + writer.write('a'); + return flushAsyncEvents().then(() => { + assert_array_equals(ts.events, [], 'transform should not be called'); + }); +}, 'backpressure allows no transforms with a default identity transform and no reader'); + +promise_test(() => { + const ts = recordingTransformStream({}, undefined, { highWaterMark: 1 }); + const writer = ts.writable.getWriter(); + // This call to write() resolves asynchronously. + writer.write('a'); + // This call to write() waits for backpressure that is never relieved and never calls transform(). + writer.write('b'); + return flushAsyncEvents().then(() => { + assert_array_equals(ts.events, ['transform', 'a'], 'transform should be called once'); + }); +}, 'backpressure only allows one transform() with a identity transform with a readable HWM of 1 and no reader'); + +promise_test(() => { + // Without a transform() implementation, recordingTransformStream() never enqueues anything. + const ts = recordingTransformStream({ + transform() { + // Discard all chunks. As a result, the readable side is never full enough to exert backpressure and transform() + // keeps being called. + } + }, undefined, { highWaterMark: 1 }); + const writer = ts.writable.getWriter(); + const writePromises = []; + for (let i = 0; i < 4; ++i) { + writePromises.push(writer.write(i)); + } + return Promise.all(writePromises).then(() => { + assert_array_equals(ts.events, ['transform', 0, 'transform', 1, 'transform', 2, 'transform', 3], + 'all 4 events should be transformed'); + }); +}, 'transform() should keep being called as long as there is no backpressure'); + +promise_test(() => { + const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + const events = []; + const writerPromises = [ + writer.write('a').then(() => events.push('a')), + writer.write('b').then(() => events.push('b')), + writer.close().then(() => events.push('closed'))]; + return delay(0).then(() => { + assert_array_equals(events, ['a'], 'the first write should have resolved'); + return reader.read(); + }).then(({ value, done }) => { + assert_false(done, 'done should not be true'); + assert_equals('a', value, 'value should be "a"'); + return delay(0); + }).then(() => { + assert_array_equals(events, ['a', 'b', 'closed'], 'both writes and close() should have resolved'); + return reader.read(); + }).then(({ value, done }) => { + assert_false(done, 'done should still not be true'); + assert_equals('b', value, 'value should be "b"'); + return reader.read(); + }).then(({ done }) => { + assert_true(done, 'done should be true'); + return writerPromises; + }); +}, 'writes should resolve as soon as transform completes'); + +promise_test(() => { + const ts = new TransformStream(undefined, undefined, { highWaterMark: 0 }); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + const readPromise = reader.read(); + writer.write('a'); + return readPromise.then(({ value, done }) => { + assert_false(done, 'not done'); + assert_equals(value, 'a', 'value should be "a"'); + }); +}, 'calling pull() before the first write() with backpressure should work'); + +promise_test(() => { + let reader; + const ts = recordingTransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk); + return reader.read(); + } + }, undefined, { highWaterMark: 1 }); + const writer = ts.writable.getWriter(); + reader = ts.readable.getReader(); + return writer.write('a'); +}, 'transform() should be able to read the chunk it just enqueued'); + +promise_test(() => { + let resolveTransform; + const transformPromise = new Promise(resolve => { + resolveTransform = resolve; + }); + const ts = recordingTransformStream({ + transform() { + return transformPromise; + } + }, undefined, new CountQueuingStrategy({ highWaterMark: Infinity })); + const writer = ts.writable.getWriter(); + assert_equals(writer.desiredSize, 1, 'desiredSize should be 1'); + return delay(0).then(() => { + writer.write('a'); + assert_array_equals(ts.events, ['transform', 'a']); + assert_equals(writer.desiredSize, 0, 'desiredSize should be 0'); + return flushAsyncEvents(); + }).then(() => { + assert_equals(writer.desiredSize, 0, 'desiredSize should still be 0'); + resolveTransform(); + return delay(0); + }).then(() => { + assert_equals(writer.desiredSize, 1, 'desiredSize should be 1'); + }); +}, 'blocking transform() should cause backpressure'); + +promise_test(t => { + const ts = new TransformStream(); + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject'); +}, 'writer.closed should resolve after readable is canceled during start'); + +promise_test(t => { + const ts = new TransformStream({}, undefined, { highWaterMark: 0 }); + return delay(0).then(() => { + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject'); + }); +}, 'writer.closed should resolve after readable is canceled with backpressure'); + +promise_test(t => { + const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); + return delay(0).then(() => { + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, ts.writable.getWriter().closed, 'closed should reject'); + }); +}, 'writer.closed should resolve after readable is canceled with no backpressure'); + +promise_test(() => { + const ts = new TransformStream({}, undefined, { highWaterMark: 1 }); + const writer = ts.writable.getWriter(); + return delay(0).then(() => { + const writePromise = writer.write('a'); + ts.readable.cancel(error1); + return writePromise; + }); +}, 'cancelling the readable should cause a pending write to resolve'); + +promise_test(t => { + const rs = new ReadableStream(); + const ts = new TransformStream(); + const pipePromise = rs.pipeTo(ts.writable); + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); +}, 'cancelling the readable side of a TransformStream should abort an empty pipe'); + +promise_test(t => { + const rs = new ReadableStream(); + const ts = new TransformStream(); + const pipePromise = rs.pipeTo(ts.writable); + return delay(0).then(() => { + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); + }); +}, 'cancelling the readable side of a TransformStream should abort an empty pipe after startup'); + +promise_test(t => { + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.enqueue('c'); + } + }); + const ts = new TransformStream(); + const pipePromise = rs.pipeTo(ts.writable); + // Allow data to flow into the pipe. + return delay(0).then(() => { + ts.readable.cancel(error1); + return promise_rejects_exactly(t, error1, pipePromise, 'promise returned from pipeTo() should be rejected'); + }); +}, 'cancelling the readable side of a TransformStream should abort a full pipe'); |