// Tests for Observable.prototype.flatMap(). Each test subscribes to the
// Observable returned by `flatMap()` and records every notification in a
// `results` array, which is then compared against the expected sequence.

// Basic case: a source that synchronously emits 1, 2, 3 and completes. The
// mapper returns an inner Observable that emits `value * 10` and
// `value * 100` before completing.
test(() => {
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  });

  let projectionCalls = 0;

  const results = [];

  const flattened = source.flatMap(value => {
    projectionCalls++;
    return new Observable((subscriber) => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  assert_true(flattened instanceof Observable,
      "flatMap() returns an Observable");
  assert_equals(projectionCalls, 0,
      "Projection is not called until subscription starts");

  flattened.subscribe({
    next: v => results.push(v),
    error: () => results.push("error"),
    complete: () => results.push("complete"),
  });

  assert_equals(projectionCalls, 3,
      "Mapper is called three times, once for each source Observable value");
  assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"],
      "flatMap() results are correct");
}, "flatMap(): Flattens simple source Observable properly");

// The source errors after emitting two values. The `subscriber.next(3)` that
// follows the error must not show up in the results.
test(() => {
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.error(error);
    subscriber.next(3);
  });

  const flattened = source.flatMap(value => {
    return new Observable(subscriber => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, 20, 200, error],
      "Source error is passed through to the flatMap() Observable");
}, "flatMap(): Returned Observable passes through source Observable errors");

// The inner Observable produced for the value 2 errors instead of completing.
// The source records `subscriber.active` after each of its first two `next()`
// calls so the test can observe when the outer subscription is torn down.
test(() => {
  const results = [];
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.next(2);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.next(3);
    subscriber.complete();
  });

  const flattened = source.flatMap((value) => {
    return new Observable((subscriber) => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      if (value === 2) {
        subscriber.error(error);
      } else {
        subscriber.complete();
      }
    });
  });

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"],
      "Inner subscription error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
   "'inner' Observable emits an error");

// Here the mapper callback itself throws (for the value 3) rather than
// returning an Observable. The source records `subscriber.active` once, after
// all three `next()` calls.
test(() => {
  const results = [];
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.complete();
  });

  const flattened = source.flatMap(value => {
    if (value === 3) {
      throw error;
    }
    return new Observable(subscriber => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, 20, 200, error, "inactive"],
      "Inner subscriber thrown error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
   "'inner' Observable throws an error");

// Uses externally driven subjects so each step can be checked in isolation:
// the source emits two values, the inner Observables are subscribed to one at
// a time, and the source completes last.
test(() => {
  const source = createTestSubject();
  const inner1 = createTestSubject();
  const inner2 = createTestSubject();

  const flattened = source.flatMap(value => {
    if (value === 1) {
      return inner1;
    }

    return inner2;
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, []);

  source.next(1);
  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");

  source.next(2);
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 is queued, not subscribed to until inner1 completes");

  assert_array_equals(results, []);

  inner1.next(100);
  inner1.next(101);

  assert_array_equals(results, [100, 101]);

  inner1.complete();
  assert_equals(inner1.subscriberCount(), 0,
      "inner1 becomes inactive once it completes");
  assert_equals(inner2.subscriberCount(), 1,
      "inner2 gets un-queued and subscribed to once inner1 completes");

  inner2.next(200);
  inner2.next(201);
  assert_array_equals(results, [100, 101, 200, 201]);

  inner2.complete();
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 becomes inactive once it completes");
  assert_equals(source.subscriberCount(), 1,
      "source is not unsubscribed from yet, since it has not completed");
  assert_array_equals(results, [100, 101, 200, 201]);

  source.complete();
  assert_equals(source.subscriberCount(), 0,
      "source unsubscribed from after it completes");

  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete until source and inner " +
   "Observables all complete");

// Same setup as the previous test, but the source completes *before* either
// inner Observable has emitted anything.
test(() => {
  const source = createTestSubject();
  const inner1 = createTestSubject();
  const inner2 = createTestSubject();

  const flattened = source.flatMap(value => {
    if (value === 1) {
      return inner1;
    }

    return inner2;
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, []);

  source.next(1);
  source.next(2);
  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 is queued, not subscribed to until inner1 completes");

  assert_array_equals(results, []);

  // Before `inner1` pushes any values, we first complete the source Observable.
  // This will not fire completion of the Observable returned from `flatMap()`,
  // because there are two values (corresponding to inner Observables) that are
  // queued to the inner queue that need to be processed first. Once the last
  // one of *those* completes (i.e., `inner2.complete()` further down), then the
  // returned Observable can finally complete.
  source.complete();
  assert_equals(source.subscriberCount(), 0,
      "source becomes inactive once it completes");

  inner1.next(100);
  inner1.next(101);

  assert_array_equals(results, [100, 101]);

  inner1.complete();
  assert_array_equals(results, [100, 101],
      "Outer completion not triggered after inner1 completes");
  assert_equals(inner2.subscriberCount(), 1,
      "inner2 gets un-queued and subscribed after inner1 completes");

  inner2.next(200);
  inner2.next(201);
  assert_array_equals(results, [100, 101, 200, 201]);

  inner2.complete();
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 becomes inactive once it completes");
  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete after source Observable " +
   "completes while there are still queued inner Observables to process " +
   "Observables all complete");

// Aborting the signal passed to `subscribe()` must unsubscribe from both the
// source and the inner Observable that is active at that moment.
test(() => {
  const source = createTestSubject();
  const inner = createTestSubject();
  const result = source.flatMap(() => inner);

  const ac = new AbortController();

  // An empty observer is enough here: only subscriber counts are inspected.
  result.subscribe({}, {
    signal: ac.signal,
  });

  source.next(1);

  assert_equals(inner.subscriberCount(), 1,
      "inner Observable subscribed to once source emits it");

  ac.abort();

  assert_equals(source.subscriberCount(), 0,
      "source unsubscribed from, once outer signal is aborted");

  assert_equals(inner.subscriberCount(), 0,
      "inner Observable unsubscribed from once the outer Observable is " +
      "unsubscribed from, as a result of the outer signal being aborted");
}, "flatMap(): source and inner active Observables are both unsubscribed " +
   "from once the outer subscription signal is aborted");

// A helper function to create an Observable
// that can be externally controlled and examined for testing purposes.
//
// The returned object is a real `Observable` with four extra methods attached:
//   - next(value):      calls `next(value)` on every current subscriber.
//   - error(error):     calls `error(error)` on every current subscriber.
//   - complete():       calls `complete()` on every current subscriber.
//   - subscriberCount(): number of subscribers currently being tracked.
function createTestSubject() {
  // Every live subscriber. A subscriber is added when it subscribes and is
  // removed again by the teardown registered at that point.
  const subscribers = new Set();
  const subject = new Observable(subscriber => {
    subscribers.add(subscriber);
    subscriber.addTeardown(() => subscribers.delete(subscriber));
  });

  // Each broadcast below iterates over a snapshot (`Array.from`) so that the
  // set may be mutated (e.g. by the teardown above) while notifications are
  // still being delivered.
  subject.next = value => {
    for (const subscriber of Array.from(subscribers)) {
      subscriber.next(value);
    }
  };
  subject.error = error => {
    for (const subscriber of Array.from(subscribers)) {
      subscriber.error(error);
    }
  };
  subject.complete = () => {
    for (const subscriber of Array.from(subscribers)) {
      subscriber.complete();
    }
  };
  subject.subscriberCount = () => {
    return subscribers.size;
  };

  return subject;
}