use crate::parallel::Reduce; #[cfg(not(feature = "parallel"))] mod not_parallel { use std::sync::atomic::{AtomicBool, AtomicIsize}; /// Runs `left` and then `right`, one after another, returning their output when both are done. pub fn join(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) { (left(), right()) } /// A scope for spawning threads. pub struct Scope<'scope, 'env: 'scope> { _scope: std::marker::PhantomData<&'scope mut &'scope ()>, _env: std::marker::PhantomData<&'env mut &'env ()>, } pub struct ThreadBuilder; /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning. pub fn build_thread() -> ThreadBuilder { ThreadBuilder } #[allow(unsafe_code)] unsafe impl Sync for Scope<'_, '_> {} impl ThreadBuilder { pub fn name(self, _new: String) -> Self { self } pub fn spawn_scoped<'scope, 'env, F, T>( &self, scope: &'scope Scope<'scope, 'env>, f: F, ) -> std::io::Result> where F: FnOnce() -> T + 'scope, T: 'scope, { Ok(scope.spawn(f)) } } impl<'scope, 'env> Scope<'scope, 'env> { /// Provided with this scope, let `f` start new threads that live within it. pub fn spawn(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where F: FnOnce() -> T + 'scope, T: 'scope, { ScopedJoinHandle { result: f(), _marker: Default::default(), } } } /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call. /// Note that this implementation will run the spawned functions immediately. pub fn threads<'env, F, R>(f: F) -> R where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R, { f(&Scope { _scope: Default::default(), _env: Default::default(), }) } /// A handle that can be used to join its scoped thread. /// /// This struct is created by the [`Scope::spawn`] method and the /// [`ScopedThreadBuilder::spawn`] method. pub struct ScopedJoinHandle<'scope, T> { /// Holds the result of the inner closure. result: T, _marker: std::marker::PhantomData<&'scope mut &'scope ()>, } impl ScopedJoinHandle<'_, T> { pub fn join(self) -> std::thread::Result { Ok(self.result) } pub fn is_finished(&self) -> bool { true } } /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state. /// This is only good for operations where near-random access isn't detrimental, so it's not usually great /// for file-io as it won't make use of sorted inputs well. // TODO: better docs pub fn in_parallel_with_slice( input: &mut [I], _thread_limit: Option, mut new_thread_state: impl FnMut(usize) -> S + Clone, mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone, mut periodic: impl FnMut() -> Option, state_to_rval: impl FnOnce(S) -> R + Clone, ) -> Result, E> { let mut state = new_thread_state(0); let should_interrupt = &AtomicBool::default(); let threads_left = &AtomicIsize::default(); for item in input { consume(item, &mut state, threads_left, should_interrupt)?; if periodic().is_none() { break; } } Ok(vec![state_to_rval(state)]) } } #[cfg(not(feature = "parallel"))] pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle}; /// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`, /// whose task is to aggregate these outputs into the final result returned by this function. /// /// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume` /// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state. /// * For `reducer`, see the [`Reduce`] trait /// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature /// similar to the parallel version. /// /// **This serial version performing all calculations on the current thread.** pub fn in_parallel( input: impl Iterator, _thread_limit: Option, new_thread_state: impl Fn(usize) -> S, mut consume: impl FnMut(I, &mut S) -> O, mut reducer: R, ) -> Result<::Output, ::Error> where R: Reduce, { let mut state = new_thread_state(0); for item in input { drop(reducer.feed(consume(item, &mut state))?); } reducer.finalize() }