diff options
Diffstat (limited to 'vendor/datafrog/src')
-rw-r--r-- | vendor/datafrog/src/join.rs | 180 | ||||
-rw-r--r-- | vendor/datafrog/src/lib.rs | 567 | ||||
-rw-r--r-- | vendor/datafrog/src/map.rs | 13 | ||||
-rw-r--r-- | vendor/datafrog/src/test.rs | 195 | ||||
-rw-r--r-- | vendor/datafrog/src/treefrog.rs | 661 |
5 files changed, 1616 insertions, 0 deletions
diff --git a/vendor/datafrog/src/join.rs b/vendor/datafrog/src/join.rs new file mode 100644 index 000000000..94270af77 --- /dev/null +++ b/vendor/datafrog/src/join.rs @@ -0,0 +1,180 @@ +//! Join functionality. + +use super::{Relation, Variable}; +use std::cell::Ref; +use std::ops::Deref; + +/// Implements `join`. Note that `input1` must be a variable, but +/// `input2` can be either a variable or a relation. This is necessary +/// because relations have no "recent" tuples, so the fn would be a +/// guaranteed no-op if both arguments were relations. See also +/// `join_into_relation`. +pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( + input1: &Variable<(Key, Val1)>, + input2: impl JoinInput<'me, (Key, Val2)>, + output: &Variable<Result>, + mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, +) { + let mut results = Vec::new(); + + let recent1 = input1.recent(); + let recent2 = input2.recent(); + + { + // scoped to let `closure` drop borrow of `results`. + + let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2)); + + for batch2 in input2.stable().iter() { + join_helper(&recent1, &batch2, &mut closure); + } + + for batch1 in input1.stable().iter() { + join_helper(&batch1, &recent2, &mut closure); + } + + join_helper(&recent1, &recent2, &mut closure); + } + + output.insert(Relation::from_vec(results)); +} + +/// Join, but for two relations. +pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>( + input1: &Relation<(Key, Val1)>, + input2: &Relation<(Key, Val2)>, + mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result, +) -> Relation<Result> { + let mut results = Vec::new(); + + join_helper(&input1.elements, &input2.elements, |k, v1, v2| { + results.push(logic(k, v1, v2)); + }); + + Relation::from_vec(results) +} + +/// Moves all recent tuples from `input1` that are not present in `input2` into `output`. +pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>( + input1: impl JoinInput<'me, (Key, Val)>, + input2: &Relation<Key>, + mut logic: impl FnMut(&Key, &Val) -> Result, +) -> Relation<Result> { + let mut tuples2 = &input2[..]; + + let results = input1 + .recent() + .iter() + .filter(|(ref key, _)| { + tuples2 = gallop(tuples2, |k| k < key); + tuples2.first() != Some(key) + }) + .map(|(ref key, ref val)| logic(key, val)) + .collect::<Vec<_>>(); + + Relation::from_vec(results) +} + +fn join_helper<K: Ord, V1, V2>( + mut slice1: &[(K, V1)], + mut slice2: &[(K, V2)], + mut result: impl FnMut(&K, &V1, &V2), +) { + while !slice1.is_empty() && !slice2.is_empty() { + use std::cmp::Ordering; + + // If the keys match produce tuples, else advance the smaller key until they might. + match slice1[0].0.cmp(&slice2[0].0) { + Ordering::Less => { + slice1 = gallop(slice1, |x| x.0 < slice2[0].0); + } + Ordering::Equal => { + // Determine the number of matching keys in each slice. + let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count(); + let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count(); + + // Produce results from the cross-product of matches. + for index1 in 0..count1 { + for s2 in slice2[..count2].iter() { + result(&slice1[0].0, &slice1[index1].1, &s2.1); + } + } + + // Advance slices past this key. + slice1 = &slice1[count1..]; + slice2 = &slice2[count2..]; + } + Ordering::Greater => { + slice2 = gallop(slice2, |x| x.0 < slice1[0].0); + } + } + } +} + +pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] { + // if empty slice, or already >= element, return + if !slice.is_empty() && cmp(&slice[0]) { + let mut step = 1; + while step < slice.len() && cmp(&slice[step]) { + slice = &slice[step..]; + step <<= 1; + } + + step >>= 1; + while step > 0 { + if step < slice.len() && cmp(&slice[step]) { + slice = &slice[step..]; + } + step >>= 1; + } + + slice = &slice[1..]; // advance one, as we always stayed < value + } + + slice +} + +/// An input that can be used with `from_join`; either a `Variable` or a `Relation`. +pub trait JoinInput<'me, Tuple: Ord>: Copy { + /// If we are on iteration N of the loop, these are the tuples + /// added on iteration N-1. (For a `Relation`, this is always an + /// empty slice.) + type RecentTuples: Deref<Target = [Tuple]>; + + /// If we are on iteration N of the loop, these are the tuples + /// added on iteration N - 2 or before. (For a `Relation`, this is + /// just `self`.) + type StableTuples: Deref<Target = [Relation<Tuple>]>; + + /// Get the set of recent tuples. + fn recent(self) -> Self::RecentTuples; + + /// Get the set of stable tuples. + fn stable(self) -> Self::StableTuples; +} + +impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Variable<Tuple> { + type RecentTuples = Ref<'me, [Tuple]>; + type StableTuples = Ref<'me, [Relation<Tuple>]>; + + fn recent(self) -> Self::RecentTuples { + Ref::map(self.recent.borrow(), |r| &r.elements[..]) + } + + fn stable(self) -> Self::StableTuples { + Ref::map(self.stable.borrow(), |v| &v[..]) + } +} + +impl<'me, Tuple: Ord> JoinInput<'me, Tuple> for &'me Relation<Tuple> { + type RecentTuples = &'me [Tuple]; + type StableTuples = &'me [Relation<Tuple>]; + + fn recent(self) -> Self::RecentTuples { + &[] + } + + fn stable(self) -> Self::StableTuples { + std::slice::from_ref(self) + } +} diff --git a/vendor/datafrog/src/lib.rs b/vendor/datafrog/src/lib.rs new file mode 100644 index 000000000..d2f93239e --- /dev/null +++ b/vendor/datafrog/src/lib.rs @@ -0,0 +1,567 @@ +//! A lightweight Datalog engine in Rust +//! +//! The intended design is that one has static `Relation` types that are sets +//! of tuples, and `Variable` types that represent monotonically increasing +//! sets of tuples. +//! +//! The types are mostly wrappers around `Vec<Tuple>` indicating sorted-ness, +//! and the intent is that this code can be dropped in the middle of an otherwise +//! normal Rust program, run to completion, and then the results extracted as +//! vectors again. + +#![forbid(missing_docs)] + +use std::cell::RefCell; +use std::cmp::Ordering; +use std::iter::FromIterator; +use std::rc::Rc; + +mod join; +mod map; +mod test; +mod treefrog; +pub use crate::join::JoinInput; +pub use crate::treefrog::{ + extend_anti::ExtendAnti, + extend_with::ExtendWith, + filter_anti::FilterAnti, + filter_with::FilterWith, + filters::{PrefixFilter, ValueFilter}, + Leaper, Leapers, RelationLeaper, +}; + +/// A static, ordered list of key-value pairs. +/// +/// A relation represents a fixed set of key-value pairs. In many places in a +/// Datalog computation we want to be sure that certain relations are not able +/// to vary (for example, in antijoins). +#[derive(Clone)] +pub struct Relation<Tuple: Ord> { + /// Sorted list of distinct tuples. + pub elements: Vec<Tuple>, +} + +impl<Tuple: Ord> Relation<Tuple> { + /// Merges two relations into their union. + pub fn merge(self, other: Self) -> Self { + let Relation { + elements: mut elements1, + } = self; + let Relation { + elements: mut elements2, + } = other; + + // If one of the element lists is zero-length, we don't need to do any work + if elements1.is_empty() { + return Relation { + elements: elements2, + }; + } + + if elements2.is_empty() { + return Relation { + elements: elements1, + }; + } + + // Make sure that elements1 starts with the lower element + // Will not panic since both collections must have at least 1 element at this point + if elements1[0] > elements2[0] { + std::mem::swap(&mut elements1, &mut elements2); + } + + // Fast path for when all the new elements are after the exiting ones + if elements1[elements1.len() - 1] < elements2[0] { + elements1.extend(elements2.into_iter()); + // println!("fast path"); + return Relation { + elements: elements1, + }; + } + + let mut elements = Vec::with_capacity(elements1.len() + elements2.len()); + let mut elements1 = elements1.drain(..); + let mut elements2 = elements2.drain(..).peekable(); + + elements.push(elements1.next().unwrap()); + if elements.first() == elements2.peek() { + elements2.next(); + } + + for elem in elements1 { + while elements2.peek().map(|x| x.cmp(&elem)) == Some(Ordering::Less) { + elements.push(elements2.next().unwrap()); + } + if elements2.peek().map(|x| x.cmp(&elem)) == Some(Ordering::Equal) { + elements2.next(); + } + elements.push(elem); + } + + // Finish draining second list + elements.extend(elements2); + + Relation { elements } + } + + /// Creates a `Relation` from the elements of the `iterator`. + /// + /// Same as the `from_iter` method from `std::iter::FromIterator` trait. + pub fn from_iter<I>(iterator: I) -> Self + where + I: IntoIterator<Item = Tuple>, + { + iterator.into_iter().collect() + } + + /// Creates a `Relation` using the `leapjoin` logic; + /// see [`Variable::from_leapjoin`] + pub fn from_leapjoin<'leap, SourceTuple: Ord, Val: Ord + 'leap>( + source: &Relation<SourceTuple>, + leapers: impl Leapers<'leap, SourceTuple, Val>, + logic: impl FnMut(&SourceTuple, &Val) -> Tuple, + ) -> Self { + treefrog::leapjoin(&source.elements, leapers, logic) + } + + /// Creates a `Relation` by joining the values from `input1` and + /// `input2` and then applying `logic`. Like + /// [`Variable::from_join`] except for use where the inputs are + /// not varying across iterations. + pub fn from_join<Key: Ord, Val1: Ord, Val2: Ord>( + input1: &Relation<(Key, Val1)>, + input2: &Relation<(Key, Val2)>, + logic: impl FnMut(&Key, &Val1, &Val2) -> Tuple, + ) -> Self { + join::join_into_relation(input1, input2, logic) + } + + /// Creates a `Relation` by removing all values from `input1` that + /// share a key with `input2`, and then transforming the resulting + /// tuples with the `logic` closure. Like + /// [`Variable::from_antijoin`] except for use where the inputs + /// are not varying across iterations. + pub fn from_antijoin<Key: Ord, Val1: Ord>( + input1: &Relation<(Key, Val1)>, + input2: &Relation<Key>, + logic: impl FnMut(&Key, &Val1) -> Tuple, + ) -> Self { + join::antijoin(input1, input2, logic) + } + + /// Construct a new relation by mapping another one. Equivalent to + /// creating an iterator but perhaps more convenient. Analogous to + /// `Variable::from_map`. + pub fn from_map<T2: Ord>(input: &Relation<T2>, logic: impl FnMut(&T2) -> Tuple) -> Self { + input.iter().map(logic).collect() + } + + /// Creates a `Relation` from a vector of tuples. + pub fn from_vec(mut elements: Vec<Tuple>) -> Self { + elements.sort(); + elements.dedup(); + Relation { elements } + } +} + +impl<Tuple: Ord> From<Vec<Tuple>> for Relation<Tuple> { + fn from(iterator: Vec<Tuple>) -> Self { + Self::from_vec(iterator) + } +} + +impl<Tuple: Ord> FromIterator<Tuple> for Relation<Tuple> { + fn from_iter<I>(iterator: I) -> Self + where + I: IntoIterator<Item = Tuple>, + { + Relation::from_vec(iterator.into_iter().collect()) + } +} + +impl<'tuple, Tuple: 'tuple + Copy + Ord> FromIterator<&'tuple Tuple> for Relation<Tuple> { + fn from_iter<I>(iterator: I) -> Self + where + I: IntoIterator<Item = &'tuple Tuple>, + { + Relation::from_vec(iterator.into_iter().cloned().collect()) + } +} + +impl<Tuple: Ord> std::ops::Deref for Relation<Tuple> { + type Target = [Tuple]; + fn deref(&self) -> &Self::Target { + &self.elements[..] + } +} + +/// An iterative context for recursive evaluation. +/// +/// An `Iteration` tracks monotonic variables, and monitors their progress. +/// It can inform the user if they have ceased changing, at which point the +/// computation should be done. +pub struct Iteration { + variables: Vec<Box<dyn VariableTrait>>, +} + +impl Iteration { + /// Create a new iterative context. + pub fn new() -> Self { + Iteration { + variables: Vec::new(), + } + } + /// Reports whether any of the monitored variables have changed since + /// the most recent call. + pub fn changed(&mut self) -> bool { + let mut result = false; + for variable in self.variables.iter_mut() { + if variable.changed() { + result = true; + } + } + result + } + /// Creates a new named variable associated with the iterative context. + pub fn variable<Tuple: Ord + 'static>(&mut self, name: &str) -> Variable<Tuple> { + let variable = Variable::new(name); + self.variables.push(Box::new(variable.clone())); + variable + } + /// Creates a new named variable associated with the iterative context. + /// + /// This variable will not be maintained distinctly, and may advertise tuples as + /// recent multiple times (perhaps unboundedly many times). + pub fn variable_indistinct<Tuple: Ord + 'static>(&mut self, name: &str) -> Variable<Tuple> { + let mut variable = Variable::new(name); + variable.distinct = false; + self.variables.push(Box::new(variable.clone())); + variable + } +} + +/// A type that can report on whether it has changed. +trait VariableTrait { + /// Reports whether the variable has changed since it was last asked. + fn changed(&mut self) -> bool; +} + +/// An monotonically increasing set of `Tuple`s. +/// +/// There are three stages in the lifecycle of a tuple: +/// +/// 1. A tuple is added to `self.to_add`, but is not yet visible externally. +/// 2. Newly added tuples are then promoted to `self.recent` for one iteration. +/// 3. After one iteration, recent tuples are moved to `self.tuples` for posterity. +/// +/// Each time `self.changed()` is called, the `recent` relation is folded into `tuples`, +/// and the `to_add` relations are merged, potentially deduplicated against `tuples`, and +/// then made `recent`. This way, across calls to `changed()` all added tuples are in +/// `recent` at least once and eventually all are in `tuples`. +/// +/// A `Variable` may optionally be instructed not to de-duplicate its tuples, for reasons +/// of performance. Such a variable cannot be relied on to terminate iterative computation, +/// and it is important that any cycle of derivations have at least one de-duplicating +/// variable on it. +pub struct Variable<Tuple: Ord> { + /// Should the variable be maintained distinctly. + distinct: bool, + /// A useful name for the variable. + name: String, + /// A list of relations whose union are the accepted tuples. + pub stable: Rc<RefCell<Vec<Relation<Tuple>>>>, + /// A list of recent tuples, still to be processed. + pub recent: Rc<RefCell<Relation<Tuple>>>, + /// A list of future tuples, to be introduced. + to_add: Rc<RefCell<Vec<Relation<Tuple>>>>, +} + +// Operator implementations. +impl<Tuple: Ord> Variable<Tuple> { + /// Adds tuples that result from joining `input1` and `input2` -- + /// each of the inputs must be a set of (Key, Value) tuples. Both + /// `input1` and `input2` must have the same type of key (`K`) but + /// they can have distinct value types (`V1` and `V2` + /// respectively). The `logic` closure will be invoked for each + /// key that appears in both inputs; it is also given the two + /// values, and from those it should construct the resulting + /// value. + /// + /// Note that `input1` must be a variable, but `input2` can be a + /// relation or a variable. Therefore, you cannot join two + /// relations with this method. This is not because the result + /// would be wrong, but because it would be inefficient: the + /// result from such a join cannot vary across iterations (as + /// relations are fixed), so you should prefer to invoke `insert` + /// on a relation created by `Relation::from_join` instead. + /// + /// # Examples + /// + /// This example starts a collection with the pairs (x, x+1) and (x+1, x) for x in 0 .. 10. + /// It then adds pairs (y, z) for which (x, y) and (x, z) are present. Because the initial + /// pairs are symmetric, this should result in all pairs (x, y) for x and y in 0 .. 11. + /// + /// ``` + /// use datafrog::{Iteration, Relation}; + /// + /// let mut iteration = Iteration::new(); + /// let variable = iteration.variable::<(usize, usize)>("source"); + /// variable.extend((0 .. 10).map(|x| (x, x + 1))); + /// variable.extend((0 .. 10).map(|x| (x + 1, x))); + /// + /// while iteration.changed() { + /// variable.from_join(&variable, &variable, |&key, &val1, &val2| (val1, val2)); + /// } + /// + /// let result = variable.complete(); + /// assert_eq!(result.len(), 121); + /// ``` + pub fn from_join<'me, K: Ord, V1: Ord, V2: Ord>( + &self, + input1: &'me Variable<(K, V1)>, + input2: impl JoinInput<'me, (K, V2)>, + logic: impl FnMut(&K, &V1, &V2) -> Tuple, + ) { + join::join_into(input1, input2, self, logic) + } + + /// Adds tuples from `input1` whose key is not present in `input2`. + /// + /// Note that `input1` must be a variable: if you have a relation + /// instead, you can use `Relation::from_antijoin` and then + /// `Variable::insert`. Note that the result will not vary during + /// the iteration. + /// + /// # Examples + /// + /// This example starts a collection with the pairs (x, x+1) for x in 0 .. 10. It then + /// adds any pairs (x+1,x) for which x is not a multiple of three. That excludes four + /// pairs (for 0, 3, 6, and 9) which should leave us with 16 total pairs. + /// + /// ``` + /// use datafrog::{Iteration, Relation}; + /// + /// let mut iteration = Iteration::new(); + /// let variable = iteration.variable::<(usize, usize)>("source"); + /// variable.extend((0 .. 10).map(|x| (x, x + 1))); + /// + /// let relation: Relation<_> = (0 .. 10).filter(|x| x % 3 == 0).collect(); + /// + /// while iteration.changed() { + /// variable.from_antijoin(&variable, &relation, |&key, &val| (val, key)); + /// } + /// + /// let result = variable.complete(); + /// assert_eq!(result.len(), 16); + /// ``` + pub fn from_antijoin<K: Ord, V: Ord>( + &self, + input1: &Variable<(K, V)>, + input2: &Relation<K>, + logic: impl FnMut(&K, &V) -> Tuple, + ) { + self.insert(join::antijoin(input1, input2, logic)) + } + + /// Adds tuples that result from mapping `input`. + /// + /// # Examples + /// + /// This example starts a collection with the pairs (x, x) for x in 0 .. 10. It then + /// repeatedly adds any pairs (x, z) for (x, y) in the collection, where z is the Collatz + /// step for y: it is y/2 if y is even, and 3*y + 1 if y is odd. This produces all of the + /// pairs (x, y) where x visits y as part of its Collatz journey. + /// + /// ``` + /// use datafrog::{Iteration, Relation}; + /// + /// let mut iteration = Iteration::new(); + /// let variable = iteration.variable::<(usize, usize)>("source"); + /// variable.extend((0 .. 10).map(|x| (x, x))); + /// + /// while iteration.changed() { + /// variable.from_map(&variable, |&(key, val)| + /// if val % 2 == 0 { + /// (key, val/2) + /// } + /// else { + /// (key, 3*val + 1) + /// }); + /// } + /// + /// let result = variable.complete(); + /// assert_eq!(result.len(), 74); + /// ``` + pub fn from_map<T2: Ord>(&self, input: &Variable<T2>, logic: impl FnMut(&T2) -> Tuple) { + map::map_into(input, self, logic) + } + + /// Adds tuples that result from combining `source` with the + /// relations given in `leapers`. This operation is very flexible + /// and can be used to do a combination of joins and anti-joins. + /// The main limitation is that the things being combined must + /// consist of one dynamic variable (`source`) and then several + /// fixed relations (`leapers`). + /// + /// The idea is as follows: + /// + /// - You will be inserting new tuples that result from joining (and anti-joining) + /// some dynamic variable `source` of source tuples (`SourceTuple`) + /// with some set of values (of type `Val`). + /// - You provide these values by combining `source` with a set of leapers + /// `leapers`, each of which is derived from a fixed relation. The `leapers` + /// should be either a single leaper (of suitable type) or else a tuple of leapers. + /// You can create a leaper in one of two ways: + /// - Extension: In this case, you have a relation of type `(K, Val)` for some + /// type `K`. You provide a closure that maps from `SourceTuple` to the key + /// `K`. If you use `relation.extend_with`, then any `Val` values the + /// relation provides will be added to the set of values; if you use + /// `extend_anti`, then the `Val` values will be removed. + /// - Filtering: In this case, you have a relation of type `K` for some + /// type `K` and you provide a closure that maps from `SourceTuple` to + /// the key `K`. Filters don't provide values but they remove source + /// tuples. + /// - Finally, you get a callback `logic` that accepts each `(SourceTuple, Val)` + /// that was successfully joined (and not filtered) and which maps to the + /// type of this variable. + pub fn from_leapjoin<'leap, SourceTuple: Ord, Val: Ord + 'leap>( + &self, + source: &Variable<SourceTuple>, + leapers: impl Leapers<'leap, SourceTuple, Val>, + logic: impl FnMut(&SourceTuple, &Val) -> Tuple, + ) { + self.insert(treefrog::leapjoin(&source.recent.borrow(), leapers, logic)); + } +} + +impl<Tuple: Ord> Clone for Variable<Tuple> { + fn clone(&self) -> Self { + Variable { + distinct: self.distinct, + name: self.name.clone(), + stable: self.stable.clone(), + recent: self.recent.clone(), + to_add: self.to_add.clone(), + } + } +} + +impl<Tuple: Ord> Variable<Tuple> { + fn new(name: &str) -> Self { + Variable { + distinct: true, + name: name.to_string(), + stable: Rc::new(RefCell::new(Vec::new())), + recent: Rc::new(RefCell::new(Vec::new().into())), + to_add: Rc::new(RefCell::new(Vec::new())), + } + } + + /// Inserts a relation into the variable. + /// + /// This is most commonly used to load initial values into a variable. + /// it is not obvious that it should be commonly used otherwise, but + /// it should not be harmful. + pub fn insert(&self, relation: Relation<Tuple>) { + if !relation.is_empty() { + self.to_add.borrow_mut().push(relation); + } + } + + /// Extend the variable with values from the iterator. + /// + /// This is most commonly used to load initial values into a variable. + /// it is not obvious that it should be commonly used otherwise, but + /// it should not be harmful. + pub fn extend<T>(&self, iterator: impl IntoIterator<Item = T>) + where + Relation<Tuple>: FromIterator<T>, + { + self.insert(iterator.into_iter().collect()); + } + + /// Consumes the variable and returns a relation. + /// + /// This method removes the ability for the variable to develop, and + /// flattens all internal tuples down to one relation. The method + /// asserts that iteration has completed, in that `self.recent` and + /// `self.to_add` should both be empty. + pub fn complete(self) -> Relation<Tuple> { + assert!(self.recent.borrow().is_empty()); + assert!(self.to_add.borrow().is_empty()); + let mut result: Relation<Tuple> = Vec::new().into(); + while let Some(batch) = self.stable.borrow_mut().pop() { + result = result.merge(batch); + } + result + } +} + +impl<Tuple: Ord> VariableTrait for Variable<Tuple> { + fn changed(&mut self) -> bool { + // 1. Merge self.recent into self.stable. + if !self.recent.borrow().is_empty() { + let mut recent = + ::std::mem::replace(&mut (*self.recent.borrow_mut()), Vec::new().into()); + while self + .stable + .borrow() + .last() + .map(|x| x.len() <= 2 * recent.len()) + == Some(true) + { + let last = self.stable.borrow_mut().pop().unwrap(); + recent = recent.merge(last); + } + self.stable.borrow_mut().push(recent); + } + + // 2. Move self.to_add into self.recent. + let to_add = self.to_add.borrow_mut().pop(); + if let Some(mut to_add) = to_add { + while let Some(to_add_more) = self.to_add.borrow_mut().pop() { + to_add = to_add.merge(to_add_more); + } + // 2b. Restrict `to_add` to tuples not in `self.stable`. + if self.distinct { + for batch in self.stable.borrow().iter() { + let mut slice = &batch[..]; + // Only gallop if the slice is relatively large. + if slice.len() > 4 * to_add.elements.len() { + to_add.elements.retain(|x| { + slice = join::gallop(slice, |y| y < x); + slice.is_empty() || &slice[0] != x + }); + } else { + to_add.elements.retain(|x| { + while !slice.is_empty() && &slice[0] < x { + slice = &slice[1..]; + } + slice.is_empty() || &slice[0] != x + }); + } + } + } + *self.recent.borrow_mut() = to_add; + } + + // let mut total = 0; + // for tuple in self.stable.borrow().iter() { + // total += tuple.len(); + // } + + // println!("Variable\t{}\t{}\t{}", self.name, total, self.recent.borrow().len()); + + !self.recent.borrow().is_empty() + } +} + +// impl<Tuple: Ord> Drop for Variable<Tuple> { +// fn drop(&mut self) { +// let mut total = 0; +// for batch in self.stable.borrow().iter() { +// total += batch.len(); +// } +// println!("FINAL: {:?}\t{:?}", self.name, total); +// } +// } diff --git a/vendor/datafrog/src/map.rs b/vendor/datafrog/src/map.rs new file mode 100644 index 000000000..1a8c10128 --- /dev/null +++ b/vendor/datafrog/src/map.rs @@ -0,0 +1,13 @@ +//! Map functionality. + +use super::{Relation, Variable}; + +pub(crate) fn map_into<T1: Ord, T2: Ord>( + input: &Variable<T1>, + output: &Variable<T2>, + logic: impl FnMut(&T1) -> T2, +) { + let results: Vec<T2> = input.recent.borrow().iter().map(logic).collect(); + + output.insert(Relation::from_vec(results)); +} diff --git a/vendor/datafrog/src/test.rs b/vendor/datafrog/src/test.rs new file mode 100644 index 000000000..9d5af3561 --- /dev/null +++ b/vendor/datafrog/src/test.rs @@ -0,0 +1,195 @@ +#![cfg(test)] + +use crate::Iteration; +use crate::Relation; +use crate::RelationLeaper; +use proptest::prelude::*; +use proptest::{proptest, proptest_helper}; + +fn inputs() -> impl Strategy<Value = Vec<(u32, u32)>> { + prop::collection::vec((0_u32..100, 0_u32..100), 1..500) +} + +/// The original way to use datafrog -- computes reachable nodes from a set of edges +fn reachable_with_var_join(edges: &[(u32, u32)]) -> Relation<(u32, u32)> { + let edges: Relation<_> = edges.iter().collect(); + let mut iteration = Iteration::new(); + + let edges_by_successor = iteration.variable::<(u32, u32)>("edges_invert"); + edges_by_successor.extend(edges.iter().map(|&(n1, n2)| (n2, n1))); + + let reachable = iteration.variable::<(u32, u32)>("reachable"); + reachable.insert(edges); + + while iteration.changed() { + // reachable(N1, N3) :- edges(N1, N2), reachable(N2, N3). + reachable.from_join(&reachable, &edges_by_successor, |&_, &n3, &n1| (n1, n3)); + } + + reachable.complete() +} + +/// Like `reachable`, but using a relation as an input to `from_join` +fn reachable_with_relation_join(edges: &[(u32, u32)]) -> Relation<(u32, u32)> { + let edges: Relation<_> = edges.iter().collect(); + let mut iteration = Iteration::new(); + + // NB. Changed from `reachable_with_var_join`: + let edges_by_successor: Relation<_> = edges.iter().map(|&(n1, n2)| (n2, n1)).collect(); + + let reachable = iteration.variable::<(u32, u32)>("reachable"); + reachable.insert(edges); + + while iteration.changed() { + // reachable(N1, N3) :- edges(N1, N2), reachable(N2, N3). + reachable.from_join(&reachable, &edges_by_successor, |&_, &n3, &n1| (n1, n3)); + } + + reachable.complete() +} + +fn reachable_with_leapfrog(edges: &[(u32, u32)]) -> Relation<(u32, u32)> { + let edges: Relation<_> = edges.iter().collect(); + let mut iteration = Iteration::new(); + + let edges_by_successor: Relation<_> = edges.iter().map(|&(n1, n2)| (n2, n1)).collect(); + + let reachable = iteration.variable::<(u32, u32)>("reachable"); + reachable.insert(edges); + + while iteration.changed() { + // reachable(N1, N3) :- edges(N1, N2), reachable(N2, N3). + reachable.from_leapjoin( + &reachable, + edges_by_successor.extend_with(|&(n2, _)| n2), + |&(_, n3), &n1| (n1, n3), + ); + } + + reachable.complete() +} + +/// Computes a join where the values are summed -- uses iteration +/// variables (the original datafrog technique). +fn sum_join_via_var( + input1_slice: &[(u32, u32)], + input2_slice: &[(u32, u32)], +) -> Relation<(u32, u32)> { + let mut iteration = Iteration::new(); + + let input1 = iteration.variable::<(u32, u32)>("input1"); + input1.extend(input1_slice); + + let input2 = iteration.variable::<(u32, u32)>("input1"); + input2.extend(input2_slice); + + let output = iteration.variable::<(u32, u32)>("output"); + + while iteration.changed() { + // output(K1, V1 * 100 + V2) :- input1(K1, V1), input2(K1, V2). + output.from_join(&input1, &input2, |&k1, &v1, &v2| (k1, v1 * 100 + v2)); + } + + output.complete() +} + +/// Computes a join where the values are summed -- uses iteration +/// variables (the original datafrog technique). +fn sum_join_via_relation( + input1_slice: &[(u32, u32)], + input2_slice: &[(u32, u32)], +) -> Relation<(u32, u32)> { + let input1: Relation<_> = input1_slice.iter().collect(); + let input2: Relation<_> = input2_slice.iter().collect(); + Relation::from_join(&input1, &input2, |&k1, &v1, &v2| (k1, v1 * 100 + v2)) +} + +proptest! { + #[test] + fn reachable_leapfrog_vs_var_join(edges in inputs()) { + let reachable1 = reachable_with_var_join(&edges); + let reachable2 = reachable_with_leapfrog(&edges); + assert_eq!(reachable1.elements, reachable2.elements); + } + + #[test] + fn reachable_rel_join_vs_var_join(edges in inputs()) { + let reachable1 = reachable_with_var_join(&edges); + let reachable2 = reachable_with_relation_join(&edges); + assert_eq!(reachable1.elements, reachable2.elements); + } + + #[test] + fn sum_join_from_var_vs_rel((set1, set2) in (inputs(), inputs())) { + let output1 = sum_join_via_var(&set1, &set2); + let output2 = sum_join_via_relation(&set1, &set2); + assert_eq!(output1.elements, output2.elements); + } + + /// Test the behavior of `filter_anti` used on its own in a + /// leapjoin -- effectively it becomes an "intersection" + /// operation. + #[test] + fn filter_with_on_its_own((set1, set2) in (inputs(), inputs())) { + let input1: Relation<(u32, u32)> = set1.iter().collect(); + let input2: Relation<(u32, u32)> = set2.iter().collect(); + let intersection1 = Relation::from_leapjoin( + &input1, + input2.filter_with(|&tuple| tuple), + |&tuple, &()| tuple, + ); + + let intersection2: Relation<(u32, u32)> = input1.elements.iter() + .filter(|t| input2.elements.binary_search(&t).is_ok()) + .collect(); + + assert_eq!(intersection1.elements, intersection2.elements); + } + + /// Test the behavior of `filter_anti` used on its own in a + /// leapjoin -- effectively it becomes a "set minus" operation. + #[test] + fn filter_anti_on_its_own((set1, set2) in (inputs(), inputs())) { + let input1: Relation<(u32, u32)> = set1.iter().collect(); + let input2: Relation<(u32, u32)> = set2.iter().collect(); + + let difference1 = Relation::from_leapjoin( + &input1, + input2.filter_anti(|&tuple| tuple), + |&tuple, &()| tuple, + ); + + let difference2: Relation<(u32, u32)> = input1.elements.iter() + .filter(|t| input2.elements.binary_search(&t).is_err()) + .collect(); + + assert_eq!(difference1.elements, difference2.elements); + } +} + +/// Test that `from_leapjoin` matches against the tuples from an +/// `extend` that precedes first iteration. +/// +/// This was always true, but wasn't immediately obvious to me until I +/// re-read the code more carefully. -nikomatsakis +#[test] +fn leapjoin_from_extend() { + let doubles: Relation<(u32, u32)> = (0..10).map(|i| (i, i * 2)).collect(); + + let mut iteration = Iteration::new(); + + let variable = iteration.variable::<(u32, u32)>("variable"); + variable.extend(Some((2, 2))); + + while iteration.changed() { + variable.from_leapjoin( + &variable, + doubles.extend_with(|&(i, _)| i), + |&(i, _), &j| (i, j), + ); + } + + let variable = variable.complete(); + + assert_eq!(variable.elements, vec![(2, 2), (2, 4)]); +} diff --git a/vendor/datafrog/src/treefrog.rs b/vendor/datafrog/src/treefrog.rs new file mode 100644 index 000000000..2ad238fd0 --- /dev/null +++ b/vendor/datafrog/src/treefrog.rs @@ -0,0 +1,661 @@ +//! Join functionality. + +use super::Relation; + +/// Performs treefrog leapjoin using a list of leapers. +pub(crate) fn leapjoin<'leap, Tuple: Ord, Val: Ord + 'leap, Result: Ord>( + source: &[Tuple], + mut leapers: impl Leapers<'leap, Tuple, Val>, + mut logic: impl FnMut(&Tuple, &Val) -> Result, +) -> Relation<Result> { + let mut result = Vec::new(); // temp output storage. + let mut values = Vec::new(); // temp value storage. + + for tuple in source { + // Determine which leaper would propose the fewest values. + let mut min_index = usize::max_value(); + let mut min_count = usize::max_value(); + leapers.for_each_count(tuple, |index, count| { + if min_count > count { + min_count = count; + min_index = index; + } + }); + + // We had best have at least one relation restricting values. + assert!(min_count < usize::max_value()); + + // If there are values to propose: + if min_count > 0 { + // Push the values that `min_index` "proposes" into `values`. + leapers.propose(tuple, min_index, &mut values); + + // Give other leapers a chance to remove values from + // anti-joins or filters. + leapers.intersect(tuple, min_index, &mut values); + + // Push remaining items into result. + for val in values.drain(..) { + result.push(logic(tuple, val)); + } + } + } + + Relation::from_vec(result) +} + +/// Implemented for a tuple of leapers +pub trait Leapers<'leap, Tuple, Val> { + /// Internal method: + fn for_each_count(&mut self, tuple: &Tuple, op: impl FnMut(usize, usize)); + + /// Internal method: + fn propose(&mut self, tuple: &Tuple, min_index: usize, values: &mut Vec<&'leap Val>); + + /// Internal method: + fn intersect(&mut self, tuple: &Tuple, min_index: usize, values: &mut Vec<&'leap Val>); +} + +macro_rules! tuple_leapers { + ($($Ty:ident)*) => { + #[allow(unused_assignments, non_snake_case)] + impl<'leap, Tuple, Val, $($Ty),*> Leapers<'leap, Tuple, Val> for ($($Ty,)*) + where + $($Ty: Leaper<'leap, Tuple, Val>,)* + { + fn for_each_count(&mut self, tuple: &Tuple, mut op: impl FnMut(usize, usize)) { + let ($($Ty,)*) = self; + let mut index = 0; + $( + let count = $Ty.count(tuple); + op(index, count); + index += 1; + )* + } + + fn propose(&mut self, tuple: &Tuple, min_index: usize, values: &mut Vec<&'leap Val>) { + let ($($Ty,)*) = self; + let mut index = 0; + $( + if min_index == index { + return $Ty.propose(tuple, values); + } + index += 1; + )* + panic!("no match found for min_index={}", min_index); + } + + fn intersect(&mut self, tuple: &Tuple, min_index: usize, values: &mut Vec<&'leap Val>) { + let ($($Ty,)*) = self; + let mut index = 0; + $( + if min_index != index { + $Ty.intersect(tuple, values); + } + index += 1; + )* + } + } + } +} + +tuple_leapers!(A B); +tuple_leapers!(A B C); +tuple_leapers!(A B C D); +tuple_leapers!(A B C D E); +tuple_leapers!(A B C D E F); +tuple_leapers!(A B C D E F G); + +/// Methods to support treefrog leapjoin. +pub trait Leaper<'leap, Tuple, Val> { + /// Estimates the number of proposed values. + fn count(&mut self, prefix: &Tuple) -> usize; + /// Populates `values` with proposed values. + fn propose(&mut self, prefix: &Tuple, values: &mut Vec<&'leap Val>); + /// Restricts `values` to proposed values. + fn intersect(&mut self, prefix: &Tuple, values: &mut Vec<&'leap Val>); +} + +pub(crate) mod filters { + use super::Leaper; + use super::Leapers; + + /// A treefrog leaper that tests each of the tuples from the main + /// input (the "prefix"). Use like `PrefixFilter::from(|tuple| + /// ...)`; if the closure returns true, then the tuple is + /// retained, else it will be ignored. This leaper can be used in + /// isolation in which case it just acts like a filter on the + /// input (the "proposed value" will be `()` type). + pub struct PrefixFilter<Tuple, Func: Fn(&Tuple) -> bool> { + phantom: ::std::marker::PhantomData<Tuple>, + predicate: Func, + } + + impl<'leap, Tuple, Func> PrefixFilter<Tuple, Func> + where + Func: Fn(&Tuple) -> bool, + { + /// Creates a new filter based on the prefix + pub fn from(predicate: Func) -> Self { + PrefixFilter { + phantom: ::std::marker::PhantomData, + predicate, + } + } + } + + impl<'leap, Tuple, Val, Func> Leaper<'leap, Tuple, Val> for PrefixFilter<Tuple, Func> + where + Func: Fn(&Tuple) -> bool, + { + /// Estimates the number of proposed values. + fn count(&mut self, prefix: &Tuple) -> usize { + if (self.predicate)(prefix) { + usize::max_value() + } else { + 0 + } + } + /// Populates `values` with proposed values. + fn propose(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val>) { + panic!("PrefixFilter::propose(): variable apparently unbound"); + } + /// Restricts `values` to proposed values. + fn intersect(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val>) { + // We can only be here if we returned max_value() above. + } + } + + impl<'leap, Tuple, Func> Leapers<'leap, Tuple, ()> for PrefixFilter<Tuple, Func> + where + Func: Fn(&Tuple) -> bool, + { + fn for_each_count(&mut self, tuple: &Tuple, mut op: impl FnMut(usize, usize)) { + if <Self as Leaper<'_, Tuple, ()>>::count(self, tuple) == 0 { + op(0, 0) + } else { + // we will "propose" the `()` value if the predicate applies + op(0, 1) + } + } + + fn propose(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + assert_eq!(min_index, 0); + values.push(&()); + } + + fn intersect(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + assert_eq!(min_index, 0); + assert_eq!(values.len(), 1); + } + } + + /// A treefrog leaper based on a predicate of prefix and value. + /// Use like `ValueFilter::from(|tuple, value| ...)`. The closure + /// should return true if `value` ought to be retained. The + /// `value` will be a value proposed elsewhere by an `extend_with` + /// leaper. + /// + /// This leaper cannot be used in isolation, it must be combined + /// with other leapers. + pub struct ValueFilter<Tuple, Val, Func: Fn(&Tuple, &Val) -> bool> { + phantom: ::std::marker::PhantomData<(Tuple, Val)>, + predicate: Func, + } + + impl<'leap, Tuple, Val, Func> ValueFilter<Tuple, Val, Func> + where + Func: Fn(&Tuple, &Val) -> bool, + { + /// Creates a new filter based on the prefix + pub fn from(predicate: Func) -> Self { + ValueFilter { + phantom: ::std::marker::PhantomData, + predicate, + } + } + } + + impl<'leap, Tuple, Val, Func> Leaper<'leap, Tuple, Val> for ValueFilter<Tuple, Val, Func> + where + Func: Fn(&Tuple, &Val) -> bool, + { + /// Estimates the number of proposed values. + fn count(&mut self, _prefix: &Tuple) -> usize { + usize::max_value() + } + /// Populates `values` with proposed values. + fn propose(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val>) { + panic!("PrefixFilter::propose(): variable apparently unbound"); + } + /// Restricts `values` to proposed values. + fn intersect(&mut self, prefix: &Tuple, values: &mut Vec<&'leap Val>) { + values.retain(|val| (self.predicate)(prefix, val)); + } + } + +} + +/// Extension method for relations. +pub trait RelationLeaper<Key: Ord, Val: Ord> { + /// Extend with `Val` using the elements of the relation. + fn extend_with<'leap, Tuple: Ord, Func: Fn(&Tuple) -> Key>( + &'leap self, + key_func: Func, + ) -> extend_with::ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap; + /// Extend with `Val` using the complement of the relation. + fn extend_anti<'leap, Tuple: Ord, Func: Fn(&Tuple) -> Key>( + &'leap self, + key_func: Func, + ) -> extend_anti::ExtendAnti<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap; + /// Extend with any value if tuple is present in relation. + fn filter_with<'leap, Tuple: Ord, Func: Fn(&Tuple) -> (Key, Val)>( + &'leap self, + key_func: Func, + ) -> filter_with::FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap; + /// Extend with any value if tuple is absent from relation. + fn filter_anti<'leap, Tuple: Ord, Func: Fn(&Tuple) -> (Key, Val)>( + &'leap self, + key_func: Func, + ) -> filter_anti::FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap; +} + +impl<Key: Ord, Val: Ord> RelationLeaper<Key, Val> for Relation<(Key, Val)> { + fn extend_with<'leap, Tuple: Ord, Func: Fn(&Tuple) -> Key>( + &'leap self, + key_func: Func, + ) -> extend_with::ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap, + { + extend_with::ExtendWith::from(self, key_func) + } + fn extend_anti<'leap, Tuple: Ord, Func: Fn(&Tuple) -> Key>( + &'leap self, + key_func: Func, + ) -> extend_anti::ExtendAnti<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap, + { + extend_anti::ExtendAnti::from(self, key_func) + } + fn filter_with<'leap, Tuple: Ord, Func: Fn(&Tuple) -> (Key, Val)>( + &'leap self, + key_func: Func, + ) -> filter_with::FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap, + { + filter_with::FilterWith::from(self, key_func) + } + fn filter_anti<'leap, Tuple: Ord, Func: Fn(&Tuple) -> (Key, Val)>( + &'leap self, + key_func: Func, + ) -> filter_anti::FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: 'leap, + Val: 'leap, + { + filter_anti::FilterAnti::from(self, key_func) + } +} + +pub(crate) mod extend_with { + use super::{binary_search, Leaper, Leapers, Relation}; + use crate::join::gallop; + + /// Wraps a Relation<Tuple> as a leaper. + pub struct ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + relation: &'leap Relation<(Key, Val)>, + start: usize, + end: usize, + key_func: Func, + phantom: ::std::marker::PhantomData<Tuple>, + } + + impl<'leap, Key, Val, Tuple, Func> ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + /// Constructs a ExtendWith from a relation and key and value function. + pub fn from(relation: &'leap Relation<(Key, Val)>, key_func: Func) -> Self { + ExtendWith { + relation, + start: 0, + end: 0, + key_func, + phantom: ::std::marker::PhantomData, + } + } + } + + impl<'leap, Key, Val, Tuple, Func> Leaper<'leap, Tuple, Val> + for ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + fn count(&mut self, prefix: &Tuple) -> usize { + let key = (self.key_func)(prefix); + self.start = binary_search(&self.relation[..], |x| &x.0 < &key); + let slice1 = &self.relation[self.start..]; + let slice2 = gallop(slice1, |x| &x.0 <= &key); + self.end = self.relation.len() - slice2.len(); + slice1.len() - slice2.len() + } + fn propose(&mut self, _prefix: &Tuple, values: &mut Vec<&'leap Val>) { + let slice = &self.relation[self.start..self.end]; + values.extend(slice.iter().map(|&(_, ref val)| val)); + } + fn intersect(&mut self, _prefix: &Tuple, values: &mut Vec<&'leap Val>) { + let mut slice = &self.relation[self.start..self.end]; + values.retain(|v| { + slice = gallop(slice, |kv| &kv.1 < v); + slice.get(0).map(|kv| &kv.1) == Some(v) + }); + } + } + + impl<'leap, Key, Val, Tuple, Func> Leapers<'leap, Tuple, Val> + for ExtendWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + fn for_each_count(&mut self, tuple: &Tuple, mut op: impl FnMut(usize, usize)) { + op(0, self.count(tuple)) + } + + fn propose(&mut self, tuple: &Tuple, min_index: usize, values: &mut Vec<&'leap Val>) { + assert_eq!(min_index, 0); + Leaper::propose(self, tuple, values); + } + + fn intersect(&mut self, _: &Tuple, min_index: usize, _: &mut Vec<&'leap Val>) { + assert_eq!(min_index, 0); + } + } +} + +pub(crate) mod extend_anti { + use super::{binary_search, Leaper, Relation}; + use crate::join::gallop; + + /// Wraps a Relation<Tuple> as a leaper. + pub struct ExtendAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + relation: &'leap Relation<(Key, Val)>, + key_func: Func, + phantom: ::std::marker::PhantomData<Tuple>, + } + + impl<'leap, Key, Val, Tuple, Func> ExtendAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + /// Constructs a ExtendAnti from a relation and key and value function. + pub fn from(relation: &'leap Relation<(Key, Val)>, key_func: Func) -> Self { + ExtendAnti { + relation, + key_func, + phantom: ::std::marker::PhantomData, + } + } + } + + impl<'leap, Key: Ord, Val: Ord + 'leap, Tuple: Ord, Func> Leaper<'leap, Tuple, Val> + for ExtendAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> Key, + { + fn count(&mut self, _prefix: &Tuple) -> usize { + usize::max_value() + } + fn propose(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val>) { + panic!("ExtendAnti::propose(): variable apparently unbound."); + } + fn intersect(&mut self, prefix: &Tuple, values: &mut Vec<&'leap Val>) { + let key = (self.key_func)(prefix); + let start = binary_search(&self.relation[..], |x| &x.0 < &key); + let slice1 = &self.relation[start..]; + let slice2 = gallop(slice1, |x| &x.0 <= &key); + let mut slice = &slice1[..(slice1.len() - slice2.len())]; + if !slice.is_empty() { + values.retain(|v| { + slice = gallop(slice, |kv| &kv.1 < v); + slice.get(0).map(|kv| &kv.1) != Some(v) + }); + } + } + } +} + +pub(crate) mod filter_with { + + use super::{Leaper, Leapers, Relation}; + + /// Wraps a Relation<Tuple> as a leaper. + pub struct FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + relation: &'leap Relation<(Key, Val)>, + key_func: Func, + phantom: ::std::marker::PhantomData<Tuple>, + } + + impl<'leap, Key, Val, Tuple, Func> FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + /// Constructs a FilterWith from a relation and key and value function. + pub fn from(relation: &'leap Relation<(Key, Val)>, key_func: Func) -> Self { + FilterWith { + relation, + key_func, + phantom: ::std::marker::PhantomData, + } + } + } + + impl<'leap, Key, Val, Val2, Tuple, Func> Leaper<'leap, Tuple, Val2> + for FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + fn count(&mut self, prefix: &Tuple) -> usize { + let key_val = (self.key_func)(prefix); + if self.relation.binary_search(&key_val).is_ok() { + usize::max_value() + } else { + 0 + } + } + fn propose(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val2>) { + panic!("FilterWith::propose(): variable apparently unbound."); + } + fn intersect(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val2>) { + // Only here because we didn't return zero above, right? + } + } + + impl<'leap, Key, Val, Tuple, Func> Leapers<'leap, Tuple, ()> + for FilterWith<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + fn for_each_count(&mut self, tuple: &Tuple, mut op: impl FnMut(usize, usize)) { + if <Self as Leaper<Tuple, ()>>::count(self, tuple) == 0 { + op(0, 0) + } else { + op(0, 1) + } + } + + fn propose(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + assert_eq!(min_index, 0); + values.push(&()); + } + + fn intersect(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + assert_eq!(min_index, 0); + assert_eq!(values.len(), 1); + } + } +} + +pub(crate) mod filter_anti { + + use super::{Leaper, Leapers, Relation}; + + /// Wraps a Relation<Tuple> as a leaper. + pub struct FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + relation: &'leap Relation<(Key, Val)>, + key_func: Func, + phantom: ::std::marker::PhantomData<Tuple>, + } + + impl<'leap, Key, Val, Tuple, Func> FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + /// Constructs a FilterAnti from a relation and key and value function. + pub fn from(relation: &'leap Relation<(Key, Val)>, key_func: Func) -> Self { + FilterAnti { + relation, + key_func, + phantom: ::std::marker::PhantomData, + } + } + } + + impl<'leap, Key: Ord, Val: Ord + 'leap, Val2, Tuple: Ord, Func> Leaper<'leap, Tuple, Val2> + for FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + fn count(&mut self, prefix: &Tuple) -> usize { + let key_val = (self.key_func)(prefix); + if self.relation.binary_search(&key_val).is_ok() { + 0 + } else { + usize::max_value() + } + } + fn propose(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val2>) { + panic!("FilterAnti::propose(): variable apparently unbound."); + } + fn intersect(&mut self, _prefix: &Tuple, _values: &mut Vec<&'leap Val2>) { + // Only here because we didn't return zero above, right? + } + } + + impl<'leap, Key, Val, Tuple, Func> Leapers<'leap, Tuple, ()> + for FilterAnti<'leap, Key, Val, Tuple, Func> + where + Key: Ord + 'leap, + Val: Ord + 'leap, + Tuple: Ord, + Func: Fn(&Tuple) -> (Key, Val), + { + fn for_each_count(&mut self, tuple: &Tuple, mut op: impl FnMut(usize, usize)) { + if <Self as Leaper<Tuple, ()>>::count(self, tuple) == 0 { + op(0, 0) + } else { + op(0, 1) + } + } + + fn propose(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + // We only get here if `tuple` is *not* a member of `self.relation` + assert_eq!(min_index, 0); + values.push(&()); + } + + fn intersect(&mut self, _: &Tuple, min_index: usize, values: &mut Vec<&'leap ()>) { + // We only get here if `tuple` is not a member of `self.relation` + assert_eq!(min_index, 0); + assert_eq!(values.len(), 1); + } + } +} + +fn binary_search<T>(slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> usize { + // we maintain the invariant that `lo` many elements of `slice` satisfy `cmp`. + // `hi` is maintained at the first element we know does not satisfy `cmp`. + + let mut hi = slice.len(); + let mut lo = 0; + while lo < hi { + let mid = lo + (hi - lo) / 2; + if cmp(&slice[mid]) { + lo = mid + 1; + } else { + hi = mid; + } + } + lo +} |