diff options
Diffstat (limited to 'testing/web-platform/tests/dom/observable/tentative/observable-flatMap.any.js')
-rw-r--r-- | testing/web-platform/tests/dom/observable/tentative/observable-flatMap.any.js | 315 |
1 files changed, 315 insertions, 0 deletions
diff --git a/testing/web-platform/tests/dom/observable/tentative/observable-flatMap.any.js b/testing/web-platform/tests/dom/observable/tentative/observable-flatMap.any.js new file mode 100644 index 0000000000..7cbfa6cb60 --- /dev/null +++ b/testing/web-platform/tests/dom/observable/tentative/observable-flatMap.any.js @@ -0,0 +1,315 @@ +test(() => { + const source = new Observable(subscriber => { + subscriber.next(1); + subscriber.next(2); + subscriber.next(3); + subscriber.complete(); + }); + + let projectionCalls = 0; + + const results = []; + + const flattened = source.flatMap(value => { + projectionCalls++; + return new Observable((subscriber) => { + subscriber.next(value * 10); + subscriber.next(value * 100); + subscriber.complete(); + }); + }); + + assert_true(flattened instanceof Observable, "flatMap() returns an Observable"); + assert_equals(projectionCalls, 0, + "Projection is not called until subscription starts"); + + flattened.subscribe({ + next: v => results.push(v), + error: () => results.push("error"), + complete: () => results.push("complete"), + }); + + assert_equals(projectionCalls, 3, + "Mapper is called three times, once for each source Observable value"); + assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"], + "flatMap() results are correct"); +}, "flatMap(): Flattens simple source Observable properly"); + +test(() => { + const error = new Error("error"); + const source = new Observable(subscriber => { + subscriber.next(1); + subscriber.next(2); + subscriber.error(error); + subscriber.next(3); + }); + + const flattened = source.flatMap(value => { + return new Observable(subscriber => { + subscriber.next(value * 10); + subscriber.next(value * 100); + subscriber.complete(); + }); + }); + + const results = []; + + flattened.subscribe({ + next: v => results.push(v), + error: e => results.push(e), + complete: () => results.push("complete"), + }); + + assert_array_equals(results, [10, 100, 20, 200, error], + "Source error is passed through to the flatMap() Observable"); +}, "flatMap(): Returned Observable passes through source Observable errors"); + +test(() => { + const results = []; + const error = new Error("error"); + const source = new Observable(subscriber => { + subscriber.next(1); + results.push(subscriber.active ? "active" : "inactive"); + subscriber.next(2); + results.push(subscriber.active ? "active" : "inactive"); + subscriber.next(3); + subscriber.complete(); + }); + + const flattened = source.flatMap((value) => { + return new Observable((subscriber) => { + subscriber.next(value * 10); + subscriber.next(value * 100); + if (value === 2) { + subscriber.error(error); + } else { + subscriber.complete(); + } + }); + }); + + flattened.subscribe({ + next: v => results.push(v), + error: e => results.push(e), + complete: () => results.push("complete"), + }); + + assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"], + "Inner subscription error gets surfaced"); +}, "flatMap(): Outer Subscription synchronously becomes inactive when an " + + "'inner' Observable emits an error"); + +test(() => { + const results = []; + const error = new Error("error"); + const source = new Observable(subscriber => { + subscriber.next(1); + subscriber.next(2); + subscriber.next(3); + results.push(subscriber.active ? "active" : "inactive"); + subscriber.complete(); + }); + + const flattened = source.flatMap(value => { + if (value === 3) { + throw error; + } + return new Observable(subscriber => { + subscriber.next(value * 10); + subscriber.next(value * 100); + subscriber.complete(); + }); + }); + + flattened.subscribe({ + next: v => results.push(v), + error: e => results.push(e), + complete: () => results.push("complete"), + }); + + assert_array_equals(results, [10, 100, 20, 200, error, "inactive"], + "Inner subscriber thrown error gets surfaced"); +}, "flatMap(): Outer Subscription synchronously becomes inactive when an " + + "'inner' Observable throws an error"); + +test(() => { + const source = createTestSubject(); + const inner1 = createTestSubject(); + const inner2 = createTestSubject(); + + const flattened = source.flatMap(value => { + if (value === 1) { + return inner1; + } + + return inner2; + }); + + const results = []; + + flattened.subscribe({ + next: v => results.push(v), + error: e => results.push(e), + complete: () => results.push("complete"), + }); + + assert_array_equals(results, []); + + source.next(1); + assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to"); + + source.next(2); + assert_equals(inner2.subscriberCount(), 0, + "inner2 is queued, not subscribed to until inner1 completes"); + + assert_array_equals(results, []); + + inner1.next(100); + inner1.next(101); + + assert_array_equals(results, [100, 101]); + + inner1.complete(); + assert_equals(inner1.subscriberCount(), 0, + "inner1 becomes inactive once it completes"); + assert_equals(inner2.subscriberCount(), 1, + "inner2 gets un-queued and subscribed to once inner1 completes"); + + inner2.next(200); + inner2.next(201); + assert_array_equals(results, [100, 101, 200, 201]); + + inner2.complete(); + assert_equals(inner2.subscriberCount(), 0, + "inner2 becomes inactive once it completes"); + assert_equals(source.subscriberCount(), 1, + "source is not unsubscribed from yet, since it has not completed"); + assert_array_equals(results, [100, 101, 200, 201]); + + source.complete(); + assert_equals(source.subscriberCount(), 0, + "source unsubscribed from after it completes"); + + assert_array_equals(results, [100, 101, 200, 201, "complete"]); +}, "flatMap(): result Observable does not complete until source and inner " + + "Observables all complete"); + +test(() => { + const source = createTestSubject(); + const inner1 = createTestSubject(); + const inner2 = createTestSubject(); + + const flattened = source.flatMap(value => { + if (value === 1) { + return inner1; + } + + return inner2; + }); + + const results = []; + + flattened.subscribe({ + next: v => results.push(v), + error: e => results.push(e), + complete: () => results.push("complete"), + }); + + assert_array_equals(results, []); + + source.next(1); + source.next(2); + assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to"); + assert_equals(inner2.subscriberCount(), 0, + "inner2 is queued, not subscribed to until inner1 completes"); + + assert_array_equals(results, []); + + // Before `inner1` pushes any values, we first complete the source Observable. + // This will not fire completion of the Observable returned from `flatMap()`, + // because there are two values (corresponding to inner Observables) that are + // queued to the inner queue that need to be processed first. Once the last + // one of *those* completes (i.e., `inner2.complete()` further down), then the + // returned Observable can finally complete. + source.complete(); + assert_equals(source.subscriberCount(), 0, + "source becomes inactive once it completes"); + + inner1.next(100); + inner1.next(101); + + assert_array_equals(results, [100, 101]); + + inner1.complete(); + assert_array_equals(results, [100, 101], + "Outer completion not triggered after inner1 completes"); + assert_equals(inner2.subscriberCount(), 1, + "inner2 gets un-queued and subscribed after inner1 completes"); + + inner2.next(200); + inner2.next(201); + assert_array_equals(results, [100, 101, 200, 201]); + + inner2.complete(); + assert_equals(inner2.subscriberCount(), 0, + "inner2 becomes inactive once it completes"); + assert_array_equals(results, [100, 101, 200, 201, "complete"]); +}, "flatMap(): result Observable does not complete after source Observable " + + "completes while there are still queued inner Observables to process " + + "Observables all complete"); + +test(() => { + const source = createTestSubject(); + const inner = createTestSubject(); + const result = source.flatMap(() => inner); + + const ac = new AbortController(); + + result.subscribe({}, { signal: ac.signal, }); + + source.next(1); + + assert_equals(inner.subscriberCount(), 1, + "inner Observable subscribed to once source emits it"); + + ac.abort(); + + assert_equals(source.subscriberCount(), 0, + "source unsubscribed from, once outer signal is aborted"); + + assert_equals(inner.subscriberCount(), 0, + "inner Observable unsubscribed from once the outer Observable is " + + "subscribed from, as a result of the outer signal being aborted"); +}, "flatMap(): source and inner active Observables are both unsubscribed " + + "from once the outer subscription signal is aborted"); + +// A helper function to create an Observable that can be externally controlled +// and examined for testing purposes. +function createTestSubject() { + const subscribers = new Set(); + const subject = new Observable(subscriber => { + subscribers.add(subscriber); + subscriber.addTeardown(() => subscribers.delete(subscriber)); + }); + + subject.next = value => { + for (const subscriber of Array.from(subscribers)) { + subscriber.next(value); + } + }; + subject.error = error => { + for (const subscriber of Array.from(subscribers)) { + subscriber.error(error); + } + }; + subject.complete = () => { + for (const subscriber of Array.from(subscribers)) { + subscriber.complete(); + } + }; + subject.subscriberCount = () => { + return subscribers.size; + }; + + return subject; +} |