extern crate differential_dataflow;
extern crate rand;
extern crate timely;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::operators::*;
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::*;
use differential_dataflow::AsCollection;
// mod loglikelihoodratio;
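
// Expected invocation, inferred from the argument parsing in `main` (names are illustrative):
//   <binary> <users> <items> <scale> <batch> <noisy|quiet> [timely options, e.g. -w <workers>]
// Passing "noisy" as the fifth argument enables verbose change output; any other value disables it.
// Everything after the fifth argument is handed to timely via `execute_from_args`.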
fn main() {
    // define a new timely dataflow computation.
    timely::execute_from_args(std::env::args().skip(6), move |worker| {
        // capture parameters of the experiment.
        let users: usize = std::env::args().nth(1).unwrap().parse().unwrap();
        let items: usize = std::env::args().nth(2).unwrap().parse().unwrap();
        let scale: usize = std::env::args().nth(3).unwrap().parse().unwrap();
        let batch: usize = std::env::args().nth(4).unwrap().parse().unwrap();
        let noisy: bool = std::env::args().nth(5).unwrap() == "noisy";
        let index = worker.index();
        let peers = worker.peers();
        let (input, probe) = worker.dataflow(|scope| {
            // input of (user, item) collection.
            let (input, occurrences) = scope.new_input();
            let occurrences = occurrences.as_collection();
            // TODO: adjust the code to only work with the upper triangular half of the (symmetric) cooccurrence matrix.
            /* Compute the cooccurrence matrix C = A'A from the binary interaction matrix A. */
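            // The self-join on the user key enumerates, per user, all ordered pairs of items
            // that user has interacted with; counting those pairs (after dropping the diagonal)
            // yields the off-diagonal entries of C.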
            let cooccurrences = occurrences
                .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b))
                .filter(|&(item_a, item_b)| item_a != item_b)
                .count();
            /* Compute the row sums of C, indicating how often we encounter individual items. */
            let row_sums = occurrences.map(|(_user, item)| item).count();
            // row_sums.inspect(|record| println!("[row_sums] {:?}", record));
            /* Join the cooccurrence pairs with the corresponding row sums. */
            let mut cooccurrences_with_row_sums = cooccurrences
                .map(|((item_a, item_b), num_cooccurrences)| (item_a, (item_b, num_cooccurrences)))
                .join_map(
                    &row_sums,
                    |&item_a, &(item_b, num_cooccurrences), &row_sum_a| {
                        assert!(row_sum_a > 0);
                        (item_b, (item_a, num_cooccurrences, row_sum_a))
                    },
                )
                .join_map(
                    &row_sums,
                    |&item_b, &(item_a, num_cooccurrences, row_sum_a), &row_sum_b| {
                        assert!(row_sum_a > 0);
                        assert!(row_sum_b > 0);
                        (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))
                    },
                );
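            // After both joins each record has the shape
            // (item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b)),
            // i.e. everything needed to build the 2x2 contingency table for an LLR test.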
            // cooccurrences_with_row_sums
            //     .inspect(|record| println!("[cooccurrences_with_row_sums] {:?}", record));
            // // TODO compute top-k "similar items" per item
            // /* Compute LLR scores for each item pair. */
            // let llr_scores = cooccurrences_with_row_sums.map(
            //     |(item_a, (item_b, num_cooccurrences, row_sum_a, row_sum_b))| {
            //         println!(
            //             "[llr_scores] item_a={} item_b={}, num_cooccurrences={} row_sum_a={} row_sum_b={}",
            //             item_a, item_b, num_cooccurrences, row_sum_a, row_sum_b);
            //         let k11: isize = num_cooccurrences;
            //         let k12: isize = row_sum_a as isize - k11;
            //         let k21: isize = row_sum_b as isize - k11;
            //         let k22: isize = 10000 - k12 - k21 + k11;
            //         let llr_score = loglikelihoodratio::log_likelihood_ratio(k11, k12, k21, k22);
            //         ((item_a, item_b), llr_score)
            //     });
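            // The `loglikelihoodratio` module referenced above is not included here. Below is a
            // minimal sketch of what it could contain, assuming Dunning's (1993) log-likelihood
            // ratio test as used, for example, in Apache Mahout's item similarity. It is left
            // commented out so the program's behaviour is unchanged; if used, it would live at
            // the top level in place of the commented `// mod loglikelihoodratio;` declaration.
            //
            // mod loglikelihoodratio {
            //     // x * ln(x), with the convention that 0 * ln(0) = 0.
            //     fn x_log_x(x: isize) -> f64 {
            //         if x <= 0 { 0.0 } else { (x as f64) * (x as f64).ln() }
            //     }
            //     // Unnormalized Shannon entropy of a list of counts.
            //     fn entropy(counts: &[isize]) -> f64 {
            //         let total: isize = counts.iter().sum();
            //         x_log_x(total) - counts.iter().map(|&c| x_log_x(c)).sum::<f64>()
            //     }
            //     // LLR = 2 * (H(rows) + H(cols) - H(matrix)) over the 2x2 contingency table.
            //     pub fn log_likelihood_ratio(k11: isize, k12: isize, k21: isize, k22: isize) -> f64 {
            //         let row_entropy = entropy(&[k11 + k12, k21 + k22]);
            //         let column_entropy = entropy(&[k11 + k21, k12 + k22]);
            //         let matrix_entropy = entropy(&[k11, k12, k21, k22]);
            //         if row_entropy + column_entropy < matrix_entropy {
            //             0.0 // guard against round-off producing a negative score
            //         } else {
            //             2.0 * (row_entropy + column_entropy - matrix_entropy)
            //         }
            //     }
            // }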
            if noisy {
                cooccurrences_with_row_sums =
                    cooccurrences_with_row_sums.inspect(|x| println!("change: {:?}", x));
            }
            let probe = cooccurrences_with_row_sums.probe();
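            // The probe tracks progress of the cooccurrence pipeline, so the driver loop
            // below can tell when a round of input updates has been fully processed.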
            /*
            // produce the (item, item) collection
            let cooccurrences = occurrences
                .join_map(&occurrences, |_user, &item_a, &item_b| (item_a, item_b));
            // count the occurrences of each item.
            let counts = cooccurrences
                .map(|(item_a, _)| item_a)
                .count();
            // produce ((item1, item2), count1, count2, count12) tuples
            let cooccurrences_with_counts = cooccurrences
                .join_map(&counts, |&item_a, &item_b, &count_item_a| (item_b, (item_a, count_item_a)))
                .join_map(&counts, |&item_b, &(item_a, count_item_a), &count_item_b| {
                    ((item_a, item_b), count_item_a, count_item_b)
                });
            let probe = cooccurrences_with_counts
                .inspect(|x| println!("change: {:?}", x))
                .probe();
            */
            (input, probe)
        });
        let seed: &[_] = &[1, 2, 3, index];
        let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
        let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions
        let mut input = InputSession::from(input);
        for count in 0..scale {
            if count % peers == index {
                let user = rng1.gen_range(0, users);
                let item = rng1.gen_range(0, items);
                // println!("[INITIAL INPUT] ({}, {})", user, item);
                input.insert((user, item));
            }
        }
        // load the initial data up!
        while probe.less_than(input.time()) {
            worker.step();
        }
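        // Each round inserts `batch` fresh random (user, item) pairs and removes `batch` pairs.
        // Because rng2 starts from the same seed as rng1 but is only consumed here, its draws
        // replay rng1's earlier sequence, so removals delete pairs that were inserted before.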
        for round in 1.. {
            for element in (round * batch)..((round + 1) * batch) {
                if element % peers == index {
                    // advance the input timestamp.
                    input.advance_to(round * batch);
                    // insert a new item.
                    let user = rng1.gen_range(0, users);
                    let item = rng1.gen_range(0, items);
                    if noisy {
                        println!("[INPUT: insert] ({}, {})", user, item);
                    }
                    input.insert((user, item));
                    // remove an old item.
                    let user = rng2.gen_range(0, users);
                    let item = rng2.gen_range(0, items);
                    if noisy {
                        println!("[INPUT: remove] ({}, {})", user, item);
                    }
                    input.remove((user, item));
                }
            }
            input.advance_to(round * batch);
            input.flush();
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    })
    .unwrap();
}