diff options
Diffstat (limited to 'vendor/rayon-core/src/scope/mod.rs')
-rw-r--r-- | vendor/rayon-core/src/scope/mod.rs | 72 |
1 files changed, 39 insertions, 33 deletions
diff --git a/vendor/rayon-core/src/scope/mod.rs b/vendor/rayon-core/src/scope/mod.rs index 25cda832e..f460dd79d 100644 --- a/vendor/rayon-core/src/scope/mod.rs +++ b/vendor/rayon-core/src/scope/mod.rs @@ -13,7 +13,7 @@ use crate::unwind; use std::any::Any; use std::fmt; use std::marker::PhantomData; -use std::mem; +use std::mem::ManuallyDrop; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; @@ -540,10 +540,10 @@ impl<'scope> Scope<'scope> { BODY: FnOnce(&Scope<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -562,12 +562,12 @@ impl<'scope> Scope<'scope> { BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -600,10 +600,10 @@ impl<'scope> ScopeFifo<'scope> { BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); - let job = HeapJob::new(move || { + let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; - scope.base.execute_job(move || body(scope)) + let scope = scope_ptr.as_ref(); + ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); @@ -615,7 +615,7 @@ impl<'scope> ScopeFifo<'scope> { // SAFETY: this job will execute before the scope ends. unsafe { worker.push(fifo.push(job_ref)) }; } - None => self.base.registry.inject(&[job_ref]), + None => self.base.registry.inject(job_ref), } } @@ -628,12 +628,12 @@ impl<'scope> ScopeFifo<'scope> { BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); - let job = ArcJob::new(move || { + let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. - let scope = unsafe { scope_ptr.as_ref() }; + let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); - scope.base.execute_job(func); + ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } @@ -688,7 +688,7 @@ impl<'scope> ScopeBase<'scope> { where FUNC: FnOnce() -> R, { - let result = self.execute_job_closure(func); + let result = unsafe { Self::execute_job_closure(self, func) }; self.job_completed_latch.wait(owner); self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated @@ -696,28 +696,28 @@ impl<'scope> ScopeBase<'scope> { /// Executes `func` as a job, either aborting or executing as /// appropriate. - fn execute_job<FUNC>(&self, func: FUNC) + unsafe fn execute_job<FUNC>(this: *const Self, func: FUNC) where FUNC: FnOnce(), { - let _: Option<()> = self.execute_job_closure(func); + let _: Option<()> = Self::execute_job_closure(this, func); } /// Executes `func` as a job in scope. Adjusts the "job completed" /// counters and also catches any panic and stores it into /// `scope`. - fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R> + unsafe fn execute_job_closure<FUNC, R>(this: *const Self, func: FUNC) -> Option<R> where FUNC: FnOnce() -> R, { match unwind::halt_unwinding(func) { Ok(r) => { - self.job_completed_latch.set(); + Latch::set(&(*this).job_completed_latch); Some(r) } Err(err) => { - self.job_panicked(err); - self.job_completed_latch.set(); + (*this).job_panicked(err); + Latch::set(&(*this).job_completed_latch); None } } @@ -725,14 +725,20 @@ impl<'scope> ScopeBase<'scope> { fn job_panicked(&self, err: Box<dyn Any + Send + 'static>) { // capture the first error we see, free the rest - let nil = ptr::null_mut(); - let mut err = Box::new(err); // box up the fat ptr - if self - .panic - .compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed) - .is_ok() - { - mem::forget(err); // ownership now transferred into self.panic + if self.panic.load(Ordering::Relaxed).is_null() { + let nil = ptr::null_mut(); + let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr + let err_ptr: *mut Box<dyn Any + Send + 'static> = &mut **err; + if self + .panic + .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + // ownership now transferred into self.panic + } else { + // another panic raced in ahead of us, so drop ours + let _: Box<Box<_>> = ManuallyDrop::into_inner(err); + } } } @@ -791,14 +797,14 @@ impl ScopeLatch { } impl Latch for ScopeLatch { - fn set(&self) { - match self { + unsafe fn set(this: *const Self) { + match &*this { ScopeLatch::Stealing { latch, registry, worker_index, - } => latch.set_and_tickle_one(registry, *worker_index), - ScopeLatch::Blocking { latch } => latch.set(), + } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), + ScopeLatch::Blocking { latch } => Latch::set(latch), } } } |