summaryrefslogtreecommitdiffstats
path: root/vendor/rayon-core/src/scope/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/rayon-core/src/scope/mod.rs')
-rw-r--r--vendor/rayon-core/src/scope/mod.rs72
1 files changed, 39 insertions, 33 deletions
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs
index 25cda832e..f460dd79d 100644
--- a/vendor/rayon-core/src/scope/mod.rs
+++ b/vendor/rayon-core/src/scope/mod.rs
@@ -13,7 +13,7 @@ use crate::unwind;
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
-use std::mem;
+use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
@@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> {
BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> {
BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> {
BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = HeapJob::new(move || {
+ let job = HeapJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
- scope.base.execute_job(move || body(scope))
+ let scope = scope_ptr.as_ref();
+ ScopeBase::execute_job(&scope.base, move || body(scope))
});
let job_ref = self.base.heap_job_ref(job);
@@ -615,7 +615,7 @@ impl<'scope> ScopeFifo<'scope> {
// SAFETY: this job will execute before the scope ends.
unsafe { worker.push(fifo.push(job_ref)) };
}
- None => self.base.registry.inject(&[job_ref]),
+ None => self.base.registry.inject(job_ref),
}
}
@@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> {
BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope,
{
let scope_ptr = ScopePtr(self);
- let job = ArcJob::new(move || {
+ let job = ArcJob::new(move || unsafe {
// SAFETY: this job will execute before the scope ends.
- let scope = unsafe { scope_ptr.as_ref() };
+ let scope = scope_ptr.as_ref();
let body = &body;
let func = move || BroadcastContext::with(move |ctx| body(scope, ctx));
- scope.base.execute_job(func);
+ ScopeBase::execute_job(&scope.base, func)
});
self.base.inject_broadcast(job)
}
@@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> {
where
FUNC: FnOnce() -> R,
{
- let result = self.execute_job_closure(func);
+ let result = unsafe { Self::execute_job_closure(self, func) };
self.job_completed_latch.wait(owner);
self.maybe_propagate_panic();
result.unwrap() // only None if `op` panicked, and that would have been propagated
@@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> {
/// Executes `func` as a job, either aborting or executing as
/// appropriate.
- fn execute_job<FUNC>(&self, func: FUNC)
+ unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC)
where
FUNC: FnOnce(),
{
- let _: Option<()> = self.execute_job_closure(func);
+ let _: Option<()> = Self::execute_job_closure(this, func);
}
/// Executes `func` as a job in scope. Adjusts the "job completed"
/// counters and also catches any panic and stores it into
/// `scope`.
- fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
+ unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R>
where
FUNC: FnOnce() -> R,
{
match unwind::halt_unwinding(func) {
Ok(r) => {
- self.job_completed_latch.set();
+ Latch::set(&(*this).job_completed_latch);
Some(r)
}
Err(err) => {
- self.job_panicked(err);
- self.job_completed_latch.set();
+ (*this).job_panicked(err);
+ Latch::set(&(*this).job_completed_latch);
None
}
}
@@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> {
fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) {
// capture the first error we see, free the rest
- let nil = ptr::null_mut();
- let mut err = Box::new(err); // box up the fat ptr
- if self
- .panic
- .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed)
- .is_ok()
- {
- mem::forget(err); // ownership now transferred into self.panic
+ if self.panic.load(Ordering::Relaxed).is_null() {
+ let nil = ptr::null_mut();
+ let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr
+ let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err;
+ if self
+ .panic
+ .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ // ownership now transferred into self.panic
+ } else {
+ // another panic raced in ahead of us, so drop ours
+ let _: Box<Box<_>> = ManuallyDrop::into_inner(err);
+ }
}
}
@@ -791,14 +797,14 @@ impl ScopeLatch {
}
impl Latch for ScopeLatch {
- fn set(&self) {
- match self {
+ unsafe fn set(this: *const Self) {
+ match &*this {
ScopeLatch::Stealing {
latch,
registry,
worker_index,
- } => latch.set_and_tickle_one(registry, *worker_index),
- ScopeLatch::Blocking { latch } => latch.set(),
+ } => CountLatch::set_and_tickle_one(latch, registry, *worker_index),
+ ScopeLatch::Blocking { latch } => Latch::set(latch),
}
}
}