1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
// filter() must forward only the values for which the predicate returns a
// truthy result, and then pass the source's completion downstream.
test(() => {
  const emitted = [];
  const numbers = new Observable(subscriber => {
    for (const n of [1, 2, 3, 4]) {
      subscriber.next(n);
    }
    subscriber.complete();
  });

  const isEven = value => value % 2 === 0;
  const evens = numbers.filter(isEven);

  evens.subscribe({
    next: v => emitted.push(v),
    error: () => emitted.push("error"),
    complete: () => emitted.push("complete"),
  });

  // Odd values (1 and 3) are dropped; completion is still delivered.
  assert_array_equals(emitted, [2, 4, "complete"]);
}, "filter(): Returned Observable filters out results based on predicate");
// A predicate that throws must surface the thrown value through the
// downstream Observer's error() handler and unsubscribe from the source.
test(() => {
  const error = new Error("error while filtering");
  const results = [];
  let teardownCalled = false;

  const source = new Observable(subscriber => {
    subscriber.addTeardown(() => {
      teardownCalled = true;
    });

    // The predicate throws on this first value, so the source subscription is
    // expected to already be torn down by the time next() returns.
    subscriber.next(1);
    assert_true(teardownCalled, "Teardown called once filter unsubscribes due to error");
    assert_false(subscriber.active, "Unsubscription makes Subscriber inactive");

    // Issued after unsubscription; the final assertion verifies that neither
    // call adds anything to `results`.
    subscriber.next(2);
    subscriber.complete();
  });

  source
    .filter(() => {
      throw error;
    })
    .subscribe({
      next: v => results.push(v),
      error: e => results.push(e),
      complete: () => results.push("complete"),
    });

  // Only the thrown error is observed: no values and no completion.
  assert_array_equals(results, [error]);
}, "filter(): Errors thrown in filter predicate are emitted to Observer error() handler");
// Completion from the source must flow through filter(), and values emitted
// after complete() must never reach the predicate.
test(() => {
  const observed = [];
  let predicateCalls = 0;

  // Always returns a truthy (positive) count, so every value it sees passes.
  const countingPredicate = value => {
    predicateCalls += 1;
    return predicateCalls;
  };

  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.complete();
    subscriber.next(2);
  });

  const filtered = source.filter(countingPredicate);
  filtered.subscribe({
    next: v => observed.push(v),
    error: e => observed.push(e),
    complete: () => observed.push('complete'),
  });

  assert_equals(predicateCalls, 1, "Predicate is not called after complete()");
  assert_array_equals(observed, [1, "complete"]);
}, "filter(): Passes complete() through from source Observable");
// An error from the source must flow through filter(), and values emitted
// after error() must never reach the predicate.
test(() => {
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.error('error');
    subscriber.next(2);
  });

  let predicateCalls = 0;
  const results = [];

  // Exercise filter() (not map()) so this test covers the operator named in
  // its title. The predicate returns the incremented call count, which is
  // truthy, so the first value passes through unchanged.
  source.filter(v => ++predicateCalls).subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push('complete'),
  });

  assert_equals(predicateCalls, 1, "Predicate is not called after error()");
  assert_array_equals(results, [1, "error"]);
}, "filter(): Passes error() through from source Observable");
// Records the order of events when the source completes: the source's
// teardown callback and its signal's abort event are expected to be logged
// before the downstream Observer's complete() handler runs.
test(() => {
  const results = [];
  const source = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('source teardown'));
    subscriber.signal.addEventListener('abort',
        () => results.push('source abort event'));

    subscriber.complete();
  });

  // The source never calls next(), so the predicate's marker must not show up
  // in `results`.
  source.filter(() => results.push('filter predicate called')).subscribe({
    complete: () => results.push('filter observable complete'),
  });

  assert_array_equals(results,
      ['source teardown', 'source abort event', 'filter observable complete']);
}, "filter(): Upon source completion, source Observable teardown sequence " +
   "happens before downstream filter complete() is called");
// The predicate's second argument must be a zero-based index that increases
// by one for each value emitted by the source.
test(() => {
  const seenIndices = [];

  // push() returns the new array length, which is truthy, so every value is
  // kept; only the recorded indices matter here.
  const recordIndex = (value, index) => seenIndices.push(index);

  const source = new Observable(subscriber => {
    for (const value of ['value1', 'value2', 'value3']) {
      subscriber.next(value);
    }
  });

  const filtered = source.filter(recordIndex);
  filtered.subscribe();

  assert_array_equals(seenIndices, [0, 1, 2]);
}, "filter(): Index is passed correctly to predicate");
|