//! Join functionality. use super::{Relation, Variable}; use std::cell::Ref; use std::ops::Deref; /// Implements `join`. Note that `input1` must be a variable, but /// `input2` can be either a variable or a relation. This is necessary /// because relations have no "recent" tuples, so the fn would be a /// guaranteed no-op if both arguments were relations. See also /// `join_into_relation`. pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( input1: &Variable<(Key, Val1)>, input2: impl JoinInput<'me, (Key, Val2)>, output: &Variable, mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, ) { let mut results = Vec::new(); let recent1 = input1.recent(); let recent2 = input2.recent(); { // scoped to let `closure` drop borrow of `results`. let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2)); for batch2 in input2.stable().iter() { join_helper(&recent1, &batch2, &mut closure); } for batch1 in input1.stable().iter() { join_helper(&batch1, &recent2, &mut closure); } join_helper(&recent1, &recent2, &mut closure); } output.insert(Relation::from_vec(results)); } /// Join, but for two relations. pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( input1: &Relation<(Key, Val1)>, input2: &Relation<(Key, Val2)>, mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, ) -> Relation { let mut results = Vec::new(); join_helper(&input1.elements, &input2.elements, |k, v1, v2| { results.push(logic(k, v1, v2)); }); Relation::from_vec(results) } /// Moves all recent tuples from `input1` that are not present in `input2` into `output`. pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>( input1: impl JoinInput<'me, (Key, Val)>, input2: &Relation, mut logic: impl FnMut(&Key, &Val) -> Result, ) -> Relation { let mut tuples2 = &input2[..]; let results = input1 .recent() .iter() .filter(|(ref key, _)| { tuples2 = gallop(tuples2, |k| k < key); tuples2.first() != Some(key) }) .map(|(ref key, ref val)| logic(key, val)) .collect::>(); Relation::from_vec(results) } fn join_helper( mut slice1: &[(K, V1)], mut slice2: &[(K, V2)], mut result: impl FnMut(&K, &V1, &V2), ) { while !slice1.is_empty() && !slice2.is_empty() { use std::cmp::Ordering; // If the keys match produce tuples, else advance the smaller key until they might. match slice1[0].0.cmp(&slice2[0].0) { Ordering::Less => { slice1 = gallop(slice1, |x| x.0 < slice2[0].0); } Ordering::Equal => { // Determine the number of matching keys in each slice. let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count(); let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count(); // Produce results from the cross-product of matches. for index1 in 0..count1 { for s2 in slice2[..count2].iter() { result(&slice1[0].0, &slice1[index1].1, &s2.1); } } // Advance slices past this key. slice1 = &slice1[count1..]; slice2 = &slice2[count2..]; } Ordering::Greater => { slice2 = gallop(slice2, |x| x.0 < slice1[0].0); } } } } pub(crate) fn gallop(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] { // if empty slice, or already >= element, return if !slice.is_empty() && cmp(&slice[0]) { let mut step = 1; while step < slice.len() && cmp(&slice[step]) { slice = &slice[step..]; step <<= 1; } step >>= 1; while step > 0 { if step < slice.len() && cmp(&slice[step]) { slice = &slice[step..]; } step >>= 1; } slice = &slice[1..]; // advance one, as we always stayed < value } slice } /// An input that can be used with `from_join`; either a `Variable` or a `Relation`. pub trait JoinInput<'me, Tuple: Ord>: Copy { /// If we are on iteration N of the loop, these are the tuples /// added on iteration N-1. (For a `Relation`, this is always an /// empty slice.) type RecentTuples: Deref; /// If we are on iteration N of the loop, these are the tuples /// added on iteration N - 2 or before. (For a `Relation`, this is /// just `self`.) type StableTuples: Deref]>; /// Get the set of recent tuples. fn recent(self) -> Self::RecentTuples; /// Get the set of stable tuples. fn stable(self) -> Self::StableTuples; } impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable { type RecentTuples = Ref<'me, [Tuple]>; type StableTuples = Ref<'me, [Relation]>; fn recent(self) -> Self::RecentTuples { Ref::map(self.recent.borrow(), |r| &r.elements[..]) } fn stable(self) -> Self::StableTuples { Ref::map(self.stable.borrow(), |v| &v[..]) } } impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation { type RecentTuples = &'me [Tuple]; type StableTuples = &'me [Relation]; fn recent(self) -> Self::RecentTuples { &[] } fn stable(self) -> Self::StableTuples { std::slice::from_ref(self) } }