/// Evaluate any iterator in their own thread. /// /// This is particularly useful if the wrapped iterator performs IO and/or heavy computations. /// Use [`EagerIter::new()`] for instantiation. pub struct EagerIter { receiver: std::sync::mpsc::Receiver>, chunk: Option>, size_hint: (usize, Option), } impl EagerIter where I: Iterator + Send + 'static, ::Item: Send, { /// Return a new `EagerIter` which evaluates `iter` in its own thread, /// with a given `chunk_size` allowing a maximum `chunks_in_flight`. /// /// * `chunk_size` describes how many items returned by `iter` will be a single item of this `EagerIter`. /// This helps to reduce the overhead imposed by transferring many small items. /// If this number is 1, each item will become a single chunk. 0 is invalid. /// * `chunks_in_flight` describes how many chunks can be kept in memory in case the consumer of the `EagerIter`s items /// isn't consuming them fast enough. Setting this number to 0 effectively turns off any caching, but blocks `EagerIter` /// if its items aren't consumed fast enough. pub fn new(iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { let (sender, receiver) = std::sync::mpsc::sync_channel(chunks_in_flight); let size_hint = iter.size_hint(); assert!(chunk_size > 0, "non-zero chunk size is needed"); std::thread::spawn(move || { let mut out = Vec::with_capacity(chunk_size); for item in iter { out.push(item); if out.len() == chunk_size { if sender.send(out).is_err() { return; } out = Vec::with_capacity(chunk_size); } } if !out.is_empty() { sender.send(out).ok(); } }); EagerIter { receiver, chunk: None, size_hint, } } fn fill_buf_and_pop(&mut self) -> Option { self.chunk = self.receiver.recv().ok().map(|v| { assert!(!v.is_empty()); v.into_iter() }); self.chunk.as_mut().and_then(Iterator::next) } } impl Iterator for EagerIter where I: Iterator + Send + 'static, ::Item: Send, { type Item = I::Item; fn next(&mut self) -> Option { match self.chunk.as_mut() { Some(chunk) => chunk.next().or_else(|| self.fill_buf_and_pop()), None => self.fill_buf_and_pop(), } } fn size_hint(&self) -> (usize, Option) { self.size_hint } } /// An conditional `EagerIter`, which may become a just-in-time iterator running in the main thread depending on a condition. pub enum EagerIterIf { /// A separate thread will eagerly evaluate iterator `I`. Eager(EagerIter), /// The current thread evaluates `I`. OnDemand(I), } impl EagerIterIf where I: Iterator + Send + 'static, ::Item: Send, { /// Return a new `EagerIterIf` if `condition()` returns true. /// /// For all other parameters, please see [`EagerIter::new()`]. pub fn new(condition: impl FnOnce() -> bool, iter: I, chunk_size: usize, chunks_in_flight: usize) -> Self { if condition() { EagerIterIf::Eager(EagerIter::new(iter, chunk_size, chunks_in_flight)) } else { EagerIterIf::OnDemand(iter) } } } impl Iterator for EagerIterIf where I: Iterator + Send + 'static, ::Item: Send, { type Item = I::Item; fn next(&mut self) -> Option { match self { EagerIterIf::OnDemand(i) => i.next(), EagerIterIf::Eager(i) => i.next(), } } fn size_hint(&self) -> (usize, Option) { match self { EagerIterIf::OnDemand(i) => i.size_hint(), EagerIterIf::Eager(i) => i.size_hint(), } } }