diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /testing/web-platform/tests/streams/piping/flow-control.any.js | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'testing/web-platform/tests/streams/piping/flow-control.any.js')
-rw-r--r-- | testing/web-platform/tests/streams/piping/flow-control.any.js | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/testing/web-platform/tests/streams/piping/flow-control.any.js b/testing/web-platform/tests/streams/piping/flow-control.any.js new file mode 100644 index 0000000000..09c4420f87 --- /dev/null +++ b/testing/web-platform/tests/streams/piping/flow-control.any.js @@ -0,0 +1,297 @@ +// META: global=window,worker +// META: script=../resources/test-utils.js +// META: script=../resources/rs-utils.js +// META: script=../resources/recording-streams.js +'use strict'; + +const error1 = new Error('error1!'); +error1.name = 'error1'; + +promise_test(t => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('a'); + controller.enqueue('b'); + controller.close(); + } + }); + + const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 })); + + const pipePromise = rs.pipeTo(ws, { preventCancel: true }); + + // Wait and make sure it doesn't do any reading. + return flushAsyncEvents().then(() => { + ws.controller.error(error1); + }) + .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error')) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, []); + }) + .then(() => readableStreamToArray(rs)) + .then(chunksNotPreviouslyRead => { + assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']); + }); + +}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks'); + +promise_test(() => { + + const rs = recordingReadableStream({ + start(controller) { + controller.enqueue('b'); + controller.close(); + } + }); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }); + + const writer = ws.getWriter(); + const firstWritePromise = writer.write('a'); + assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0'); + writer.releaseLock(); + + // firstWritePromise won't settle until we call resolveWritePromise. + + const pipePromise = rs.pipeTo(ws); + + return flushAsyncEvents().then(() => resolveWritePromise()) + .then(() => Promise.all([firstWritePromise, pipePromise])) + .then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']); + }); + +}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does'); + +promise_test(() => { + + const rs = recordingReadableStream(); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }); + + const writer = ws.getWriter(); + writer.write('a'); + + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a']); + assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0'); + writer.releaseLock(); + + const pipePromise = rs.pipeTo(ws); + + rs.controller.enqueue('b'); + resolveWritePromise(); + rs.controller.close(); + + return pipePromise.then(() => { + assert_array_equals(rs.eventsWithoutPulls, []); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']); + }); + }); + +}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' + + 'stream becomes non-empty and the writable stream starts desiring chunks'); + +promise_test(() => { + const unreadChunks = ['b', 'c', 'd']; + + const rs = recordingReadableStream({ + pull(controller) { + controller.enqueue(unreadChunks.shift()); + if (unreadChunks.length === 0) { + controller.close(); + } + } + }, new CountQueuingStrategy({ highWaterMark: 0 })); + + let resolveWritePromise; + const ws = recordingWritableStream({ + write() { + if (!resolveWritePromise) { + // first write + return new Promise(resolve => { + resolveWritePromise = resolve; + }); + } + return undefined; + } + }, new CountQueuingStrategy({ highWaterMark: 3 })); + + const writer = ws.getWriter(); + const firstWritePromise = writer.write('a'); + assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2'); + writer.releaseLock(); + + // firstWritePromise won't settle until we call resolveWritePromise. + + const pipePromise = rs.pipeTo(ws); + + return flushAsyncEvents().then(() => { + assert_array_equals(ws.events, ['write', 'a']); + assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached'); + }).then(() => resolveWritePromise()) + .then(() => Promise.all([firstWritePromise, pipePromise])) + .then(() => { + assert_array_equals(rs.events, ['pull', 'pull', 'pull']); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']); + }); + +}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones'); + +class StepTracker { + constructor() { + this.waiters = []; + this.wakers = []; + } + + // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the + // promise is resolved. + waitThenAdvance(n) { + if (this.waiters[n] === undefined) { + this.waiters[n] = new Promise(resolve => { + this.wakers[n] = resolve; + }); + this.waiters[n] + .then(() => flushAsyncEvents()) + .then(() => { + if (this.wakers[n + 1] !== undefined) { + this.wakers[n + 1](); + } + }); + } + if (n == 0) { + this.wakers[0](); + } + return this.waiters[n]; + } +} + +promise_test(() => { + const steps = new StepTracker(); + const desiredSizes = []; + const rs = recordingReadableStream({ + start(controller) { + steps.waitThenAdvance(1).then(() => enqueue('a')); + steps.waitThenAdvance(3).then(() => enqueue('b')); + steps.waitThenAdvance(5).then(() => enqueue('c')); + steps.waitThenAdvance(7).then(() => enqueue('d')); + steps.waitThenAdvance(11).then(() => controller.close()); + + function enqueue(chunk) { + controller.enqueue(chunk); + desiredSizes.push(controller.desiredSize); + } + } + }); + + const chunksFinishedWriting = []; + const writableStartPromise = Promise.resolve(); + let writeCalled = false; + const ws = recordingWritableStream({ + start() { + return writableStartPromise; + }, + write(chunk) { + const waitForStep = writeCalled ? 12 : 9; + writeCalled = true; + return steps.waitThenAdvance(waitForStep).then(() => { + chunksFinishedWriting.push(chunk); + }); + } + }); + + return writableStartPromise.then(() => { + const pipePromise = rs.pipeTo(ws); + steps.waitThenAdvance(0); + + return Promise.all([ + steps.waitThenAdvance(2).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written'); + + // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request + // promise, leaving the queue empty. + assert_array_equals(desiredSizes, [1], + 'at step 2, the desiredSize at the last enqueue (step 1) must have been 1'); + assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1'); + }), + + steps.waitThenAdvance(4).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written'); + + // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at + // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue + // had size 1 (thus desiredSize of 0). + assert_array_equals(desiredSizes, [1, 0], + 'at step 4, the desiredSize at the last enqueue (step 3) must have been 0'); + assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0'); + }), + + steps.waitThenAdvance(6).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written'); + + // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until + // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of + // -1. + assert_array_equals(desiredSizes, [1, 0, -1], + 'at step 6, the desiredSize at the last enqueue (step 5) must have been -1'); + assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1'); + }), + + steps.waitThenAdvance(8).then(() => { + assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing'); + assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written'); + + // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c', + // and 'd'. + assert_array_equals(desiredSizes, [1, 0, -1, -2], + 'at step 8, the desiredSize at the last enqueue (step 7) must have been -2'); + assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2'); + }), + + steps.waitThenAdvance(10).then(() => { + assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing'); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b'], + 'at step 10, two chunks must have been written'); + + assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1'); + }), + + pipePromise.then(() => { + assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source'); + assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing'); + + assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream'); + assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'], + 'all chunks were written (and the WritableStream closed)'); + }) + ]); + }); +}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream'); |