test(() => { const results = []; const source = new Observable(subscriber => { subscriber.addTeardown(() => results.push("source teardown")); subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); const result = source.take(2); result.subscribe({ next: v => results.push(v), error: e => results.push(e), complete: () => results.push("complete"), }); assert_array_equals(results, [1, 2, "source teardown", "complete"]); }, "take(): Takes the first N values from the source observable, then completes"); test(() => { const results = []; const source = new Observable(subscriber => { subscriber.addTeardown(() => results.push("source teardown")); subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); const result = source.take(5); result.subscribe({ next: v => results.push(v), error: e => results.push(e), complete: () => results.push("complete"), }); assert_array_equals(results, [1, 2, 3, "source teardown", "complete"], "complete() is immediately forwarded"); }, "take(): Forwards complete()s that happen before the take count is met, " + "and unsubscribes from source Observable"); test(() => { const results = []; const error = new Error('source error'); const source = new Observable(subscriber => { subscriber.next(1); subscriber.error(error); }); const result = source.take(100); result.subscribe({ next: v => results.push(v), error: e => results.push(e), complete: () => results.push("complete"), }); assert_array_equals(results, [1, error], "Errors are forwarded"); }, "take(): Should forward errors from the source observable"); test(() => { const results = []; const source = new Observable((subscriber) => { results.push("source subscribe"); subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); const result = source.take(0); result.subscribe({ next: v => results.push(v), error: e => results.push(e), complete: () => results.push("complete"), }); assert_array_equals(results, ["complete"]); }, "take(): take(0) should not subscribe to the source observable, and " + "should return an observable that immediately completes"); test(() => { const results = []; const source = new Observable((subscriber) => { results.push("source subscribe"); subscriber.next(1); subscriber.next(2); subscriber.next(3); subscriber.complete(); }); // Per WebIDL, `-1` passed into an `unsigned long long` gets wrapped around // into the maximum value (18446744073709551615), which means the `result` // Observable captures everything that `source` does. const result = source.take(-1); result.subscribe({ next: v => results.push(v), error: e => results.push(e), complete: () => results.push("complete"), }); assert_array_equals(results, ["source subscribe", 1, 2, 3, "complete"]); }, "take(): Negative count is treated as maximum value"); // This tests a regression in Chromium's implementation. In ref-counted // producers, when Subscriber#next() is called, the Subscriber iterates over all // of its "internal observers" [1] and calls "next" on them. However, "next" can // complete the subscription, and modify the "internal observers" list while // Subscriber is iterating over it. This mutation-during-iteration caused a // crash regression in Chromium, which this test covers. // // [1]: https://wicg.github.io/observable/#subscriber-internal-observers promise_test(async () => { async function* asyncNumbers() { yield* [1,2,3,4]; } const source = Observable.from(asyncNumbers()); const results = []; source.take(1).toArray().then(result => results.push(result)); await source.take(3).toArray().then(result => results.push(result)); assert_equals(results.length, 2); assert_array_equals(results[0], [1]); assert_array_equals(results[1], [1, 2, 3]); }, "take(): No crash when take(1) unsubscribes from its source when next() " + "is called, and the Subscriber iterates over the rest of the Observables");