diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:19:41 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:19:41 +0000 |
commit | 4f9fe856a25ab29345b90e7725509e9ee38a37be (patch) | |
tree | e4ffd8a9374cae7b21f7cbfb352927e0e074aff6 /vendor/rayon/src | |
parent | Adding upstream version 1.68.2+dfsg1. (diff) | |
download | rustc-upstream/1.69.0+dfsg1.tar.xz rustc-upstream/1.69.0+dfsg1.zip |
Adding upstream version 1.69.0+dfsg1.upstream/1.69.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon/src')
-rw-r--r-- | vendor/rayon/src/iter/par_bridge.rs | 159 |
1 files changed, 52 insertions, 107 deletions
diff --git a/vendor/rayon/src/iter/par_bridge.rs b/vendor/rayon/src/iter/par_bridge.rs index 339ac1a32..8398274b3 100644 --- a/vendor/rayon/src/iter/par_bridge.rs +++ b/vendor/rayon/src/iter/par_bridge.rs @@ -1,12 +1,9 @@ -use crossbeam_deque::{Steal, Stealer, Worker}; - use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Mutex, TryLockError}; -use std::thread::yield_now; +use std::sync::Mutex; -use crate::current_num_threads; use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer}; use crate::iter::ParallelIterator; +use crate::{current_num_threads, current_thread_index}; /// Conversion trait to convert an `Iterator` to a `ParallelIterator`. /// @@ -78,71 +75,46 @@ where where C: UnindexedConsumer<Self::Item>, { - let split_count = AtomicUsize::new(current_num_threads()); - let worker = Worker::new_fifo(); - let stealer = worker.stealer(); - let done = AtomicBool::new(false); - let iter = Mutex::new((self.iter, worker)); + let num_threads = current_num_threads(); + let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect(); bridge_unindexed( - IterParallelProducer { - split_count: &split_count, - done: &done, - iter: &iter, - items: stealer, + &IterParallelProducer { + split_count: AtomicUsize::new(num_threads), + iter: Mutex::new(self.iter.fuse()), + threads_started: &threads_started, }, consumer, ) } } -struct IterParallelProducer<'a, Iter: Iterator> { - split_count: &'a AtomicUsize, - done: &'a AtomicBool, - iter: &'a Mutex<(Iter, Worker<Iter::Item>)>, - items: Stealer<Iter::Item>, -} - -// manual clone because T doesn't need to be Clone, but the derive assumes it should be -impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> { - fn clone(&self) -> Self { - IterParallelProducer { - split_count: self.split_count, - done: self.done, - iter: self.iter, - items: self.items.clone(), - } - } +struct IterParallelProducer<'a, Iter> { + split_count: AtomicUsize, + iter: Mutex<std::iter::Fuse<Iter>>, + threads_started: &'a [AtomicBool], } -impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter> -where - Iter::Item: Send, -{ +impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> { type Item = Iter::Item; fn split(self) -> (Self, Option<Self>) { let mut count = self.split_count.load(Ordering::SeqCst); loop { - // Check if the iterator is exhausted *and* we've consumed every item from it. - let done = self.done.load(Ordering::SeqCst) && self.items.is_empty(); - - match count.checked_sub(1) { - Some(new_count) if !done => { - match self.split_count.compare_exchange_weak( - count, - new_count, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => return (self.clone(), Some(self)), - Err(last_count) => count = last_count, - } - } - _ => { - return (self, None); + // Check if the iterator is exhausted + if let Some(new_count) = count.checked_sub(1) { + match self.split_count.compare_exchange_weak( + count, + new_count, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => return (self, Some(self)), + Err(last_count) => count = last_count, } + } else { + return (self, None); } } } @@ -151,66 +123,39 @@ where where F: Folder<Self::Item>, { + // Guard against work-stealing-induced recursion, in case `Iter::next()` + // calls rayon internally, so we don't deadlock our mutex. We might also + // be recursing via `folder` methods, which doesn't present a mutex hazard, + // but it's lower overhead for us to just check this once, rather than + // updating additional shared state on every mutex lock/unlock. + // (If this isn't a rayon thread, then there's no work-stealing anyway...) + if let Some(i) = current_thread_index() { + // Note: If the number of threads in the pool ever grows dynamically, then + // we'll end up sharing flags and may falsely detect recursion -- that's + // still fine for overall correctness, just not optimal for parallelism. + let thread_started = &self.threads_started[i % self.threads_started.len()]; + if thread_started.swap(true, Ordering::Relaxed) { + // We can't make progress with a nested mutex, so just return and let + // the outermost loop continue with the rest of the iterator items. + return folder; + } + } + loop { - match self.items.steal() { - Steal::Success(it) => { + if let Ok(mut iter) = self.iter.lock() { + if let Some(it) = iter.next() { + drop(iter); folder = folder.consume(it); if folder.full() { return folder; } + } else { + return folder; } - Steal::Empty => { - // Don't storm the mutex if we're already done. - if self.done.load(Ordering::SeqCst) { - // Someone might have pushed more between our `steal()` and `done.load()` - if self.items.is_empty() { - // The iterator is out of items, no use in continuing - return folder; - } - } else { - // our cache is out of items, time to load more from the iterator - match self.iter.try_lock() { - Ok(mut guard) => { - // Check `done` again in case we raced with the previous lock - // holder on its way out. - if self.done.load(Ordering::SeqCst) { - if self.items.is_empty() { - return folder; - } - continue; - } - - let count = current_num_threads(); - let count = (count * count) * 2; - - let (ref mut iter, ref worker) = *guard; - - // while worker.len() < count { - // FIXME the new deque doesn't let us count items. We can just - // push a number of items, but that doesn't consider active - // stealers elsewhere. - for _ in 0..count { - if let Some(it) = iter.next() { - worker.push(it); - } else { - self.done.store(true, Ordering::SeqCst); - break; - } - } - } - Err(TryLockError::WouldBlock) => { - // someone else has the mutex, just sit tight until it's ready - yield_now(); //TODO: use a thread-pool-aware yield? (#548) - } - Err(TryLockError::Poisoned(_)) => { - // any panics from other threads will have been caught by the pool, - // and will be re-thrown when joined - just exit - return folder; - } - } - } - } - Steal::Retry => (), + } else { + // any panics from other threads will have been caught by the pool, + // and will be re-thrown when joined - just exit + return folder; } } } |