summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/serial.rs
blob: 00723b2c3c4db72eb3bc5df48b731b44f5d16622 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use crate::parallel::Reduce;

#[cfg(not(feature = "parallel"))]
mod not_parallel {
    /// Runs `left` and then `right`, one after another, returning their output when both are done.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        (left(), right())
    }

    /// A scope for spawning threads.
    pub struct Scope<'env> {
        _marker: std::marker::PhantomData<&'env mut &'env ()>,
    }

    pub struct ThreadBuilder;

    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    #[allow(unsafe_code)]
    unsafe impl Sync for Scope<'_> {}

    impl ThreadBuilder {
        pub fn name(self, _new: String) -> Self {
            self
        }
        pub fn spawn_scoped<'a, 'env, F, T>(
            &self,
            scope: &'a Scope<'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'a, T>>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            Ok(scope.spawn(f))
        }
    }

    impl<'env> Scope<'env> {
        pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T,
            F: Send + 'env,
            T: Send + 'env,
        {
            ScopedJoinHandle {
                result: f(),
                _marker: Default::default(),
            }
        }
    }

    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: FnOnce(&Scope<'env>) -> R,
    {
        f(&Scope {
            _marker: Default::default(),
        })
    }

    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ScopedThreadBuilder::spawn`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure.
        result: T,
        _marker: std::marker::PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        pub fn join(self) -> std::thread::Result<T> {
            Ok(self.result)
        }
    }

    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
    /// for file-io as it won't make use of sorted inputs well.
    // TODO: better docs
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        mut new_thread_state: impl FnMut(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let mut state = new_thread_state(0);
        for item in input {
            consume(item, &mut state)?;
            if periodic().is_none() {
                break;
            }
        }
        Ok(vec![state_to_rval(state)])
    }
}

#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};

/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
/// whose task is to aggregate these outputs into the final result returned by this function.
///
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
/// * For `reducer`, see the [`Reduce`] trait
/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
///   similar to the parallel version.
///
/// **This serial version performing all calculations on the current thread.**
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S,
    consume: impl Fn(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    for item in input {
        drop(reducer.feed(consume(item, &mut state))?);
    }
    reducer.finalize()
}