summaryrefslogtreecommitdiffstats
path: root/library/std/src/thread/scoped.rs
diff options
context:
space:
mode:
Diffstat (limited to 'library/std/src/thread/scoped.rs')
-rw-r--r--library/std/src/thread/scoped.rs343
1 files changed, 343 insertions, 0 deletions
diff --git a/library/std/src/thread/scoped.rs b/library/std/src/thread/scoped.rs
new file mode 100644
index 000000000..e6dbf35bd
--- /dev/null
+++ b/library/std/src/thread/scoped.rs
@@ -0,0 +1,343 @@
+use super::{current, park, Builder, JoinInner, Result, Thread};
+use crate::fmt;
+use crate::io;
+use crate::marker::PhantomData;
+use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use crate::sync::Arc;
+
+/// A scope to spawn scoped threads in.
+///
+/// See [`scope`] for details.
+#[stable(feature = "scoped_threads", since = "1.63.0")]
+pub struct Scope<'scope, 'env: 'scope> {
+ data: Arc<ScopeData>,
+ /// Invariance over 'scope, to make sure 'scope cannot shrink,
+ /// which is necessary for soundness.
+ ///
+ /// Without invariance, this would compile fine but be unsound:
+ ///
+ /// ```compile_fail,E0373
+ /// std::thread::scope(|s| {
+ /// s.spawn(|| {
+ /// let a = String::from("abcd");
+ /// s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
+ /// });
+ /// });
+ /// ```
+ scope: PhantomData<&'scope mut &'scope ()>,
+ env: PhantomData<&'env mut &'env ()>,
+}
+
+/// An owned permission to join on a scoped thread (block on its termination).
+///
+/// See [`Scope::spawn`] for details.
+#[stable(feature = "scoped_threads", since = "1.63.0")]
+pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);
+
+pub(super) struct ScopeData {
+ num_running_threads: AtomicUsize,
+ a_thread_panicked: AtomicBool,
+ main_thread: Thread,
+}
+
+impl ScopeData {
+ pub(super) fn increment_num_running_threads(&self) {
+ // We check for 'overflow' with usize::MAX / 2, to make sure there's no
+ // chance it overflows to 0, which would result in unsoundness.
+ if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
+ // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles.
+ self.decrement_num_running_threads(false);
+ panic!("too many running threads in thread scope");
+ }
+ }
+ pub(super) fn decrement_num_running_threads(&self, panic: bool) {
+ if panic {
+ self.a_thread_panicked.store(true, Ordering::Relaxed);
+ }
+ if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
+ self.main_thread.unpark();
+ }
+ }
+}
+
+/// Create a scope for spawning scoped threads.
+///
+/// The function passed to `scope` will be provided a [`Scope`] object,
+/// through which scoped threads can be [spawned][`Scope::spawn`].
+///
+/// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
+/// as the scope guarantees all threads will be joined at the end of the scope.
+///
+/// All threads spawned within the scope that haven't been manually joined
+/// will be automatically joined before this function returns.
+///
+/// # Panics
+///
+/// If any of the automatically joined threads panicked, this function will panic.
+///
+/// If you want to handle panics from spawned threads,
+/// [`join`][ScopedJoinHandle::join] them before the end of the scope.
+///
+/// # Example
+///
+/// ```
+/// use std::thread;
+///
+/// let mut a = vec![1, 2, 3];
+/// let mut x = 0;
+///
+/// thread::scope(|s| {
+/// s.spawn(|| {
+/// println!("hello from the first scoped thread");
+/// // We can borrow `a` here.
+/// dbg!(&a);
+/// });
+/// s.spawn(|| {
+/// println!("hello from the second scoped thread");
+/// // We can even mutably borrow `x` here,
+/// // because no other threads are using it.
+/// x += a[0] + a[2];
+/// });
+/// println!("hello from the main thread");
+/// });
+///
+/// // After the scope, we can modify and access our variables again:
+/// a.push(4);
+/// assert_eq!(x, a.len());
+/// ```
+///
+/// # Lifetimes
+///
+/// Scoped threads involve two lifetimes: `'scope` and `'env`.
+///
+/// The `'scope` lifetime represents the lifetime of the scope itself.
+/// That is: the time during which new scoped threads may be spawned,
+/// and also the time during which they might still be running.
+/// Once this lifetime ends, all scoped threads are joined.
+/// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts.
+/// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns.
+///
+/// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads.
+/// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`.
+/// It can be as small as the call to `scope`, meaning that anything that outlives this call,
+/// such as local variables defined right before the scope, can be borrowed by the scoped threads.
+///
+/// The `'env: 'scope` bound is part of the definition of the `Scope` type.
+#[track_caller]
+#[stable(feature = "scoped_threads", since = "1.63.0")]
+pub fn scope<'env, F, T>(f: F) -> T
+where
+ F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
+{
+ // We put the `ScopeData` into an `Arc` so that other threads can finish their
+ // `decrement_num_running_threads` even after this function returns.
+ let scope = Scope {
+ data: Arc::new(ScopeData {
+ num_running_threads: AtomicUsize::new(0),
+ main_thread: current(),
+ a_thread_panicked: AtomicBool::new(false),
+ }),
+ env: PhantomData,
+ scope: PhantomData,
+ };
+
+ // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
+ let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));
+
+ // Wait until all the threads are finished.
+ while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
+ park();
+ }
+
+ // Throw any panic from `f`, or the return value of `f` if no thread panicked.
+ match result {
+ Err(e) => resume_unwind(e),
+ Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
+ panic!("a scoped thread panicked")
+ }
+ Ok(result) => result,
+ }
+}
+
+impl<'scope, 'env> Scope<'scope, 'env> {
+ /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
+ ///
+ /// Unlike non-scoped threads, threads spawned with this function may
+ /// borrow non-`'static` data from the outside the scope. See [`scope`] for
+ /// details.
+ ///
+ /// The join handle provides a [`join`] method that can be used to join the spawned
+ /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
+ /// the panic payload.
+ ///
+ /// If the join handle is dropped, the spawned thread will implicitly joined at the
+ /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
+ /// panic after all threads are joined.
+ ///
+ /// This call will create a thread using default parameters of [`Builder`].
+ /// If you want to specify the stack size or the name of the thread, use
+ /// [`Builder::spawn_scoped`] instead.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
+ /// to recover from such errors.
+ ///
+ /// [`join`]: ScopedJoinHandle::join
+ #[stable(feature = "scoped_threads", since = "1.63.0")]
+ pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ where
+ F: FnOnce() -> T + Send + 'scope,
+ T: Send + 'scope,
+ {
+ Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
+ }
+}
+
+impl Builder {
+ /// Spawns a new scoped thread using the settings set through this `Builder`.
+ ///
+ /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
+ /// capture any failure to create the thread at the OS level.
+ ///
+ /// [`io::Result`]: crate::io::Result
+ ///
+ /// # Panics
+ ///
+ /// Panics if a thread name was set and it contained null bytes.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// let mut a = vec![1, 2, 3];
+ /// let mut x = 0;
+ ///
+ /// thread::scope(|s| {
+ /// thread::Builder::new()
+ /// .name("first".to_string())
+ /// .spawn_scoped(s, ||
+ /// {
+ /// println!("hello from the {:?} scoped thread", thread::current().name());
+ /// // We can borrow `a` here.
+ /// dbg!(&a);
+ /// })
+ /// .unwrap();
+ /// thread::Builder::new()
+ /// .name("second".to_string())
+ /// .spawn_scoped(s, ||
+ /// {
+ /// println!("hello from the {:?} scoped thread", thread::current().name());
+ /// // We can even mutably borrow `x` here,
+ /// // because no other threads are using it.
+ /// x += a[0] + a[2];
+ /// })
+ /// .unwrap();
+ /// println!("hello from the main thread");
+ /// });
+ ///
+ /// // After the scope, we can modify and access our variables again:
+ /// a.push(4);
+ /// assert_eq!(x, a.len());
+ /// ```
+ #[stable(feature = "scoped_threads", since = "1.63.0")]
+ pub fn spawn_scoped<'scope, 'env, F, T>(
+ self,
+ scope: &'scope Scope<'scope, 'env>,
+ f: F,
+ ) -> io::Result<ScopedJoinHandle<'scope, T>>
+ where
+ F: FnOnce() -> T + Send + 'scope,
+ T: Send + 'scope,
+ {
+ Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?))
+ }
+}
+
+impl<'scope, T> ScopedJoinHandle<'scope, T> {
+ /// Extracts a handle to the underlying thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let t = s.spawn(|| {
+ /// println!("hello");
+ /// });
+ /// println!("thread id: {:?}", t.thread().id());
+ /// });
+ /// ```
+ #[must_use]
+ #[stable(feature = "scoped_threads", since = "1.63.0")]
+ pub fn thread(&self) -> &Thread {
+ &self.0.thread
+ }
+
+ /// Waits for the associated thread to finish.
+ ///
+ /// This function will return immediately if the associated thread has already finished.
+ ///
+ /// In terms of [atomic memory orderings], the completion of the associated
+ /// thread synchronizes with this function returning.
+ /// In other words, all operations performed by that thread
+ /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
+ /// all operations that happen after `join` returns.
+ ///
+ /// If the associated thread panics, [`Err`] is returned with the panic payload.
+ ///
+ /// [atomic memory orderings]: crate::sync::atomic
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let t = s.spawn(|| {
+ /// panic!("oh no");
+ /// });
+ /// assert!(t.join().is_err());
+ /// });
+ /// ```
+ #[stable(feature = "scoped_threads", since = "1.63.0")]
+ pub fn join(self) -> Result<T> {
+ self.0.join()
+ }
+
+ /// Checks if the associated thread has finished running its main function.
+ ///
+ /// `is_finished` supports implementing a non-blocking join operation, by checking
+ /// `is_finished`, and calling `join` if it returns `false`. This function does not block. To
+ /// block while waiting on the thread to finish, use [`join`][Self::join].
+ ///
+ /// This might return `true` for a brief moment after the thread's main
+ /// function has returned, but before the thread itself has stopped running.
+ /// However, once this returns `true`, [`join`][Self::join] can be expected
+ /// to return quickly, without blocking for any significant amount of time.
+ #[stable(feature = "scoped_threads", since = "1.63.0")]
+ pub fn is_finished(&self) -> bool {
+ Arc::strong_count(&self.0.packet) == 1
+ }
+}
+
+#[stable(feature = "scoped_threads", since = "1.63.0")]
+impl fmt::Debug for Scope<'_, '_> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Scope")
+ .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
+ .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
+ .field("main_thread", &self.data.main_thread)
+ .finish_non_exhaustive()
+ }
+}
+
+#[stable(feature = "scoped_threads", since = "1.63.0")]
+impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
+ }
+}