summaryrefslogtreecommitdiffstats
path: root/vendor/datafrog/src/join.rs
blob: 94270af777daa3d2031d2b5768a8f5ff4f5a3a5f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
//! Join functionality.

use super::{Relation, Variable};
use std::cell::Ref;
use std::ops::Deref;

/// Implements `join`. Note that `input1` must be a variable, but
/// `input2` can be either a variable or a relation. This is necessary
/// because relations have no "recent" tuples, so the fn would be a
/// guaranteed no-op if both arguments were relations.  See also
/// `join_into_relation`.
pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
    input1: &Variable<(Key, Val1)>,
    input2: impl JoinInput<'me, (Key, Val2)>,
    output: &Variable<Result>,
    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
) {
    let mut results = Vec::new();

    let recent1 = input1.recent();
    let recent2 = input2.recent();

    {
        // scoped to let `closure` drop borrow of `results`.

        let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));

        for batch2 in input2.stable().iter() {
            join_helper(&recent1, &batch2, &mut closure);
        }

        for batch1 in input1.stable().iter() {
            join_helper(&batch1, &recent2, &mut closure);
        }

        join_helper(&recent1, &recent2, &mut closure);
    }

    output.insert(Relation::from_vec(results));
}

/// Join, but for two relations.
pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
    input1: &Relation<(Key, Val1)>,
    input2: &Relation<(Key, Val2)>,
    mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
) -> Relation<Result> {
    let mut results = Vec::new();

    join_helper(&input1.elements, &input2.elements, |k, v1, v2| {
        results.push(logic(k, v1, v2));
    });

    Relation::from_vec(results)
}

/// Moves all recent tuples from `input1` that are not present in `input2` into `output`.
pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
    input1: impl JoinInput<'me, (Key, Val)>,
    input2: &Relation<Key>,
    mut logic: impl FnMut(&Key, &Val) -> Result,
) -> Relation<Result> {
    let mut tuples2 = &input2[..];

    let results = input1
        .recent()
        .iter()
        .filter(|(ref key, _)| {
            tuples2 = gallop(tuples2, |k| k < key);
            tuples2.first() != Some(key)
        })
        .map(|(ref key, ref val)| logic(key, val))
        .collect::<Vec<_>>();

    Relation::from_vec(results)
}

fn join_helper<K: Ord, V1, V2>(
    mut slice1: &[(K, V1)],
    mut slice2: &[(K, V2)],
    mut result: impl FnMut(&K, &V1, &V2),
) {
    while !slice1.is_empty() && !slice2.is_empty() {
        use std::cmp::Ordering;

        // If the keys match produce tuples, else advance the smaller key until they might.
        match slice1[0].0.cmp(&slice2[0].0) {
            Ordering::Less => {
                slice1 = gallop(slice1, |x| x.0 < slice2[0].0);
            }
            Ordering::Equal => {
                // Determine the number of matching keys in each slice.
                let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count();
                let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count();

                // Produce results from the cross-product of matches.
                for index1 in 0..count1 {
                    for s2 in slice2[..count2].iter() {
                        result(&slice1[0].0, &slice1[index1].1, &s2.1);
                    }
                }

                // Advance slices past this key.
                slice1 = &slice1[count1..];
                slice2 = &slice2[count2..];
            }
            Ordering::Greater => {
                slice2 = gallop(slice2, |x| x.0 < slice1[0].0);
            }
        }
    }
}

pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
    // if empty slice, or already >= element, return
    if !slice.is_empty() && cmp(&slice[0]) {
        let mut step = 1;
        while step < slice.len() && cmp(&slice[step]) {
            slice = &slice[step..];
            step <<= 1;
        }

        step >>= 1;
        while step > 0 {
            if step < slice.len() && cmp(&slice[step]) {
                slice = &slice[step..];
            }
            step >>= 1;
        }

        slice = &slice[1..]; // advance one, as we always stayed < value
    }

    slice
}

/// An input that can be used with `from_join`; either a `Variable` or a `Relation`.
pub trait JoinInput<'me, Tuple: Ord>: Copy {
    /// If we are on iteration N of the loop, these are the tuples
    /// added on iteration N-1. (For a `Relation`, this is always an
    /// empty slice.)
    type RecentTuples: Deref<Target = [Tuple]>;

    /// If we are on iteration N of the loop, these are the tuples
    /// added on iteration N - 2 or before. (For a `Relation`, this is
    /// just `self`.)
    type StableTuples: Deref<Target = [Relation<Tuple>]>;

    /// Get the set of recent tuples.
    fn recent(self) -> Self::RecentTuples;

    /// Get the set of stable tuples.
    fn stable(self) -> Self::StableTuples;
}

impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> {
    type RecentTuples = Ref<'me, [Tuple]>;
    type StableTuples = Ref<'me, [Relation<Tuple>]>;

    fn recent(self) -> Self::RecentTuples {
        Ref::map(self.recent.borrow(), |r| &r.elements[..])
    }

    fn stable(self) -> Self::StableTuples {
        Ref::map(self.stable.borrow(), |v| &v[..])
    }
}

impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> {
    type RecentTuples = &'me [Tuple];
    type StableTuples = &'me [Relation<Tuple>];

    fn recent(self) -> Self::RecentTuples {
        &[]
    }

    fn stable(self) -> Self::StableTuples {
        std::slice::from_ref(self)
    }
}