use super::{current, park, Builder, JoinInner, Result, Thread}; use crate::fmt; use crate::io; use crate::marker::PhantomData; use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use crate::sync::Arc; /// A scope to spawn scoped threads in. /// /// See [`scope`] for details. #[stable(feature = "scoped_threads", since = "1.63.0")] pub struct Scope<'scope, 'env: 'scope> { data: Arc, /// Invariance over 'scope, to make sure 'scope cannot shrink, /// which is necessary for soundness. /// /// Without invariance, this would compile fine but be unsound: /// /// ```compile_fail,E0373 /// std::thread::scope(|s| { /// s.spawn(|| { /// let a = String::from("abcd"); /// s.spawn(|| println!("{a:?}")); // might run after `a` is dropped /// }); /// }); /// ``` scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } /// An owned permission to join on a scoped thread (block on its termination). /// /// See [`Scope::spawn`] for details. #[stable(feature = "scoped_threads", since = "1.63.0")] pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>); pub(super) struct ScopeData { num_running_threads: AtomicUsize, a_thread_panicked: AtomicBool, main_thread: Thread, } impl ScopeData { pub(super) fn increment_num_running_threads(&self) { // We check for 'overflow' with usize::MAX / 2, to make sure there's no // chance it overflows to 0, which would result in unsoundness. if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 { // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles. self.decrement_num_running_threads(false); panic!("too many running threads in thread scope"); } } pub(super) fn decrement_num_running_threads(&self, panic: bool) { if panic { self.a_thread_panicked.store(true, Ordering::Relaxed); } if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 { self.main_thread.unpark(); } } } /// Create a scope for spawning scoped threads. /// /// The function passed to `scope` will be provided a [`Scope`] object, /// through which scoped threads can be [spawned][`Scope::spawn`]. /// /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data, /// as the scope guarantees all threads will be joined at the end of the scope. /// /// All threads spawned within the scope that haven't been manually joined /// will be automatically joined before this function returns. /// /// # Panics /// /// If any of the automatically joined threads panicked, this function will panic. /// /// If you want to handle panics from spawned threads, /// [`join`][ScopedJoinHandle::join] them before the end of the scope. /// /// # Example /// /// ``` /// use std::thread; /// /// let mut a = vec![1, 2, 3]; /// let mut x = 0; /// /// thread::scope(|s| { /// s.spawn(|| { /// println!("hello from the first scoped thread"); /// // We can borrow `a` here. /// dbg!(&a); /// }); /// s.spawn(|| { /// println!("hello from the second scoped thread"); /// // We can even mutably borrow `x` here, /// // because no other threads are using it. /// x += a[0] + a[2]; /// }); /// println!("hello from the main thread"); /// }); /// /// // After the scope, we can modify and access our variables again: /// a.push(4); /// assert_eq!(x, a.len()); /// ``` /// /// # Lifetimes /// /// Scoped threads involve two lifetimes: `'scope` and `'env`. /// /// The `'scope` lifetime represents the lifetime of the scope itself. /// That is: the time during which new scoped threads may be spawned, /// and also the time during which they might still be running. /// Once this lifetime ends, all scoped threads are joined. /// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts. /// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns. /// /// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads. /// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`. /// It can be as small as the call to `scope`, meaning that anything that outlives this call, /// such as local variables defined right before the scope, can be borrowed by the scoped threads. /// /// The `'env: 'scope` bound is part of the definition of the `Scope` type. #[track_caller] #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn scope<'env, F, T>(f: F) -> T where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, { // We put the `ScopeData` into an `Arc` so that other threads can finish their // `decrement_num_running_threads` even after this function returns. let scope = Scope { data: Arc::new(ScopeData { num_running_threads: AtomicUsize::new(0), main_thread: current(), a_thread_panicked: AtomicBool::new(false), }), env: PhantomData, scope: PhantomData, }; // Run `f`, but catch panics so we can make sure to wait for all the threads to join. let result = catch_unwind(AssertUnwindSafe(|| f(&scope))); // Wait until all the threads are finished. while scope.data.num_running_threads.load(Ordering::Acquire) != 0 { park(); } // Throw any panic from `f`, or the return value of `f` if no thread panicked. match result { Err(e) => resume_unwind(e), Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => { panic!("a scoped thread panicked") } Ok(result) => result, } } impl<'scope, 'env> Scope<'scope, 'env> { /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it. /// /// Unlike non-scoped threads, threads spawned with this function may /// borrow non-`'static` data from the outside the scope. See [`scope`] for /// details. /// /// The join handle provides a [`join`] method that can be used to join the spawned /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing /// the panic payload. /// /// If the join handle is dropped, the spawned thread will implicitly joined at the /// end of the scope. In that case, if the spawned thread panics, [`scope`] will /// panic after all threads are joined. /// /// This call will create a thread using default parameters of [`Builder`]. /// If you want to specify the stack size or the name of the thread, use /// [`Builder::spawn_scoped`] instead. /// /// # Panics /// /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`] /// to recover from such errors. /// /// [`join`]: ScopedJoinHandle::join #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn spawn(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> where F: FnOnce() -> T + Send + 'scope, T: Send + 'scope, { Builder::new().spawn_scoped(self, f).expect("failed to spawn thread") } } impl Builder { /// Spawns a new scoped thread using the settings set through this `Builder`. /// /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to /// capture any failure to create the thread at the OS level. /// /// [`io::Result`]: crate::io::Result /// /// # Panics /// /// Panics if a thread name was set and it contained null bytes. /// /// # Example /// /// ``` /// use std::thread; /// /// let mut a = vec![1, 2, 3]; /// let mut x = 0; /// /// thread::scope(|s| { /// thread::Builder::new() /// .name("first".to_string()) /// .spawn_scoped(s, || /// { /// println!("hello from the {:?} scoped thread", thread::current().name()); /// // We can borrow `a` here. /// dbg!(&a); /// }) /// .unwrap(); /// thread::Builder::new() /// .name("second".to_string()) /// .spawn_scoped(s, || /// { /// println!("hello from the {:?} scoped thread", thread::current().name()); /// // We can even mutably borrow `x` here, /// // because no other threads are using it. /// x += a[0] + a[2]; /// }) /// .unwrap(); /// println!("hello from the main thread"); /// }); /// /// // After the scope, we can modify and access our variables again: /// a.push(4); /// assert_eq!(x, a.len()); /// ``` #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn spawn_scoped<'scope, 'env, F, T>( self, scope: &'scope Scope<'scope, 'env>, f: F, ) -> io::Result> where F: FnOnce() -> T + Send + 'scope, T: Send + 'scope, { Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?)) } } impl<'scope, T> ScopedJoinHandle<'scope, T> { /// Extracts a handle to the underlying thread. /// /// # Examples /// /// ``` /// use std::thread; /// /// thread::scope(|s| { /// let t = s.spawn(|| { /// println!("hello"); /// }); /// println!("thread id: {:?}", t.thread().id()); /// }); /// ``` #[must_use] #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn thread(&self) -> &Thread { &self.0.thread } /// Waits for the associated thread to finish. /// /// This function will return immediately if the associated thread has already finished. /// /// In terms of [atomic memory orderings], the completion of the associated /// thread synchronizes with this function returning. /// In other words, all operations performed by that thread /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses) /// all operations that happen after `join` returns. /// /// If the associated thread panics, [`Err`] is returned with the panic payload. /// /// [atomic memory orderings]: crate::sync::atomic /// /// # Examples /// /// ``` /// use std::thread; /// /// thread::scope(|s| { /// let t = s.spawn(|| { /// panic!("oh no"); /// }); /// assert!(t.join().is_err()); /// }); /// ``` #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn join(self) -> Result { self.0.join() } /// Checks if the associated thread has finished running its main function. /// /// `is_finished` supports implementing a non-blocking join operation, by checking /// `is_finished`, and calling `join` if it returns `false`. This function does not block. To /// block while waiting on the thread to finish, use [`join`][Self::join]. /// /// This might return `true` for a brief moment after the thread's main /// function has returned, but before the thread itself has stopped running. /// However, once this returns `true`, [`join`][Self::join] can be expected /// to return quickly, without blocking for any significant amount of time. #[stable(feature = "scoped_threads", since = "1.63.0")] pub fn is_finished(&self) -> bool { Arc::strong_count(&self.0.packet) == 1 } } #[stable(feature = "scoped_threads", since = "1.63.0")] impl fmt::Debug for Scope<'_, '_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Scope") .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed)) .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed)) .field("main_thread", &self.data.main_thread) .finish_non_exhaustive() } } #[stable(feature = "scoped_threads", since = "1.63.0")] impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ScopedJoinHandle").finish_non_exhaustive() } }