summaryrefslogtreecommitdiffstats
path: root/vendor/gix-features/src/parallel/serial.rs
blob: 7665d3ffa7a68e7da516b5e619056d050e68b380 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
use crate::parallel::Reduce;

#[cfg(not(feature = "parallel"))]
mod not_parallel {
    use std::marker::PhantomData;
    use std::sync::atomic::{AtomicBool, AtomicIsize};

    /// Calls `left` to completion and then `right`, both on the current thread, returning the pair of results.
    pub fn join<O1, O2>(left: impl FnOnce() -> O1, right: impl FnOnce() -> O2) -> (O1, O2) {
        let left_output = left();
        let right_output = right();
        (left_output, right_output)
    }

    /// A scope for spawning threads.
    ///
    /// In this serial implementation it stores no data at all, only lifetime markers.
    pub struct Scope<'scope, 'env: 'scope> {
        _scope: PhantomData<&'scope mut &'scope ()>,
        _env: PhantomData<&'env mut &'env ()>,
    }

    /// A builder for "threads" which are spawned into a [`Scope`].
    ///
    /// It is a unit struct without any configuration state.
    pub struct ThreadBuilder;

    /// Create a builder for threads which allows them to be spawned into a scope and configured prior to spawning.
    pub fn build_thread() -> ThreadBuilder {
        ThreadBuilder
    }

    // SAFETY: `Scope` consists solely of `PhantomData` markers, so there is no data that could be
    // accessed through a shared reference from another thread.
    #[allow(unsafe_code)]
    unsafe impl Sync for Scope<'_, '_> {}

    impl ThreadBuilder {
        /// Accept a name for the thread to spawn. The name is discarded and the builder is returned as is.
        pub fn name(self, _new: String) -> Self {
            self
        }

        /// Run `f` right away by delegating to [`Scope::spawn`] and wrap the resulting handle.
        ///
        /// This implementation always returns `Ok`.
        pub fn spawn_scoped<'scope, 'env, F, T>(
            &self,
            scope: &'scope Scope<'scope, 'env>,
            f: F,
        ) -> std::io::Result<ScopedJoinHandle<'scope, T>>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            let handle = scope.spawn(f);
            Ok(handle)
        }
    }

    impl<'scope, 'env> Scope<'scope, 'env> {
        /// Call `f` immediately on the current thread and keep its return value in the returned handle.
        pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
        where
            F: FnOnce() -> T + 'scope,
            T: 'scope,
        {
            // `f` has already finished by the time the handle exists.
            let result = f();
            ScopedJoinHandle {
                result,
                _marker: PhantomData,
            }
        }
    }

    /// Runs `f` with a scope to be used for spawning threads that will not outlive the function call.
    /// Note that this implementation will run the spawned functions immediately.
    pub fn threads<'env, F, R>(f: F) -> R
    where
        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> R,
    {
        let scope = Scope {
            _scope: PhantomData,
            _env: PhantomData,
        };
        f(&scope)
    }

    /// A handle that can be used to join its scoped thread.
    ///
    /// This struct is created by the [`Scope::spawn`] method and the
    /// [`ThreadBuilder::spawn_scoped`] method.
    pub struct ScopedJoinHandle<'scope, T> {
        /// Holds the result of the inner closure, which ran before this handle was created.
        result: T,
        _marker: PhantomData<&'scope mut &'scope ()>,
    }

    impl<T> ScopedJoinHandle<'_, T> {
        /// Hand out the stored result of the closure. This implementation always returns `Ok`.
        pub fn join(self) -> std::thread::Result<T> {
            let Self { result, .. } = self;
            Ok(result)
        }

        /// Always `true`, as the closure is run before the handle is created.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    /// An experiment to have fine-grained per-item parallelization with built-in aggregation via thread state.
    /// This is only good for operations where near-random access isn't detrimental, so it's not usually great
    /// for file-io as it won't make use of sorted inputs well.
    ///
    /// This serial implementation works as follows:
    ///
    /// * `new_thread_state` is called once, with `0`, to create the only state.
    /// * `consume` is called for each item of `input` in order, along with the state and two atomics which
    ///   hold their default values and are never modified by this function. The first error it returns
    ///   is returned right away.
    /// * `periodic` is called after each successfully consumed item; if it returns `None`, the remaining
    ///   items are skipped.
    /// * `state_to_rval` converts the state into the single element of the returned `Vec`.
    /// * `_thread_limit` is unused.
    pub fn in_parallel_with_slice<I, S, R, E>(
        input: &mut [I],
        _thread_limit: Option<usize>,
        new_thread_state: impl FnOnce(usize) -> S + Clone,
        mut consume: impl FnMut(&mut I, &mut S, &AtomicIsize, &AtomicBool) -> Result<(), E> + Clone,
        mut periodic: impl FnMut() -> Option<std::time::Duration>,
        state_to_rval: impl FnOnce(S) -> R + Clone,
    ) -> Result<Vec<R>, E> {
        let threads_left = AtomicIsize::default();
        let should_interrupt = AtomicBool::default();
        let mut state = new_thread_state(0);

        for item in input.iter_mut() {
            consume(item, &mut state, &threads_left, &should_interrupt)?;
            let keep_going = periodic().is_some();
            if !keep_going {
                break;
            }
        }

        let rval = state_to_rval(state);
        Ok(vec![rval])
    }
}

#[cfg(not(feature = "parallel"))]
pub use not_parallel::{build_thread, in_parallel_with_slice, join, threads, Scope, ScopedJoinHandle};

/// Read items from `input` and `consume` them on the current thread, passing each output on to a `reducer`
/// whose task is to aggregate these outputs into the final result returned by this function.
///
/// * `_thread_limit` has no effect as everything is run on the current thread, but is present to keep the signature
///   similar to the parallel version.
/// * `new_thread_state(thread_number) -> State` is called once, with `0`, to produce the state to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
/// * For `reducer`, see the [`Reduce`] trait. The first error returned when feeding it stops the iteration
///   and is returned to the caller.
///
/// **This serial version performs all calculations on the current thread.**
pub fn in_parallel<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    // The chain is lazy: every item is consumed and fed before the next one is pulled from `input`.
    // What `feed` produces on success is discarded.
    input
        .map(|item| consume(item, &mut state))
        .try_for_each(|output| reducer.feed(output).map(drop))?;
    reducer.finalize()
}

/// Read items from `input` and `consume` them on the current thread, with each output being collected by a `reducer`.
/// Its task is to aggregate these outputs into the final result returned by this function.
/// Once `input` is exhausted, `finalize` is called once with the state and its output is fed to the `reducer` as well,
/// unless feeding an earlier output failed, in which case that error is returned right away.
///
/// * `_thread_limit` is unused in this serial version.
/// * `new_thread_state(thread_number) -> State` is called once, with `0`, to produce the state to be passed to `consume`.
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
///   created by `new_thread_state(…)`.
/// * `finalize(State) -> Output` is called to potentially process remaining work that was placed in `State`.
/// * For `reducer`, see the [`Reduce`] trait
#[cfg(not(feature = "parallel"))]
pub fn in_parallel_with_finalize<I, S, O, R>(
    input: impl Iterator<Item = I>,
    _thread_limit: Option<usize>,
    new_thread_state: impl FnOnce(usize) -> S,
    mut consume: impl FnMut(I, &mut S) -> O,
    finalize: impl FnOnce(S) -> O + Send + Clone,
    mut reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
{
    let mut state = new_thread_state(0);
    // The chain is lazy: every item is consumed and fed before the next one is pulled from `input`.
    // What `feed` produces on success is discarded.
    input
        .map(|item| consume(item, &mut state))
        .try_for_each(|output| reducer.feed(output).map(drop))?;

    // Give the state a chance to emit one last output before the reducer is finished.
    let last_output = finalize(state);
    reducer.feed(last_output)?;
    reducer.finalize()
}