From 698f8c2f01ea549d77d7dc3338a12e04c11057b9 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:02:58 +0200 Subject: Adding upstream version 1.64.0+dfsg1. Signed-off-by: Daniel Baumann --- vendor/rayon/src/iter/unzip.rs | 525 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 525 insertions(+) create mode 100644 vendor/rayon/src/iter/unzip.rs (limited to 'vendor/rayon/src/iter/unzip.rs') diff --git a/vendor/rayon/src/iter/unzip.rs b/vendor/rayon/src/iter/unzip.rs new file mode 100644 index 000000000..0b7074e92 --- /dev/null +++ b/vendor/rayon/src/iter/unzip.rs @@ -0,0 +1,525 @@ +use super::plumbing::*; +use super::*; + +/// This trait abstracts the different ways we can "unzip" one parallel +/// iterator into two distinct consumers, which we can handle almost +/// identically apart from how to process the individual items. +trait UnzipOp: Sync + Send { + /// The type of item expected by the left consumer. + type Left: Send; + + /// The type of item expected by the right consumer. + type Right: Send; + + /// Consumes one item and feeds it to one or both of the underlying folders. + fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) + where + FA: Folder, + FB: Folder; + + /// Reports whether this op may support indexed consumers. + /// - e.g. true for `unzip` where the item count passed through directly. + /// - e.g. false for `partition` where the sorting is not yet known. + fn indexable() -> bool { + false + } +} + +/// Runs an unzip-like operation into default `ParallelExtend` collections. +fn execute(pi: I, op: OP) -> (FromA, FromB) +where + I: ParallelIterator, + OP: UnzipOp, + FromA: Default + Send + ParallelExtend, + FromB: Default + Send + ParallelExtend, +{ + let mut a = FromA::default(); + let mut b = FromB::default(); + execute_into(&mut a, &mut b, pi, op); + (a, b) +} + +/// Runs an unzip-like operation into `ParallelExtend` collections. +fn execute_into(a: &mut FromA, b: &mut FromB, pi: I, op: OP) +where + I: ParallelIterator, + OP: UnzipOp, + FromA: Send + ParallelExtend, + FromB: Send + ParallelExtend, +{ + // We have no idea what the consumers will look like for these + // collections' `par_extend`, but we can intercept them in our own + // `drive_unindexed`. Start with the left side, type `A`: + let iter = UnzipA { base: pi, op, b }; + a.par_extend(iter); +} + +/// Unzips the items of a parallel iterator into a pair of arbitrary +/// `ParallelExtend` containers. +/// +/// This is called by `ParallelIterator::unzip`. +pub(super) fn unzip(pi: I) -> (FromA, FromB) +where + I: ParallelIterator, + FromA: Default + Send + ParallelExtend, + FromB: Default + Send + ParallelExtend, + A: Send, + B: Send, +{ + execute(pi, Unzip) +} + +/// Unzips an `IndexedParallelIterator` into two arbitrary `Consumer`s. +/// +/// This is called by `super::collect::unzip_into_vecs`. +pub(super) fn unzip_indexed(pi: I, left: CA, right: CB) -> (CA::Result, CB::Result) +where + I: IndexedParallelIterator, + CA: Consumer, + CB: Consumer, + A: Send, + B: Send, +{ + let consumer = UnzipConsumer { + op: &Unzip, + left, + right, + }; + pi.drive(consumer) +} + +/// An `UnzipOp` that splits a tuple directly into the two consumers. +struct Unzip; + +impl UnzipOp<(A, B)> for Unzip { + type Left = A; + type Right = B; + + fn consume(&self, item: (A, B), left: FA, right: FB) -> (FA, FB) + where + FA: Folder, + FB: Folder, + { + (left.consume(item.0), right.consume(item.1)) + } + + fn indexable() -> bool { + true + } +} + +/// Partitions the items of a parallel iterator into a pair of arbitrary +/// `ParallelExtend` containers. +/// +/// This is called by `ParallelIterator::partition`. +pub(super) fn partition(pi: I, predicate: P) -> (A, B) +where + I: ParallelIterator, + A: Default + Send + ParallelExtend, + B: Default + Send + ParallelExtend, + P: Fn(&I::Item) -> bool + Sync + Send, +{ + execute(pi, Partition { predicate }) +} + +/// An `UnzipOp` that routes items depending on a predicate function. +struct Partition

{ + predicate: P, +} + +impl UnzipOp for Partition

+where + P: Fn(&T) -> bool + Sync + Send, + T: Send, +{ + type Left = T; + type Right = T; + + fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) + where + FA: Folder, + FB: Folder, + { + if (self.predicate)(&item) { + (left.consume(item), right) + } else { + (left, right.consume(item)) + } + } +} + +/// Partitions and maps the items of a parallel iterator into a pair of +/// arbitrary `ParallelExtend` containers. +/// +/// This called by `ParallelIterator::partition_map`. +pub(super) fn partition_map(pi: I, predicate: P) -> (A, B) +where + I: ParallelIterator, + A: Default + Send + ParallelExtend, + B: Default + Send + ParallelExtend, + P: Fn(I::Item) -> Either + Sync + Send, + L: Send, + R: Send, +{ + execute(pi, PartitionMap { predicate }) +} + +/// An `UnzipOp` that routes items depending on how they are mapped `Either`. +struct PartitionMap

{ + predicate: P, +} + +impl UnzipOp for PartitionMap

+where + P: Fn(T) -> Either + Sync + Send, + L: Send, + R: Send, +{ + type Left = L; + type Right = R; + + fn consume(&self, item: T, left: FA, right: FB) -> (FA, FB) + where + FA: Folder, + FB: Folder, + { + match (self.predicate)(item) { + Either::Left(item) => (left.consume(item), right), + Either::Right(item) => (left, right.consume(item)), + } + } +} + +/// A fake iterator to intercept the `Consumer` for type `A`. +struct UnzipA<'b, I, OP, FromB> { + base: I, + op: OP, + b: &'b mut FromB, +} + +impl<'b, I, OP, FromB> ParallelIterator for UnzipA<'b, I, OP, FromB> +where + I: ParallelIterator, + OP: UnzipOp, + FromB: Send + ParallelExtend, +{ + type Item = OP::Left; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + let mut result = None; + { + // Now it's time to find the consumer for type `B` + let iter = UnzipB { + base: self.base, + op: self.op, + left_consumer: consumer, + left_result: &mut result, + }; + self.b.par_extend(iter); + } + // NB: If for some reason `b.par_extend` doesn't actually drive the + // iterator, then we won't have a result for the left side to return + // at all. We can't fake an arbitrary consumer's result, so panic. + result.expect("unzip consumers didn't execute!") + } + + fn opt_len(&self) -> Option { + if OP::indexable() { + self.base.opt_len() + } else { + None + } + } +} + +/// A fake iterator to intercept the `Consumer` for type `B`. +struct UnzipB<'r, I, OP, CA> +where + I: ParallelIterator, + OP: UnzipOp, + CA: UnindexedConsumer, + CA::Result: 'r, +{ + base: I, + op: OP, + left_consumer: CA, + left_result: &'r mut Option, +} + +impl<'r, I, OP, CA> ParallelIterator for UnzipB<'r, I, OP, CA> +where + I: ParallelIterator, + OP: UnzipOp, + CA: UnindexedConsumer, +{ + type Item = OP::Right; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + // Now that we have two consumers, we can unzip the real iterator. + let consumer = UnzipConsumer { + op: &self.op, + left: self.left_consumer, + right: consumer, + }; + + let result = self.base.drive_unindexed(consumer); + *self.left_result = Some(result.0); + result.1 + } + + fn opt_len(&self) -> Option { + if OP::indexable() { + self.base.opt_len() + } else { + None + } + } +} + +/// `Consumer` that unzips into two other `Consumer`s +struct UnzipConsumer<'a, OP, CA, CB> { + op: &'a OP, + left: CA, + right: CB, +} + +impl<'a, T, OP, CA, CB> Consumer for UnzipConsumer<'a, OP, CA, CB> +where + OP: UnzipOp, + CA: Consumer, + CB: Consumer, +{ + type Folder = UnzipFolder<'a, OP, CA::Folder, CB::Folder>; + type Reducer = UnzipReducer; + type Result = (CA::Result, CB::Result); + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left1, left2, left_reducer) = self.left.split_at(index); + let (right1, right2, right_reducer) = self.right.split_at(index); + + ( + UnzipConsumer { + op: self.op, + left: left1, + right: right1, + }, + UnzipConsumer { + op: self.op, + left: left2, + right: right2, + }, + UnzipReducer { + left: left_reducer, + right: right_reducer, + }, + ) + } + + fn into_folder(self) -> Self::Folder { + UnzipFolder { + op: self.op, + left: self.left.into_folder(), + right: self.right.into_folder(), + } + } + + fn full(&self) -> bool { + // don't stop until everyone is full + self.left.full() && self.right.full() + } +} + +impl<'a, T, OP, CA, CB> UnindexedConsumer for UnzipConsumer<'a, OP, CA, CB> +where + OP: UnzipOp, + CA: UnindexedConsumer, + CB: UnindexedConsumer, +{ + fn split_off_left(&self) -> Self { + UnzipConsumer { + op: self.op, + left: self.left.split_off_left(), + right: self.right.split_off_left(), + } + } + + fn to_reducer(&self) -> Self::Reducer { + UnzipReducer { + left: self.left.to_reducer(), + right: self.right.to_reducer(), + } + } +} + +/// `Folder` that unzips into two other `Folder`s +struct UnzipFolder<'a, OP, FA, FB> { + op: &'a OP, + left: FA, + right: FB, +} + +impl<'a, T, OP, FA, FB> Folder for UnzipFolder<'a, OP, FA, FB> +where + OP: UnzipOp, + FA: Folder, + FB: Folder, +{ + type Result = (FA::Result, FB::Result); + + fn consume(self, item: T) -> Self { + let (left, right) = self.op.consume(item, self.left, self.right); + UnzipFolder { + op: self.op, + left, + right, + } + } + + fn complete(self) -> Self::Result { + (self.left.complete(), self.right.complete()) + } + + fn full(&self) -> bool { + // don't stop until everyone is full + self.left.full() && self.right.full() + } +} + +/// `Reducer` that unzips into two other `Reducer`s +struct UnzipReducer { + left: RA, + right: RB, +} + +impl Reducer<(A, B)> for UnzipReducer +where + RA: Reducer, + RB: Reducer, +{ + fn reduce(self, left: (A, B), right: (A, B)) -> (A, B) { + ( + self.left.reduce(left.0, right.0), + self.right.reduce(left.1, right.1), + ) + } +} + +impl ParallelExtend<(A, B)> for (FromA, FromB) +where + A: Send, + B: Send, + FromA: Send + ParallelExtend, + FromB: Send + ParallelExtend, +{ + fn par_extend(&mut self, pi: I) + where + I: IntoParallelIterator, + { + execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), Unzip); + } +} + +impl ParallelExtend> for (A, B) +where + L: Send, + R: Send, + A: Send + ParallelExtend, + B: Send + ParallelExtend, +{ + fn par_extend(&mut self, pi: I) + where + I: IntoParallelIterator>, + { + execute_into(&mut self.0, &mut self.1, pi.into_par_iter(), UnEither); + } +} + +/// An `UnzipOp` that routes items depending on their `Either` variant. +struct UnEither; + +impl UnzipOp> for UnEither +where + L: Send, + R: Send, +{ + type Left = L; + type Right = R; + + fn consume(&self, item: Either, left: FL, right: FR) -> (FL, FR) + where + FL: Folder, + FR: Folder, + { + match item { + Either::Left(item) => (left.consume(item), right), + Either::Right(item) => (left, right.consume(item)), + } + } +} + +impl FromParallelIterator<(A, B)> for (FromA, FromB) +where + A: Send, + B: Send, + FromA: Send + FromParallelIterator, + FromB: Send + FromParallelIterator, +{ + fn from_par_iter(pi: I) -> Self + where + I: IntoParallelIterator, + { + let (a, b): (Collector, Collector) = pi.into_par_iter().unzip(); + (a.result.unwrap(), b.result.unwrap()) + } +} + +impl FromParallelIterator> for (A, B) +where + L: Send, + R: Send, + A: Send + FromParallelIterator, + B: Send + FromParallelIterator, +{ + fn from_par_iter(pi: I) -> Self + where + I: IntoParallelIterator>, + { + fn identity(x: T) -> T { + x + } + + let (a, b): (Collector, Collector) = pi.into_par_iter().partition_map(identity); + (a.result.unwrap(), b.result.unwrap()) + } +} + +/// Shim to implement a one-time `ParallelExtend` using `FromParallelIterator`. +struct Collector { + result: Option, +} + +impl Default for Collector { + fn default() -> Self { + Collector { result: None } + } +} + +impl ParallelExtend for Collector +where + T: Send, + FromT: Send + FromParallelIterator, +{ + fn par_extend(&mut self, pi: I) + where + I: IntoParallelIterator, + { + debug_assert!(self.result.is_none()); + self.result = Some(pi.into_par_iter().collect()); + } +} -- cgit v1.2.3