//! Methods for custom fork-join scopes, created by the [`scope()`] //! and [`in_place_scope()`] functions. These are a more flexible alternative to [`join()`]. //! //! [`scope()`]: fn.scope.html //! [`in_place_scope()`]: fn.in_place_scope.html //! [`join()`]: ../join/join.fn.html use crate::broadcast::BroadcastContext; use crate::job::{ArcJob, HeapJob, JobFifo, JobRef}; use crate::latch::{CountLatch, CountLockLatch, Latch}; use crate::registry::{global_registry, in_worker, Registry, WorkerThread}; use crate::unwind; use std::any::Any; use std::fmt; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; #[cfg(test)] mod test; /// Represents a fork-join scope which can be used to spawn any number of tasks. /// See [`scope()`] for more information. /// ///[`scope()`]: fn.scope.html pub struct Scope<'scope> { base: ScopeBase<'scope>, } /// Represents a fork-join scope which can be used to spawn any number of tasks. /// Those spawned from the same thread are prioritized in relative FIFO order. /// See [`scope_fifo()`] for more information. /// ///[`scope_fifo()`]: fn.scope_fifo.html pub struct ScopeFifo<'scope> { base: ScopeBase<'scope>, fifos: Vec, } pub(super) enum ScopeLatch { /// A latch for scopes created on a rayon thread which will participate in work- /// stealing while it waits for completion. This thread is not necessarily part /// of the same registry as the scope itself! Stealing { latch: CountLatch, /// If a worker thread in registry A calls `in_place_scope` on a ThreadPool /// with registry B, when a job completes in a thread of registry B, we may /// need to call `latch.set_and_tickle_one()` to wake the thread in registry A. /// That means we need a reference to registry A (since at that point we will /// only have a reference to registry B), so we stash it here. registry: Arc, /// The index of the worker to wake in `registry` worker_index: usize, }, /// A latch for scopes created on a non-rayon thread which will block to wait. Blocking { latch: CountLockLatch }, } struct ScopeBase<'scope> { /// thread registry where `scope()` was executed or where `in_place_scope()` /// should spawn jobs. registry: Arc, /// if some job panicked, the error is stored here; it will be /// propagated to the one who created the scope panic: AtomicPtr>, /// latch to track job counts job_completed_latch: ScopeLatch, /// You can think of a scope as containing a list of closures to execute, /// all of which outlive `'scope`. They're not actually required to be /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because /// the closures are only *moved* across threads to be executed. marker: PhantomData) + Send + Sync + 'scope>>, } /// Creates a "fork-join" scope `s` and invokes the closure with a /// reference to `s`. This closure can then spawn asynchronous tasks /// into `s`. Those tasks may run asynchronously with respect to the /// closure; they may themselves spawn additional tasks into `s`. When /// the closure returns, it will block until all tasks that have been /// spawned into `s` complete. /// /// `scope()` is a more flexible building block compared to `join()`, /// since a loop can be used to spawn any number of tasks without /// recursing. However, that flexibility comes at a performance price: /// tasks spawned using `scope()` must be allocated onto the heap, /// whereas `join()` can make exclusive use of the stack. **Prefer /// `join()` (or, even better, parallel iterators) where possible.** /// /// # Example /// /// The Rayon `join()` function launches two closures and waits for them /// to stop. One could implement `join()` using a scope like so, although /// it would be less efficient than the real implementation: /// /// ```rust /// # use rayon_core as rayon; /// pub fn join(oper_a: A, oper_b: B) -> (RA, RB) /// where A: FnOnce() -> RA + Send, /// B: FnOnce() -> RB + Send, /// RA: Send, /// RB: Send, /// { /// let mut result_a: Option = None; /// let mut result_b: Option = None; /// rayon::scope(|s| { /// s.spawn(|_| result_a = Some(oper_a())); /// s.spawn(|_| result_b = Some(oper_b())); /// }); /// (result_a.unwrap(), result_b.unwrap()) /// } /// ``` /// /// # A note on threading /// /// The closure given to `scope()` executes in the Rayon thread-pool, /// as do those given to `spawn()`. This means that you can't access /// thread-local variables (well, you can, but they may have /// unexpected values). /// /// # Task execution /// /// Task execution potentially starts as soon as `spawn()` is called. /// The task will end sometime before `scope()` returns. Note that the /// *closure* given to scope may return much earlier. In general /// the lifetime of a scope created like `scope(body)` goes something like this: /// /// - Scope begins when `scope(body)` is called /// - Scope body `body()` is invoked /// - Scope tasks may be spawned /// - Scope body returns /// - Scope tasks execute, possibly spawning more tasks /// - Once all tasks are done, scope ends and `scope()` returns /// /// To see how and when tasks are joined, consider this example: /// /// ```rust /// # use rayon_core as rayon; /// // point start /// rayon::scope(|s| { /// s.spawn(|s| { // task s.1 /// s.spawn(|s| { // task s.1.1 /// rayon::scope(|t| { /// t.spawn(|_| ()); // task t.1 /// t.spawn(|_| ()); // task t.2 /// }); /// }); /// }); /// s.spawn(|s| { // task s.2 /// }); /// // point mid /// }); /// // point end /// ``` /// /// The various tasks that are run will execute roughly like so: /// /// ```notrust /// | (start) /// | /// | (scope `s` created) /// +-----------------------------------------------+ (task s.2) /// +-------+ (task s.1) | /// | | | /// | +---+ (task s.1.1) | /// | | | | /// | | | (scope `t` created) | /// | | +----------------+ (task t.2) | /// | | +---+ (task t.1) | | /// | (mid) | | | | | /// : | + <-+------------+ (scope `t` ends) | /// : | | | /// |<------+---+-----------------------------------+ (scope `s` ends) /// | /// | (end) /// ``` /// /// The point here is that everything spawned into scope `s` will /// terminate (at latest) at the same point -- right before the /// original call to `rayon::scope` returns. This includes new /// subtasks created by other subtasks (e.g., task `s.1.1`). If a new /// scope is created (such as `t`), the things spawned into that scope /// will be joined before that scope returns, which in turn occurs /// before the creating task (task `s.1.1` in this case) finishes. /// /// There is no guaranteed order of execution for spawns in a scope, /// given that other threads may steal tasks at any time. However, they /// are generally prioritized in a LIFO order on the thread from which /// they were spawned. So in this example, absent any stealing, we can /// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other /// threads always steal from the other end of the deque, like FIFO /// order. The idea is that "recent" tasks are most likely to be fresh /// in the local CPU's cache, while other threads can steal older /// "stale" tasks. For an alternate approach, consider /// [`scope_fifo()`] instead. /// /// [`scope_fifo()`]: fn.scope_fifo.html /// /// # Accessing stack data /// /// In general, spawned tasks may access stack data in place that /// outlives the scope itself. Other data must be fully owned by the /// spawned task. /// /// ```rust /// # use rayon_core as rayon; /// let ok: Vec = vec![1, 2, 3]; /// rayon::scope(|s| { /// let bad: Vec = vec![4, 5, 6]; /// s.spawn(|_| { /// // We can access `ok` because outlives the scope `s`. /// println!("ok: {:?}", ok); /// /// // If we just try to use `bad` here, the closure will borrow `bad` /// // (because we are just printing it out, and that only requires a /// // borrow), which will result in a compilation error. Read on /// // for options. /// // println!("bad: {:?}", bad); /// }); /// }); /// ``` /// /// As the comments example above suggest, to reference `bad` we must /// take ownership of it. One way to do this is to detach the closure /// from the surrounding stack frame, using the `move` keyword. This /// will cause it to take ownership of *all* the variables it touches, /// in this case including both `ok` *and* `bad`: /// /// ```rust /// # use rayon_core as rayon; /// let ok: Vec = vec![1, 2, 3]; /// rayon::scope(|s| { /// let bad: Vec = vec![4, 5, 6]; /// s.spawn(move |_| { /// println!("ok: {:?}", ok); /// println!("bad: {:?}", bad); /// }); /// /// // That closure is fine, but now we can't use `ok` anywhere else, /// // since it is owned by the previous task: /// // s.spawn(|_| println!("ok: {:?}", ok)); /// }); /// ``` /// /// While this works, it could be a problem if we want to use `ok` elsewhere. /// There are two choices. We can keep the closure as a `move` closure, but /// instead of referencing the variable `ok`, we create a shadowed variable that /// is a borrow of `ok` and capture *that*: /// /// ```rust /// # use rayon_core as rayon; /// let ok: Vec = vec![1, 2, 3]; /// rayon::scope(|s| { /// let bad: Vec = vec![4, 5, 6]; /// let ok: &Vec = &ok; // shadow the original `ok` /// s.spawn(move |_| { /// println!("ok: {:?}", ok); // captures the shadowed version /// println!("bad: {:?}", bad); /// }); /// /// // Now we too can use the shadowed `ok`, since `&Vec` references /// // can be shared freely. Note that we need a `move` closure here though, /// // because otherwise we'd be trying to borrow the shadowed `ok`, /// // and that doesn't outlive `scope`. /// s.spawn(move |_| println!("ok: {:?}", ok)); /// }); /// ``` /// /// Another option is not to use the `move` keyword but instead to take ownership /// of individual variables: /// /// ```rust /// # use rayon_core as rayon; /// let ok: Vec = vec![1, 2, 3]; /// rayon::scope(|s| { /// let bad: Vec = vec![4, 5, 6]; /// s.spawn(|_| { /// // Transfer ownership of `bad` into a local variable (also named `bad`). /// // This will force the closure to take ownership of `bad` from the environment. /// let bad = bad; /// println!("ok: {:?}", ok); // `ok` is only borrowed. /// println!("bad: {:?}", bad); // refers to our local variable, above. /// }); /// /// s.spawn(|_| println!("ok: {:?}", ok)); // we too can borrow `ok` /// }); /// ``` /// /// # Panics /// /// If a panic occurs, either in the closure given to `scope()` or in /// any of the spawned jobs, that panic will be propagated and the /// call to `scope()` will panic. If multiple panics occurs, it is /// non-deterministic which of their panic values will propagate. /// Regardless, once a task is spawned using `scope.spawn()`, it will /// execute, even if the spawning task should later panic. `scope()` /// returns once all spawned jobs have completed, and any panics are /// propagated at that point. pub fn scope<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R + Send, R: Send, { in_worker(|owner_thread, _| { let scope = Scope::<'scope>::new(Some(owner_thread), None); scope.base.complete(Some(owner_thread), || op(&scope)) }) } /// Creates a "fork-join" scope `s` with FIFO order, and invokes the /// closure with a reference to `s`. This closure can then spawn /// asynchronous tasks into `s`. Those tasks may run asynchronously with /// respect to the closure; they may themselves spawn additional tasks /// into `s`. When the closure returns, it will block until all tasks /// that have been spawned into `s` complete. /// /// # Task execution /// /// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a /// difference in the order of execution. Consider a similar example: /// /// [`scope()`]: fn.scope.html /// /// ```rust /// # use rayon_core as rayon; /// // point start /// rayon::scope_fifo(|s| { /// s.spawn_fifo(|s| { // task s.1 /// s.spawn_fifo(|s| { // task s.1.1 /// rayon::scope_fifo(|t| { /// t.spawn_fifo(|_| ()); // task t.1 /// t.spawn_fifo(|_| ()); // task t.2 /// }); /// }); /// }); /// s.spawn_fifo(|s| { // task s.2 /// }); /// // point mid /// }); /// // point end /// ``` /// /// The various tasks that are run will execute roughly like so: /// /// ```notrust /// | (start) /// | /// | (FIFO scope `s` created) /// +--------------------+ (task s.1) /// +-------+ (task s.2) | /// | | +---+ (task s.1.1) /// | | | | /// | | | | (FIFO scope `t` created) /// | | | +----------------+ (task t.1) /// | | | +---+ (task t.2) | /// | (mid) | | | | | /// : | | + <-+------------+ (scope `t` ends) /// : | | | /// |<------+------------+---+ (scope `s` ends) /// | /// | (end) /// ``` /// /// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on /// the thread from which they were spawned, as opposed to `scope()`'s /// LIFO. So in this example, we can expect `s.1` to execute before /// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in /// FIFO order, as usual. Overall, this has roughly the same order as /// the now-deprecated [`breadth_first`] option, except the effect is /// isolated to a particular scope. If spawns are intermingled from any /// combination of `scope()` and `scope_fifo()`, or from different /// threads, their order is only specified with respect to spawns in the /// same scope and thread. /// /// For more details on this design, see Rayon [RFC #1]. /// /// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md /// /// # Panics /// /// If a panic occurs, either in the closure given to `scope_fifo()` or /// in any of the spawned jobs, that panic will be propagated and the /// call to `scope_fifo()` will panic. If multiple panics occurs, it is /// non-deterministic which of their panic values will propagate. /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it /// will execute, even if the spawning task should later panic. /// `scope_fifo()` returns once all spawned jobs have completed, and any /// panics are propagated at that point. pub fn scope_fifo<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R + Send, R: Send, { in_worker(|owner_thread, _| { let scope = ScopeFifo::<'scope>::new(Some(owner_thread), None); scope.base.complete(Some(owner_thread), || op(&scope)) }) } /// Creates a "fork-join" scope `s` and invokes the closure with a /// reference to `s`. This closure can then spawn asynchronous tasks /// into `s`. Those tasks may run asynchronously with respect to the /// closure; they may themselves spawn additional tasks into `s`. When /// the closure returns, it will block until all tasks that have been /// spawned into `s` complete. /// /// This is just like `scope()` except the closure runs on the same thread /// that calls `in_place_scope()`. Only work that it spawns runs in the /// thread pool. /// /// # Panics /// /// If a panic occurs, either in the closure given to `in_place_scope()` or in /// any of the spawned jobs, that panic will be propagated and the /// call to `in_place_scope()` will panic. If multiple panics occurs, it is /// non-deterministic which of their panic values will propagate. /// Regardless, once a task is spawned using `scope.spawn()`, it will /// execute, even if the spawning task should later panic. `in_place_scope()` /// returns once all spawned jobs have completed, and any panics are /// propagated at that point. pub fn in_place_scope<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R, { do_in_place_scope(None, op) } pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc>, op: OP) -> R where OP: FnOnce(&Scope<'scope>) -> R, { let thread = unsafe { WorkerThread::current().as_ref() }; let scope = Scope::<'scope>::new(thread, registry); scope.base.complete(thread, || op(&scope)) } /// Creates a "fork-join" scope `s` with FIFO order, and invokes the /// closure with a reference to `s`. This closure can then spawn /// asynchronous tasks into `s`. Those tasks may run asynchronously with /// respect to the closure; they may themselves spawn additional tasks /// into `s`. When the closure returns, it will block until all tasks /// that have been spawned into `s` complete. /// /// This is just like `scope_fifo()` except the closure runs on the same thread /// that calls `in_place_scope_fifo()`. Only work that it spawns runs in the /// thread pool. /// /// # Panics /// /// If a panic occurs, either in the closure given to `in_place_scope_fifo()` or in /// any of the spawned jobs, that panic will be propagated and the /// call to `in_place_scope_fifo()` will panic. If multiple panics occurs, it is /// non-deterministic which of their panic values will propagate. /// Regardless, once a task is spawned using `scope.spawn_fifo()`, it will /// execute, even if the spawning task should later panic. `in_place_scope_fifo()` /// returns once all spawned jobs have completed, and any panics are /// propagated at that point. pub fn in_place_scope_fifo<'scope, OP, R>(op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R, { do_in_place_scope_fifo(None, op) } pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc>, op: OP) -> R where OP: FnOnce(&ScopeFifo<'scope>) -> R, { let thread = unsafe { WorkerThread::current().as_ref() }; let scope = ScopeFifo::<'scope>::new(thread, registry); scope.base.complete(thread, || op(&scope)) } impl<'scope> Scope<'scope> { fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { let base = ScopeBase::new(owner, registry); Scope { base } } /// Spawns a job into the fork-join scope `self`. This job will /// execute sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its /// own reference to the scope `self` as argument. This can be /// used to inject new jobs into `self`. /// /// # Returns /// /// Nothing. The spawned closures cannot pass back values to the /// caller directly, though they can write to local variables on /// the stack (if those variables outlive the scope) or /// communicate through shared channels. /// /// (The intention is to eventually integrate with Rust futures to /// support spawns of functions that compute a value.) /// /// # Examples /// /// ```rust /// # use rayon_core as rayon; /// let mut value_a = None; /// let mut value_b = None; /// let mut value_c = None; /// rayon::scope(|s| { /// s.spawn(|s1| { /// // ^ this is the same scope as `s`; this handle `s1` /// // is intended for use by the spawned task, /// // since scope handles cannot cross thread boundaries. /// /// value_a = Some(22); /// /// // the scope `s` will not end until all these tasks are done /// s1.spawn(|_| { /// value_b = Some(44); /// }); /// }); /// /// s.spawn(|_| { /// value_c = Some(66); /// }); /// }); /// assert_eq!(value_a, Some(22)); /// assert_eq!(value_b, Some(44)); /// assert_eq!(value_c, Some(66)); /// ``` /// /// # See also /// /// The [`scope` function] has more extensive documentation about /// task spawning. /// /// [`scope` function]: fn.scope.html pub fn spawn(&self, body: BODY) where BODY: FnOnce(&Scope<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. let scope = scope_ptr.as_ref(); ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); // Since `Scope` implements `Sync`, we can't be sure that we're still in a // thread of this pool, so we can't just push to the local worker thread. // Also, this might be an in-place scope. self.base.registry.inject_or_push(job_ref); } /// Spawns a job into every thread of the fork-join scope `self`. This job will /// execute on each thread sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its own reference /// to the scope `self` as argument, as well as a `BroadcastContext`. pub fn spawn_broadcast(&self, body: BODY) where BODY: Fn(&Scope<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } } impl<'scope> ScopeFifo<'scope> { fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { let base = ScopeBase::new(owner, registry); let num_threads = base.registry.num_threads(); let fifos = (0..num_threads).map(|_| JobFifo::new()).collect(); ScopeFifo { base, fifos } } /// Spawns a job into the fork-join scope `self`. This job will /// execute sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its /// own reference to the scope `self` as argument. This can be /// used to inject new jobs into `self`. /// /// # See also /// /// This method is akin to [`Scope::spawn()`], but with a FIFO /// priority. The [`scope_fifo` function] has more details about /// this distinction. /// /// [`Scope::spawn()`]: struct.Scope.html#method.spawn /// [`scope_fifo` function]: fn.scope_fifo.html pub fn spawn_fifo(&self, body: BODY) where BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope, { let scope_ptr = ScopePtr(self); let job = HeapJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. let scope = scope_ptr.as_ref(); ScopeBase::execute_job(&scope.base, move || body(scope)) }); let job_ref = self.base.heap_job_ref(job); // If we're in the pool, use our scope's private fifo for this thread to execute // in a locally-FIFO order. Otherwise, just use the pool's global injector. match self.base.registry.current_thread() { Some(worker) => { let fifo = &self.fifos[worker.index()]; // SAFETY: this job will execute before the scope ends. unsafe { worker.push(fifo.push(job_ref)) }; } None => self.base.registry.inject(job_ref), } } /// Spawns a job into every thread of the fork-join scope `self`. This job will /// execute on each thread sometime before the fork-join scope completes. The /// job is specified as a closure, and this closure receives its own reference /// to the scope `self` as argument, as well as a `BroadcastContext`. pub fn spawn_broadcast(&self, body: BODY) where BODY: Fn(&ScopeFifo<'scope>, BroadcastContext<'_>) + Send + Sync + 'scope, { let scope_ptr = ScopePtr(self); let job = ArcJob::new(move || unsafe { // SAFETY: this job will execute before the scope ends. let scope = scope_ptr.as_ref(); let body = &body; let func = move || BroadcastContext::with(move |ctx| body(scope, ctx)); ScopeBase::execute_job(&scope.base, func) }); self.base.inject_broadcast(job) } } impl<'scope> ScopeBase<'scope> { /// Creates the base of a new scope for the given registry fn new(owner: Option<&WorkerThread>, registry: Option<&Arc>) -> Self { let registry = registry.unwrap_or_else(|| match owner { Some(owner) => owner.registry(), None => global_registry(), }); ScopeBase { registry: Arc::clone(registry), panic: AtomicPtr::new(ptr::null_mut()), job_completed_latch: ScopeLatch::new(owner), marker: PhantomData, } } fn increment(&self) { self.job_completed_latch.increment(); } fn heap_job_ref(&self, job: Box>) -> JobRef where FUNC: FnOnce() + Send + 'scope, { unsafe { self.increment(); job.into_job_ref() } } fn inject_broadcast(&self, job: Arc>) where FUNC: Fn() + Send + Sync + 'scope, { let n_threads = self.registry.num_threads(); let job_refs = (0..n_threads).map(|_| unsafe { self.increment(); ArcJob::as_job_ref(&job) }); self.registry.inject_broadcast(job_refs); } /// Executes `func` as a job, either aborting or executing as /// appropriate. fn complete(&self, owner: Option<&WorkerThread>, func: FUNC) -> R where FUNC: FnOnce() -> R, { let result = unsafe { Self::execute_job_closure(self, func) }; self.job_completed_latch.wait(owner); self.maybe_propagate_panic(); result.unwrap() // only None if `op` panicked, and that would have been propagated } /// Executes `func` as a job, either aborting or executing as /// appropriate. unsafe fn execute_job(this: *const Self, func: FUNC) where FUNC: FnOnce(), { let _: Option<()> = Self::execute_job_closure(this, func); } /// Executes `func` as a job in scope. Adjusts the "job completed" /// counters and also catches any panic and stores it into /// `scope`. unsafe fn execute_job_closure(this: *const Self, func: FUNC) -> Option where FUNC: FnOnce() -> R, { match unwind::halt_unwinding(func) { Ok(r) => { Latch::set(&(*this).job_completed_latch); Some(r) } Err(err) => { (*this).job_panicked(err); Latch::set(&(*this).job_completed_latch); None } } } fn job_panicked(&self, err: Box) { // capture the first error we see, free the rest if self.panic.load(Ordering::Relaxed).is_null() { let nil = ptr::null_mut(); let mut err = ManuallyDrop::new(Box::new(err)); // box up the fat ptr let err_ptr: *mut Box = &mut **err; if self .panic .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed) .is_ok() { // ownership now transferred into self.panic } else { // another panic raced in ahead of us, so drop ours let _: Box> = ManuallyDrop::into_inner(err); } } } fn maybe_propagate_panic(&self) { // propagate panic, if any occurred; at this point, all // outstanding jobs have completed, so we can use a relaxed // ordering: let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed); if !panic.is_null() { let value = unsafe { Box::from_raw(panic) }; unwind::resume_unwinding(*value); } } } impl ScopeLatch { fn new(owner: Option<&WorkerThread>) -> Self { Self::with_count(1, owner) } pub(super) fn with_count(count: usize, owner: Option<&WorkerThread>) -> Self { match owner { Some(owner) => ScopeLatch::Stealing { latch: CountLatch::with_count(count), registry: Arc::clone(owner.registry()), worker_index: owner.index(), }, None => ScopeLatch::Blocking { latch: CountLockLatch::with_count(count), }, } } fn increment(&self) { match self { ScopeLatch::Stealing { latch, .. } => latch.increment(), ScopeLatch::Blocking { latch } => latch.increment(), } } pub(super) fn wait(&self, owner: Option<&WorkerThread>) { match self { ScopeLatch::Stealing { latch, registry, worker_index, } => unsafe { let owner = owner.expect("owner thread"); debug_assert_eq!(registry.id(), owner.registry().id()); debug_assert_eq!(*worker_index, owner.index()); owner.wait_until(latch); }, ScopeLatch::Blocking { latch } => latch.wait(), } } } impl Latch for ScopeLatch { unsafe fn set(this: *const Self) { match &*this { ScopeLatch::Stealing { latch, registry, worker_index, } => CountLatch::set_and_tickle_one(latch, registry, *worker_index), ScopeLatch::Blocking { latch } => Latch::set(latch), } } } impl<'scope> fmt::Debug for Scope<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Scope") .field("pool_id", &self.base.registry.id()) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() } } impl<'scope> fmt::Debug for ScopeFifo<'scope> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("ScopeFifo") .field("num_fifos", &self.fifos.len()) .field("pool_id", &self.base.registry.id()) .field("panic", &self.base.panic) .field("job_completed_latch", &self.base.job_completed_latch) .finish() } } impl fmt::Debug for ScopeLatch { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ScopeLatch::Stealing { latch, .. } => fmt .debug_tuple("ScopeLatch::Stealing") .field(latch) .finish(), ScopeLatch::Blocking { latch } => fmt .debug_tuple("ScopeLatch::Blocking") .field(latch) .finish(), } } } /// Used to capture a scope `&Self` pointer in jobs, without faking a lifetime. /// /// Unsafe code is still required to dereference the pointer, but that's fine in /// scope jobs that are guaranteed to execute before the scope ends. struct ScopePtr(*const T); // SAFETY: !Send for raw pointers is not for safety, just as a lint unsafe impl Send for ScopePtr {} // SAFETY: !Sync for raw pointers is not for safety, just as a lint unsafe impl Sync for ScopePtr {} impl ScopePtr { // Helper to avoid disjoint captures of `scope_ptr.0` unsafe fn as_ref(&self) -> &T { &*self.0 } }