diff options
Diffstat (limited to 'vendor/datafrog/src/join.rs')
-rw-r--r-- | vendor/datafrog/src/join.rs | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/vendor/datafrog/src/join.rs b/vendor/datafrog/src/join.rs new file mode 100644 index 000000000..94270af77 --- /dev/null +++ b/vendor/datafrog/src/join.rs @@ -0,0 +1,180 @@ +//! Join functionality. + +use super::{Relation, Variable}; +use std::cell::Ref; +use std::ops::Deref; + +/// Implements `join`. Note that `input1` must be a variable, but +/// `input2` can be either a variable or a relation. This is necessary +/// because relations have no "recent" tuples, so the fn would be a +/// guaranteed no-op if both arguments were relations. See also +/// `join_into_relation`. +pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( + input1: &Variable<(Key, Val1)>, + input2: impl JoinInput<'me, (Key, Val2)>, + output: &Variable<Result>, + mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, +) { + let mut results = Vec::new(); + + let recent1 = input1.recent(); + let recent2 = input2.recent(); + + { + // scoped to let `closure` drop borrow of `results`. + + let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2)); + + for batch2 in input2.stable().iter() { + join_helper(&recent1, &batch2, &mut closure); + } + + for batch1 in input1.stable().iter() { + join_helper(&batch1, &recent2, &mut closure); + } + + join_helper(&recent1, &recent2, &mut closure); + } + + output.insert(Relation::from_vec(results)); +} + +/// Join, but for two relations. +pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( + input1: &Relation<(Key, Val1)>, + input2: &Relation<(Key, Val2)>, + mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, +) -> Relation<Result> { + let mut results = Vec::new(); + + join_helper(&input1.elements, &input2.elements, |k, v1, v2| { + results.push(logic(k, v1, v2)); + }); + + Relation::from_vec(results) +} + +/// Moves all recent tuples from `input1` that are not present in `input2` into `output`. +pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>( + input1: impl JoinInput<'me, (Key, Val)>, + input2: &Relation<Key>, + mut logic: impl FnMut(&Key, &Val) -> Result, +) -> Relation<Result> { + let mut tuples2 = &input2[..]; + + let results = input1 + .recent() + .iter() + .filter(|(ref key, _)| { + tuples2 = gallop(tuples2, |k| k < key); + tuples2.first() != Some(key) + }) + .map(|(ref key, ref val)| logic(key, val)) + .collect::<Vec<_>>(); + + Relation::from_vec(results) +} + +fn join_helper<K: Ord, V1, V2>( + mut slice1: &[(K, V1)], + mut slice2: &[(K, V2)], + mut result: impl FnMut(&K, &V1, &V2), +) { + while !slice1.is_empty() && !slice2.is_empty() { + use std::cmp::Ordering; + + // If the keys match produce tuples, else advance the smaller key until they might. + match slice1[0].0.cmp(&slice2[0].0) { + Ordering::Less => { + slice1 = gallop(slice1, |x| x.0 < slice2[0].0); + } + Ordering::Equal => { + // Determine the number of matching keys in each slice. + let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count(); + let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count(); + + // Produce results from the cross-product of matches. + for index1 in 0..count1 { + for s2 in slice2[..count2].iter() { + result(&slice1[0].0, &slice1[index1].1, &s2.1); + } + } + + // Advance slices past this key. + slice1 = &slice1[count1..]; + slice2 = &slice2[count2..]; + } + Ordering::Greater => { + slice2 = gallop(slice2, |x| x.0 < slice1[0].0); + } + } + } +} + +pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] { + // if empty slice, or already >= element, return + if !slice.is_empty() && cmp(&slice[0]) { + let mut step = 1; + while step < slice.len() && cmp(&slice[step]) { + slice = &slice[step..]; + step <<= 1; + } + + step >>= 1; + while step > 0 { + if step < slice.len() && cmp(&slice[step]) { + slice = &slice[step..]; + } + step >>= 1; + } + + slice = &slice[1..]; // advance one, as we always stayed < value + } + + slice +} + +/// An input that can be used with `from_join`; either a `Variable` or a `Relation`. +pub trait JoinInput<'me, Tuple: Ord>: Copy { + /// If we are on iteration N of the loop, these are the tuples + /// added on iteration N-1. (For a `Relation`, this is always an + /// empty slice.) + type RecentTuples: Deref<Target = [Tuple]>; + + /// If we are on iteration N of the loop, these are the tuples + /// added on iteration N - 2 or before. (For a `Relation`, this is + /// just `self`.) + type StableTuples: Deref<Target = [Relation<Tuple>]>; + + /// Get the set of recent tuples. + fn recent(self) -> Self::RecentTuples; + + /// Get the set of stable tuples. + fn stable(self) -> Self::StableTuples; +} + +impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> { + type RecentTuples = Ref<'me, [Tuple]>; + type StableTuples = Ref<'me, [Relation<Tuple>]>; + + fn recent(self) -> Self::RecentTuples { + Ref::map(self.recent.borrow(), |r| &r.elements[..]) + } + + fn stable(self) -> Self::StableTuples { + Ref::map(self.stable.borrow(), |v| &v[..]) + } +} + +impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> { + type RecentTuples = &'me [Tuple]; + type StableTuples = &'me [Relation<Tuple>]; + + fn recent(self) -> Self::RecentTuples { + &[] + } + + fn stable(self) -> Self::StableTuples { + std::slice::from_ref(self) + } +} |