diff options
Diffstat (limited to 'vendor/gix-features/src/parallel/eager_iter.rs')
-rw-r--r-- | vendor/gix-features/src/parallel/eager_iter.rs | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/vendor/gix-features/src/parallel/eager_iter.rs b/vendor/gix-features/src/parallel/eager_iter.rs new file mode 100644 index 000000000..60123f54c --- /dev/null +++ b/vendor/gix-features/src/parallel/eager_iter.rs @@ -0,0 +1,124 @@ +/// Evaluate any iterator in their own thread. +/// +/// This is particularly useful if the wrapped iterator performs IO and/or heavy computations. +/// Use [`EagerIter::new()`] for instantiation. +pub struct EagerIter<I: Iterator> { + receiver: std::sync::mpsc::Receiver<Vec<I::Item>>, + chunk: Option<std::vec::IntoIter<I::Item>>, + size_hint: (usize, Option<usize>), +} + +impl<I> EagerIter<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + /// Return a new `EagerIter` which evaluates `iter` in its own thread, + /// with a given `chunk_size` allowing a maximum `chunks_in_flight`. + /// + /// * `chunk_size` describes how many items returned by `iter` will be a single item of this `EagerIter`. + /// This helps to reduce the overhead imposed by transferring many small items. + /// If this number is 1, each item will become a single chunk. 0 is invalid. + /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items + /// isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter` + /// if its items aren't consumed fast enough. + pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { + let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight); + let size_hint = iter.size_hint(); + assert!(chunk_size > 0, "non-zero chunk size is needed"); + + std::thread::spawn(move || { + let mut out = Vec::with_capacity(chunk_size); + for item in iter { + out.push(item); + if out.len() == chunk_size { + if sender.send(out).is_err() { + return; + } + out = Vec::with_capacity(chunk_size); + } + } + if !out.is_empty() { + sender.send(out).ok(); + } + }); + EagerIter { + receiver, + chunk: None, + size_hint, + } + } + + fn fill_buf_and_pop(&mut self) -> Option<I::Item> { + self.chunk = self.receiver.recv().ok().map(|v| { + assert!(!v.is_empty()); + v.into_iter() + }); + self.chunk.as_mut().and_then(|c| c.next()) + } +} + +impl<I> Iterator for EagerIter<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + type Item = I::Item; + + fn next(&mut self) -> Option<Self::Item> { + match self.chunk.as_mut() { + Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()), + None => self.fill_buf_and_pop(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.size_hint + } +} + +/// An conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition. +pub enum EagerIterIf<I: Iterator> { + /// A separate thread will eagerly evaluate iterator `I`. + Eager(EagerIter<I>), + /// The current thread evaluates `I`. + OnDemand(I), +} + +impl<I> EagerIterIf<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + /// Return a new `EagerIterIf` if `condition()` returns true. + /// + /// For all other parameters, please see [`EagerIter::new()`]. + pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { + if condition() { + EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight)) + } else { + EagerIterIf::OnDemand(iter) + } + } +} +impl<I> Iterator for EagerIterIf<I> +where + I: Iterator + Send + 'static, + <I as Iterator>::Item: Send, +{ + type Item = I::Item; + + fn next(&mut self) -> Option<Self::Item> { + match self { + EagerIterIf::OnDemand(i) => i.next(), + EagerIterIf::Eager(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + EagerIterIf::OnDemand(i) => i.size_hint(), + EagerIterIf::Eager(i) => i.size_hint(), + } + } +} |