diff options
Diffstat (limited to 'third_party/rust/rayon/src/iter/plumbing')
-rw-r--r-- | third_party/rust/rayon/src/iter/plumbing/README.md | 315 | ||||
-rw-r--r-- | third_party/rust/rayon/src/iter/plumbing/mod.rs | 484 |
2 files changed, 799 insertions, 0 deletions
diff --git a/third_party/rust/rayon/src/iter/plumbing/README.md b/third_party/rust/rayon/src/iter/plumbing/README.md new file mode 100644 index 0000000000..dbee36ba00 --- /dev/null +++ b/third_party/rust/rayon/src/iter/plumbing/README.md @@ -0,0 +1,315 @@ +# Parallel Iterators + +These are some notes on the design of the parallel iterator traits. +This file does not describe how to **use** parallel iterators. + +## The challenge + +Parallel iterators are more complicated than sequential iterators. +The reason is that they have to be able to split themselves up and +operate in parallel across the two halves. + +The current design for parallel iterators has two distinct modes in +which they can be used; as we will see, not all iterators support both +modes (which is why there are two): + +- **Pull mode** (the `Producer` and `UnindexedProducer` traits): in this mode, + the iterator is asked to produce the next item using a call to `next`. This + is basically like a normal iterator, but with a twist: you can split the + iterator in half to produce disjoint items in separate threads. + - in the `Producer` trait, splitting is done with `split_at`, which accepts + an index where the split should be performed. Only indexed iterators can + work in this mode, as they know exactly how much data they will produce, + and how to locate the requested index. + - in the `UnindexedProducer` trait, splitting is done with `split`, which + simply requests that the producer divide itself *approximately* in half. + This is useful when the exact length and/or layout is unknown, as with + `String` characters, or when the length might exceed `usize`, as with + `Range<u64>` on 32-bit platforms. + - In theory, any `Producer` could act unindexed, but we don't currently + use that possibility. When you know the exact length, a `split` can + simply be implemented as `split_at(length/2)`. +- **Push mode** (the `Consumer` and `UnindexedConsumer` traits): in + this mode, the iterator instead is *given* each item in turn, which + is then processed. This is the opposite of a normal iterator. It's + more like a `for_each` call: each time a new item is produced, the + `consume` method is called with that item. (The traits themselves are + a bit more complex, as they support state that can be threaded + through and ultimately reduced.) Unlike producers, there are two + variants of consumers. The difference is how the split is performed: + - in the `Consumer` trait, splitting is done with `split_at`, which + accepts an index where the split should be performed. All + iterators can work in this mode. The resulting halves thus have an + idea about how much data they expect to consume. + - in the `UnindexedConsumer` trait, splitting is done with + `split_off_left`. There is no index: the resulting halves must be + prepared to process any amount of data, and they don't know where that + data falls in the overall stream. + - Not all consumers can operate in this mode. It works for + `for_each` and `reduce`, for example, but it does not work for + `collect_into_vec`, since in that case the position of each item is + important for knowing where it ends up in the target collection. + +## How iterator execution proceeds + +We'll walk through this example iterator chain to start. This chain +demonstrates more-or-less the full complexity of what can happen. + +```rust +vec1.par_iter() + .zip(vec2.par_iter()) + .flat_map(some_function) + .for_each(some_other_function) +``` + +To handle an iterator chain, we start by creating consumers. This +works from the end. So in this case, the call to `for_each` is the +final step, so it will create a `ForEachConsumer` that, given an item, +just calls `some_other_function` with that item. (`ForEachConsumer` is +a very simple consumer because it doesn't need to thread any state +between items at all.) + +Now, the `for_each` call will pass this consumer to the base iterator, +which is the `flat_map`. It will do this by calling the `drive_unindexed` +method on the `ParallelIterator` trait. `drive_unindexed` basically +says "produce items for this iterator and feed them to this consumer"; +it only works for unindexed consumers. + +(As an aside, it is interesting that only some consumers can work in +unindexed mode, but all producers can *drive* an unindexed consumer. +In contrast, only some producers can drive an *indexed* consumer, but +all consumers can be supplied indexes. Isn't variance neat.) + +As it happens, `FlatMap` only works with unindexed consumers anyway. +This is because flat-map basically has no idea how many items it will +produce. If you ask flat-map to produce the 22nd item, it can't do it, +at least not without some intermediate state. It doesn't know whether +processing the first item will create 1 item, 3 items, or 100; +therefore, to produce an arbitrary item, it would basically just have +to start at the beginning and execute sequentially, which is not what +we want. But for unindexed consumers, this doesn't matter, since they +don't need to know how much data they will get. + +Therefore, `FlatMap` can wrap the `ForEachConsumer` with a +`FlatMapConsumer` that feeds to it. This `FlatMapConsumer` will be +given one item. It will then invoke `some_function` to get a parallel +iterator out. It will then ask this new parallel iterator to drive the +`ForEachConsumer`. The `drive_unindexed` method on `flat_map` can then +pass the `FlatMapConsumer` up the chain to the previous item, which is +`zip`. At this point, something interesting happens. + +## Switching from push to pull mode + +If you think about `zip`, it can't really be implemented as a +consumer, at least not without an intermediate thread and some +channels or something (or maybe coroutines). The problem is that it +has to walk two iterators *in lockstep*. Basically, it can't call two +`drive` methods simultaneously, it can only call one at a time. So at +this point, the `zip` iterator needs to switch from *push mode* into +*pull mode*. + +You'll note that `Zip` is only usable if its inputs implement +`IndexedParallelIterator`, meaning that they can produce data starting +at random points in the stream. This need to switch to push mode is +exactly why. If we want to split a zip iterator at position 22, we +need to be able to start zipping items from index 22 right away, +without having to start from index 0. + +Anyway, so at this point, the `drive_unindexed` method for `Zip` stops +creating consumers. Instead, it creates a *producer*, a `ZipProducer`, +to be exact, and calls the `bridge` function in the `internals` +module. Creating a `ZipProducer` will in turn create producers for +the two iterators being zipped. This is possible because they both +implement `IndexedParallelIterator`. + +The `bridge` function will then connect the consumer, which is +handling the `flat_map` and `for_each`, with the producer, which is +handling the `zip` and its predecessors. It will split down until the +chunks seem reasonably small, then pull items from the producer and +feed them to the consumer. + +## The base case + +The other time that `bridge` gets used is when we bottom out in an +indexed producer, such as a slice or range. There is also a +`bridge_unindexed` equivalent for - you guessed it - unindexed producers, +such as string characters. + +<a name="producer-callback"> + +## What on earth is `ProducerCallback`? + +We saw that when you call a parallel action method like +`par_iter.reduce()`, that will create a "reducing" consumer and then +invoke `par_iter.drive_unindexed()` (or `par_iter.drive()`) as +appropriate. This may create yet more consumers as we proceed up the +parallel iterator chain. But at some point we're going to get to the +start of the chain, or to a parallel iterator (like `zip()`) that has +to coordinate multiple inputs. At that point, we need to start +converting parallel iterators into producers. + +The way we do this is by invoking the method `with_producer()`, defined on +`IndexedParallelIterator`. This is a callback scheme. In an ideal world, +it would work like this: + +```rust +base_iter.with_producer(|base_producer| { + // here, `base_producer` is the producer for `base_iter` +}); +``` + +In that case, we could implement a combinator like `map()` by getting +the producer for the base iterator, wrapping it to make our own +`MapProducer`, and then passing that to the callback. Something like +this: + +```rust +struct MapProducer<'f, P, F: 'f> { + base: P, + map_op: &'f F, +} + +impl<I, F> IndexedParallelIterator for Map<I, F> + where I: IndexedParallelIterator, + F: MapOp<I::Item>, +{ + fn with_producer<CB>(self, callback: CB) -> CB::Output { + let map_op = &self.map_op; + self.base_iter.with_producer(|base_producer| { + // Here `producer` is the producer for `self.base_iter`. + // Wrap that to make a `MapProducer` + let map_producer = MapProducer { + base: base_producer, + map_op: map_op + }; + + // invoke the callback with the wrapped version + callback(map_producer) + }); + } +}); +``` + +This example demonstrates some of the power of the callback scheme. +It winds up being a very flexible setup. For one thing, it means we +can take ownership of `par_iter`; we can then in turn give ownership +away of its bits and pieces into the producer (this is very useful if +the iterator owns an `&mut` slice, for example), or create shared +references and put *those* in the producer. In the case of map, for +example, the parallel iterator owns the `map_op`, and we borrow +references to it which we then put into the `MapProducer` (this means +the `MapProducer` can easily split itself and share those references). +The `with_producer` method can also create resources that are needed +during the parallel execution, since the producer does not have to be +returned. + +Unfortunately there is a catch. We can't actually use closures the way +I showed you. To see why, think about the type that `map_producer` +would have to have. If we were going to write the `with_producer` +method using a closure, it would have to look something like this: + +```rust +pub trait IndexedParallelIterator: ParallelIterator { + type Producer; + fn with_producer<CB, R>(self, callback: CB) -> R + where CB: FnOnce(Self::Producer) -> R; + ... +} +``` + +Note that we had to add this associated type `Producer` so that +we could specify the argument of the callback to be `Self::Producer`. +Now, imagine trying to write that `MapProducer` impl using this style: + +```rust +impl<I, F> IndexedParallelIterator for Map<I, F> + where I: IndexedParallelIterator, + F: MapOp<I::Item>, +{ + type MapProducer = MapProducer<'f, P::Producer, F>; + // ^^ wait, what is this `'f`? + + fn with_producer<CB, R>(self, callback: CB) -> R + where CB: FnOnce(Self::Producer) -> R + { + let map_op = &self.map_op; + // ^^^^^^ `'f` is (conceptually) the lifetime of this reference, + // so it will be different for each call to `with_producer`! + } +} +``` + +This may look familiar to you: it's the same problem that we have +trying to define an `Iterable` trait. Basically, the producer type +needs to include a lifetime (here, `'f`) that refers to the body of +`with_producer` and hence is not in scope at the impl level. + +If we had [associated type constructors][1598], we could solve this +problem that way. But there is another solution. We can use a +dedicated callback trait like `ProducerCallback`, instead of `FnOnce`: + +[1598]: https://github.com/rust-lang/rfcs/pull/1598 + +```rust +pub trait ProducerCallback<T> { + type Output; + fn callback<P>(self, producer: P) -> Self::Output + where P: Producer<Item=T>; +} +``` + +Using this trait, the signature of `with_producer()` looks like this: + +```rust +fn with_producer<CB: ProducerCallback<Self::Item>>(self, callback: CB) -> CB::Output; +``` + +Notice that this signature **never has to name the producer type** -- +there is no associated type `Producer` anymore. This is because the +`callback()` method is generically over **all** producers `P`. + +The problem is that now the `||` sugar doesn't work anymore. So we +have to manually create the callback struct, which is a mite tedious. +So our `MapProducer` code looks like this: + +```rust +impl<I, F> IndexedParallelIterator for Map<I, F> + where I: IndexedParallelIterator, + F: MapOp<I::Item>, +{ + fn with_producer<CB>(self, callback: CB) -> CB::Output + where CB: ProducerCallback<Self::Item> + { + return self.base.with_producer(Callback { callback: callback, map_op: self.map_op }); + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + // Manual version of the closure sugar: create an instance + // of a struct that implements `ProducerCallback`. + + // The struct declaration. Each field is something that need to capture from the + // creating scope. + struct Callback<CB, F> { + callback: CB, + map_op: F, + } + + // Implement the `ProducerCallback` trait. This is pure boilerplate. + impl<T, F, CB> ProducerCallback<T> for Callback<CB, F> + where F: MapOp<T>, + CB: ProducerCallback<F::Output> + { + type Output = CB::Output; + + fn callback<P>(self, base: P) -> CB::Output + where P: Producer<Item=T> + { + // The body of the closure is here: + let producer = MapProducer { base: base, + map_op: &self.map_op }; + self.callback.callback(producer) + } + } + } +} +``` + +OK, a bit tedious, but it works! diff --git a/third_party/rust/rayon/src/iter/plumbing/mod.rs b/third_party/rust/rayon/src/iter/plumbing/mod.rs new file mode 100644 index 0000000000..71d4fb4c3d --- /dev/null +++ b/third_party/rust/rayon/src/iter/plumbing/mod.rs @@ -0,0 +1,484 @@ +//! Traits and functions used to implement parallel iteration. These are +//! low-level details -- users of parallel iterators should not need to +//! interact with them directly. See [the `plumbing` README][r] for a general overview. +//! +//! [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md + +use crate::join_context; + +use super::IndexedParallelIterator; + +use std::cmp; +use std::usize; + +/// The `ProducerCallback` trait is a kind of generic closure, +/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in +/// the plumbing README][r] for more details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback +/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html +pub trait ProducerCallback<T> { + /// The type of value returned by this callback. Analogous to + /// [`Output` from the `FnOnce` trait][Output]. + /// + /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output + type Output; + + /// Invokes the callback with the given producer as argument. The + /// key point of this trait is that this method is generic over + /// `P`, and hence implementors must be defined for any producer. + fn callback<P>(self, producer: P) -> Self::Output + where + P: Producer<Item = T>; +} + +/// A `Producer` is effectively a "splittable `IntoIterator`". That +/// is, a producer is a value which can be converted into an iterator +/// at any time: at that point, it simply produces items on demand, +/// like any iterator. But what makes a `Producer` special is that, +/// *before* we convert to an iterator, we can also **split** it at a +/// particular point using the `split_at` method. This will yield up +/// two producers, one producing the items before that point, and one +/// producing the items after that point (these two producers can then +/// independently be split further, or be converted into iterators). +/// In Rayon, this splitting is used to divide between threads. +/// See [the `plumbing` README][r] for further details. +/// +/// Note that each producer will always produce a fixed number of +/// items N. However, this number N is not queryable through the API; +/// the consumer is expected to track it. +/// +/// NB. You might expect `Producer` to extend the `IntoIterator` +/// trait. However, [rust-lang/rust#20671][20671] prevents us from +/// declaring the DoubleEndedIterator and ExactSizeIterator +/// constraints on a required IntoIterator trait, so we inline +/// IntoIterator here until that issue is fixed. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md +/// [20671]: https://github.com/rust-lang/rust/issues/20671 +pub trait Producer: Send + Sized { + /// The type of item that will be produced by this producer once + /// it is converted into an iterator. + type Item; + + /// The type of iterator we will become. + type IntoIter: Iterator<Item = Self::Item> + DoubleEndedIterator + ExactSizeIterator; + + /// Convert `self` into an iterator; at this point, no more parallel splits + /// are possible. + fn into_iter(self) -> Self::IntoIter; + + /// The minimum number of items that we will process + /// sequentially. Defaults to 1, which means that we will split + /// all the way down to a single item. This can be raised higher + /// using the [`with_min_len`] method, which will force us to + /// create sequential tasks at a larger granularity. Note that + /// Rayon automatically normally attempts to adjust the size of + /// parallel splits to reduce overhead, so this should not be + /// needed. + /// + /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len + fn min_len(&self) -> usize { + 1 + } + + /// The maximum number of items that we will process + /// sequentially. Defaults to MAX, which means that we can choose + /// not to split at all. This can be lowered using the + /// [`with_max_len`] method, which will force us to create more + /// parallel tasks. Note that Rayon automatically normally + /// attempts to adjust the size of parallel splits to reduce + /// overhead, so this should not be needed. + /// + /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len + fn max_len(&self) -> usize { + usize::MAX + } + + /// Split into two producers; one produces items `0..index`, the + /// other `index..N`. Index must be less than or equal to `N`. + fn split_at(self, index: usize) -> (Self, Self); + + /// Iterate the producer, feeding each element to `folder`, and + /// stop when the folder is full (or all elements have been consumed). + /// + /// The provided implementation is sufficient for most iterables. + fn fold_with<F>(self, folder: F) -> F + where + F: Folder<Self::Item>, + { + folder.consume_iter(self.into_iter()) + } +} + +/// A consumer is effectively a [generalized "fold" operation][fold], +/// and in fact each consumer will eventually be converted into a +/// [`Folder`]. What makes a consumer special is that, like a +/// [`Producer`], it can be **split** into multiple consumers using +/// the `split_at` method. When a consumer is split, it produces two +/// consumers, as well as a **reducer**. The two consumers can be fed +/// items independently, and when they are done the reducer is used to +/// combine their two results into one. See [the `plumbing` +/// README][r] for further details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold +/// [`Folder`]: trait.Folder.html +/// [`Producer`]: trait.Producer.html +pub trait Consumer<Item>: Send + Sized { + /// The type of folder that this consumer can be converted into. + type Folder: Folder<Item, Result = Self::Result>; + + /// The type of reducer that is produced if this consumer is split. + type Reducer: Reducer<Self::Result>; + + /// The type of result that this consumer will ultimately produce. + type Result: Send; + + /// Divide the consumer into two consumers, one processing items + /// `0..index` and one processing items from `index..`. Also + /// produces a reducer that can be used to reduce the results at + /// the end. + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); + + /// Convert the consumer into a folder that can consume items + /// sequentially, eventually producing a final result. + fn into_folder(self) -> Self::Folder; + + /// Hint whether this `Consumer` would like to stop processing + /// further items, e.g. if a search has been completed. + fn full(&self) -> bool; +} + +/// The `Folder` trait encapsulates [the standard fold +/// operation][fold]. It can be fed many items using the `consume` +/// method. At the end, once all items have been consumed, it can then +/// be converted (using `complete`) into a final value. +/// +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold +pub trait Folder<Item>: Sized { + /// The type of result that will ultimately be produced by the folder. + type Result; + + /// Consume next item and return new sequential state. + fn consume(self, item: Item) -> Self; + + /// Consume items from the iterator until full, and return new sequential state. + /// + /// This method is **optional**. The default simply iterates over + /// `iter`, invoking `consume` and checking after each iteration + /// whether `full` returns false. + /// + /// The main reason to override it is if you can provide a more + /// specialized, efficient implementation. + fn consume_iter<I>(mut self, iter: I) -> Self + where + I: IntoIterator<Item = Item>, + { + for item in iter { + self = self.consume(item); + if self.full() { + break; + } + } + self + } + + /// Finish consuming items, produce final result. + fn complete(self) -> Self::Result; + + /// Hint whether this `Folder` would like to stop processing + /// further items, e.g. if a search has been completed. + fn full(&self) -> bool; +} + +/// The reducer is the final step of a `Consumer` -- after a consumer +/// has been split into two parts, and each of those parts has been +/// fully processed, we are left with two results. The reducer is then +/// used to combine those two results into one. See [the `plumbing` +/// README][r] for further details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md +pub trait Reducer<Result> { + /// Reduce two final results into one; this is executed after a + /// split. + fn reduce(self, left: Result, right: Result) -> Result; +} + +/// A stateless consumer can be freely copied. These consumers can be +/// used like regular consumers, but they also support a +/// `split_off_left` method that does not take an index to split, but +/// simply splits at some arbitrary point (`for_each`, for example, +/// produces an unindexed consumer). +pub trait UnindexedConsumer<I>: Consumer<I> { + /// Splits off a "left" consumer and returns it. The `self` + /// consumer should then be used to consume the "right" portion of + /// the data. (The ordering matters for methods like find_first -- + /// values produced by the returned value are given precedence + /// over values produced by `self`.) Once the left and right + /// halves have been fully consumed, you should reduce the results + /// with the result of `to_reducer`. + fn split_off_left(&self) -> Self; + + /// Creates a reducer that can be used to combine the results from + /// a split consumer. + fn to_reducer(&self) -> Self::Reducer; +} + +/// A variant on `Producer` which does not know its exact length or +/// cannot represent it in a `usize`. These producers act like +/// ordinary producers except that they cannot be told to split at a +/// particular point. Instead, you just ask them to split 'somewhere'. +/// +/// (In principle, `Producer` could extend this trait; however, it +/// does not because to do so would require producers to carry their +/// own length with them.) +pub trait UnindexedProducer: Send + Sized { + /// The type of item returned by this producer. + type Item; + + /// Split midway into a new producer if possible, otherwise return `None`. + fn split(self) -> (Self, Option<Self>); + + /// Iterate the producer, feeding each element to `folder`, and + /// stop when the folder is full (or all elements have been consumed). + fn fold_with<F>(self, folder: F) -> F + where + F: Folder<Self::Item>; +} + +/// A splitter controls the policy for splitting into smaller work items. +/// +/// Thief-splitting is an adaptive policy that starts by splitting into +/// enough jobs for every worker thread, and then resets itself whenever a +/// job is actually stolen into a different thread. +#[derive(Clone, Copy)] +struct Splitter { + /// The `splits` tell us approximately how many remaining times we'd + /// like to split this job. We always just divide it by two though, so + /// the effective number of pieces will be `next_power_of_two()`. + splits: usize, +} + +impl Splitter { + #[inline] + fn new() -> Splitter { + Splitter { + splits: crate::current_num_threads(), + } + } + + #[inline] + fn try_split(&mut self, stolen: bool) -> bool { + let Splitter { splits } = *self; + + if stolen { + // This job was stolen! Reset the number of desired splits to the + // thread count, if that's more than we had remaining anyway. + self.splits = cmp::max(crate::current_num_threads(), self.splits / 2); + true + } else if splits > 0 { + // We have splits remaining, make it so. + self.splits /= 2; + true + } else { + // Not stolen, and no more splits -- we're done! + false + } + } +} + +/// The length splitter is built on thief-splitting, but additionally takes +/// into account the remaining length of the iterator. +#[derive(Clone, Copy)] +struct LengthSplitter { + inner: Splitter, + + /// The smallest we're willing to divide into. Usually this is just 1, + /// but you can choose a larger working size with `with_min_len()`. + min: usize, +} + +impl LengthSplitter { + /// Creates a new splitter based on lengths. + /// + /// The `min` is a hard lower bound. We'll never split below that, but + /// of course an iterator might start out smaller already. + /// + /// The `max` is an upper bound on the working size, used to determine + /// the minimum number of times we need to split to get under that limit. + /// The adaptive algorithm may very well split even further, but never + /// smaller than the `min`. + #[inline] + fn new(min: usize, max: usize, len: usize) -> LengthSplitter { + let mut splitter = LengthSplitter { + inner: Splitter::new(), + min: cmp::max(min, 1), + }; + + // Divide the given length by the max working length to get the minimum + // number of splits we need to get under that max. This rounds down, + // but the splitter actually gives `next_power_of_two()` pieces anyway. + // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. + let min_splits = len / cmp::max(max, 1); + + // Only update the value if it's not splitting enough already. + if min_splits > splitter.inner.splits { + splitter.inner.splits = min_splits; + } + + splitter + } + + #[inline] + fn try_split(&mut self, len: usize, stolen: bool) -> bool { + // If splitting wouldn't make us too small, try the inner splitter. + len / 2 >= self.min && self.inner.try_split(stolen) + } +} + +/// This helper function is used to "connect" a parallel iterator to a +/// consumer. It will convert the `par_iter` into a producer P and +/// then pull items from P and feed them to `consumer`, splitting and +/// creating parallel threads as needed. +/// +/// This is useful when you are implementing your own parallel +/// iterators: it is often used as the definition of the +/// [`drive_unindexed`] or [`drive`] methods. +/// +/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed +/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive +pub fn bridge<I, C>(par_iter: I, consumer: C) -> C::Result +where + I: IndexedParallelIterator, + C: Consumer<I::Item>, +{ + let len = par_iter.len(); + return par_iter.with_producer(Callback { len, consumer }); + + struct Callback<C> { + len: usize, + consumer: C, + } + + impl<C, I> ProducerCallback<I> for Callback<C> + where + C: Consumer<I>, + { + type Output = C::Result; + fn callback<P>(self, producer: P) -> C::Result + where + P: Producer<Item = I>, + { + bridge_producer_consumer(self.len, producer, self.consumer) + } + } +} + +/// This helper function is used to "connect" a producer and a +/// consumer. You may prefer to call [`bridge`], which wraps this +/// function. This function will draw items from `producer` and feed +/// them to `consumer`, splitting and creating parallel tasks when +/// needed. +/// +/// This is useful when you are implementing your own parallel +/// iterators: it is often used as the definition of the +/// [`drive_unindexed`] or [`drive`] methods. +/// +/// [`bridge`]: fn.bridge.html +/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed +/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive +pub fn bridge_producer_consumer<P, C>(len: usize, producer: P, consumer: C) -> C::Result +where + P: Producer, + C: Consumer<P::Item>, +{ + let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); + return helper(len, false, splitter, producer, consumer); + + fn helper<P, C>( + len: usize, + migrated: bool, + mut splitter: LengthSplitter, + producer: P, + consumer: C, + ) -> C::Result + where + P: Producer, + C: Consumer<P::Item>, + { + if consumer.full() { + consumer.into_folder().complete() + } else if splitter.try_split(len, migrated) { + let mid = len / 2; + let (left_producer, right_producer) = producer.split_at(mid); + let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); + let (left_result, right_result) = join_context( + |context| { + helper( + mid, + context.migrated(), + splitter, + left_producer, + left_consumer, + ) + }, + |context| { + helper( + len - mid, + context.migrated(), + splitter, + right_producer, + right_consumer, + ) + }, + ); + reducer.reduce(left_result, right_result) + } else { + producer.fold_with(consumer.into_folder()).complete() + } + } +} + +/// A variant of [`bridge_producer_consumer`] where the producer is an unindexed producer. +/// +/// [`bridge_producer_consumer`]: fn.bridge_producer_consumer.html +pub fn bridge_unindexed<P, C>(producer: P, consumer: C) -> C::Result +where + P: UnindexedProducer, + C: UnindexedConsumer<P::Item>, +{ + let splitter = Splitter::new(); + bridge_unindexed_producer_consumer(false, splitter, producer, consumer) +} + +fn bridge_unindexed_producer_consumer<P, C>( + migrated: bool, + mut splitter: Splitter, + producer: P, + consumer: C, +) -> C::Result +where + P: UnindexedProducer, + C: UnindexedConsumer<P::Item>, +{ + if consumer.full() { + consumer.into_folder().complete() + } else if splitter.try_split(migrated) { + match producer.split() { + (left_producer, Some(right_producer)) => { + let (reducer, left_consumer, right_consumer) = + (consumer.to_reducer(), consumer.split_off_left(), consumer); + let bridge = bridge_unindexed_producer_consumer; + let (left_result, right_result) = join_context( + |context| bridge(context.migrated(), splitter, left_producer, left_consumer), + |context| bridge(context.migrated(), splitter, right_producer, right_consumer), + ); + reducer.reduce(left_result, right_result) + } + (producer, None) => producer.fold_with(consumer.into_folder()).complete(), + } + } else { + producer.fold_with(consumer.into_folder()).complete() + } +} |