summaryrefslogtreecommitdiffstats
path: root/vendor/rayon/src/iter/par_bridge.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon/src/iter/par_bridge.rs')
-rw-r--r--vendor/rayon/src/iter/par_bridge.rs159
1 files changed, 52 insertions, 107 deletions
diff --git a/vendor/rayon/src/iter/par_bridge.rs b/vendor/rayon/src/iter/par_bridge.rs
index 339ac1a32..8398274b3 100644
--- a/vendor/rayon/src/iter/par_bridge.rs
+++ b/vendor/rayon/src/iter/par_bridge.rs
@@ -1,12 +1,9 @@
-use crossbeam_deque::{Steal, Stealer, Worker};
-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Mutex, TryLockError};
-use std::thread::yield_now;
+use std::sync::Mutex;
-use crate::current_num_threads;
use crate::iter::plumbing::{bridge_unindexed, Folder, UnindexedConsumer, UnindexedProducer};
use crate::iter::ParallelIterator;
+use crate::{current_num_threads, current_thread_index};
/// Conversion trait to convert an `Iterator` to a `ParallelIterator`.
///
@@ -78,71 +75,46 @@ where
where
C: UnindexedConsumer<Self::Item>,
{
- let split_count = AtomicUsize::new(current_num_threads());
- let worker = Worker::new_fifo();
- let stealer = worker.stealer();
- let done = AtomicBool::new(false);
- let iter = Mutex::new((self.iter, worker));
+ let num_threads = current_num_threads();
+ let threads_started: Vec<_> = (0..num_threads).map(|_| AtomicBool::new(false)).collect();
bridge_unindexed(
- IterParallelProducer {
- split_count: &split_count,
- done: &done,
- iter: &iter,
- items: stealer,
+ &IterParallelProducer {
+ split_count: AtomicUsize::new(num_threads),
+ iter: Mutex::new(self.iter.fuse()),
+ threads_started: &threads_started,
},
consumer,
)
}
}
-struct IterParallelProducer<'a, Iter: Iterator> {
- split_count: &'a AtomicUsize,
- done: &'a AtomicBool,
- iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
- items: Stealer<Iter::Item>,
-}
-
-// manual clone because T doesn't need to be Clone, but the derive assumes it should be
-impl<'a, Iter: Iterator + 'a> Clone for IterParallelProducer<'a, Iter> {
- fn clone(&self) -> Self {
- IterParallelProducer {
- split_count: self.split_count,
- done: self.done,
- iter: self.iter,
- items: self.items.clone(),
- }
- }
+struct IterParallelProducer<'a, Iter> {
+ split_count: AtomicUsize,
+ iter: Mutex<std::iter::Fuse<Iter>>,
+ threads_started: &'a [AtomicBool],
}
-impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<'a, Iter>
-where
- Iter::Item: Send,
-{
+impl<Iter: Iterator + Send> UnindexedProducer for &IterParallelProducer<'_, Iter> {
type Item = Iter::Item;
fn split(self) -> (Self, Option<Self>) {
let mut count = self.split_count.load(Ordering::SeqCst);
loop {
- // Check if the iterator is exhausted *and* we've consumed every item from it.
- let done = self.done.load(Ordering::SeqCst) && self.items.is_empty();
-
- match count.checked_sub(1) {
- Some(new_count) if !done => {
- match self.split_count.compare_exchange_weak(
- count,
- new_count,
- Ordering::SeqCst,
- Ordering::SeqCst,
- ) {
- Ok(_) => return (self.clone(), Some(self)),
- Err(last_count) => count = last_count,
- }
- }
- _ => {
- return (self, None);
+ // Check if the iterator is exhausted
+ if let Some(new_count) = count.checked_sub(1) {
+ match self.split_count.compare_exchange_weak(
+ count,
+ new_count,
+ Ordering::SeqCst,
+ Ordering::SeqCst,
+ ) {
+ Ok(_) => return (self, Some(self)),
+ Err(last_count) => count = last_count,
}
+ } else {
+ return (self, None);
}
}
}
@@ -151,66 +123,39 @@ where
where
F: Folder<Self::Item>,
{
+ // Guard against work-stealing-induced recursion, in case `Iter::next()`
+ // calls rayon internally, so we don't deadlock our mutex. We might also
+ // be recursing via `folder` methods, which doesn't present a mutex hazard,
+ // but it's lower overhead for us to just check this once, rather than
+ // updating additional shared state on every mutex lock/unlock.
+ // (If this isn't a rayon thread, then there's no work-stealing anyway...)
+ if let Some(i) = current_thread_index() {
+ // Note: If the number of threads in the pool ever grows dynamically, then
+ // we'll end up sharing flags and may falsely detect recursion -- that's
+ // still fine for overall correctness, just not optimal for parallelism.
+ let thread_started = &self.threads_started[i % self.threads_started.len()];
+ if thread_started.swap(true, Ordering::Relaxed) {
+ // We can't make progress with a nested mutex, so just return and let
+ // the outermost loop continue with the rest of the iterator items.
+ return folder;
+ }
+ }
+
loop {
- match self.items.steal() {
- Steal::Success(it) => {
+ if let Ok(mut iter) = self.iter.lock() {
+ if let Some(it) = iter.next() {
+ drop(iter);
folder = folder.consume(it);
if folder.full() {
return folder;
}
+ } else {
+ return folder;
}
- Steal::Empty => {
- // Don't storm the mutex if we're already done.
- if self.done.load(Ordering::SeqCst) {
- // Someone might have pushed more between our `steal()` and `done.load()`
- if self.items.is_empty() {
- // The iterator is out of items, no use in continuing
- return folder;
- }
- } else {
- // our cache is out of items, time to load more from the iterator
- match self.iter.try_lock() {
- Ok(mut guard) => {
- // Check `done` again in case we raced with the previous lock
- // holder on its way out.
- if self.done.load(Ordering::SeqCst) {
- if self.items.is_empty() {
- return folder;
- }
- continue;
- }
-
- let count = current_num_threads();
- let count = (count * count) * 2;
-
- let (ref mut iter, ref worker) = *guard;
-
- // while worker.len() < count {
- // FIXME the new deque doesn't let us count items. We can just
- // push a number of items, but that doesn't consider active
- // stealers elsewhere.
- for _ in 0..count {
- if let Some(it) = iter.next() {
- worker.push(it);
- } else {
- self.done.store(true, Ordering::SeqCst);
- break;
- }
- }
- }
- Err(TryLockError::WouldBlock) => {
- // someone else has the mutex, just sit tight until it's ready
- yield_now(); //TODO: use a thread-pool-aware yield? (#548)
- }
- Err(TryLockError::Poisoned(_)) => {
- // any panics from other threads will have been caught by the pool,
- // and will be re-thrown when joined - just exit
- return folder;
- }
- }
- }
- }
- Steal::Retry => (),
+ } else {
+ // any panics from other threads will have been caught by the pool,
+ // and will be re-thrown when joined - just exit
+ return folder;
}
}
}