summaryrefslogtreecommitdiffstats
path: root/third_party/rust/crossbeam-utils/src/thread.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/thread.rs')
-rw-r--r--third_party/rust/crossbeam-utils/src/thread.rs587
1 files changed, 587 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/thread.rs b/third_party/rust/crossbeam-utils/src/thread.rs
new file mode 100644
index 0000000000..f1086d9ec3
--- /dev/null
+++ b/third_party/rust/crossbeam-utils/src/thread.rs
@@ -0,0 +1,587 @@
+//! Threads that can borrow variables from the stack.
+//!
+//! Create a scope when spawned threads need to access variables on the stack:
+//!
+//! ```
+//! use crossbeam_utils::thread;
+//!
+//! let people = vec![
+//! "Alice".to_string(),
+//! "Bob".to_string(),
+//! "Carol".to_string(),
+//! ];
+//!
+//! thread::scope(|s| {
+//! for person in &people {
+//! s.spawn(move |_| {
+//! println!("Hello, {}!", person);
+//! });
+//! }
+//! }).unwrap();
+//! ```
+//!
+//! # Why scoped threads?
+//!
+//! Suppose we wanted to re-write the previous example using plain threads:
+//!
+//! ```compile_fail,E0597
+//! use std::thread;
+//!
+//! let people = vec![
+//! "Alice".to_string(),
+//! "Bob".to_string(),
+//! "Carol".to_string(),
+//! ];
+//!
+//! let mut threads = Vec::new();
+//!
+//! for person in &people {
+//! threads.push(thread::spawn(move || {
+//! println!("Hello, {}!", person);
+//! }));
+//! }
+//!
+//! for thread in threads {
+//! thread.join().unwrap();
+//! }
+//! ```
+//!
+//! This doesn't work because the borrow checker complains about `people` not living long enough:
+//!
+//! ```text
+//! error[E0597]: `people` does not live long enough
+//! --> src/main.rs:12:20
+//! |
+//! 12 | for person in &people {
+//! | ^^^^^^ borrowed value does not live long enough
+//! ...
+//! 21 | }
+//! | - borrowed value only lives until here
+//! |
+//! = note: borrowed value must be valid for the static lifetime...
+//! ```
+//!
+//! The problem here is that spawned threads are not allowed to borrow variables on stack because
+//! the compiler cannot prove they will be joined before `people` is destroyed.
+//!
+//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
+//! before the scope ends.
+//!
+//! # How scoped threads work
+//!
+//! If a variable is borrowed by a thread, the thread must complete before the variable is
+//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
+//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
+//!
+//! A scope creates a clear boundary between variables outside the scope and threads inside the
+//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
+//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
+//! can safely access variables outside it.
+//!
+//! # Nesting scoped threads
+//!
+//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
+//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
+//! cannot be borrowed by scoped threads:
+//!
+//! ```compile_fail,E0373,E0521
+//! use crossbeam_utils::thread;
+//!
+//! thread::scope(|s| {
+//! s.spawn(|_| {
+//! // Not going to compile because we're trying to borrow `s`,
+//! // which lives *inside* the scope! :(
+//! s.spawn(|_| println!("nested thread"));
+//! });
+//! });
+//! ```
+//!
+//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
+//! argument, which can be used for spawning nested threads:
+//!
+//! ```
+//! use crossbeam_utils::thread;
+//!
+//! thread::scope(|s| {
+//! // Note the `|s|` here.
+//! s.spawn(|s| {
+//! // Yay, this works because we're using a fresh argument `s`! :)
+//! s.spawn(|_| println!("nested thread"));
+//! });
+//! }).unwrap();
+//! ```
+
+use std::fmt;
+use std::io;
+use std::marker::PhantomData;
+use std::mem;
+use std::panic;
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+use crate::sync::WaitGroup;
+use cfg_if::cfg_if;
+
+type SharedVec<T> = Arc<Mutex<Vec<T>>>;
+type SharedOption<T> = Arc<Mutex<Option<T>>>;
+
+/// Creates a new scope for spawning threads.
+///
+/// All child threads that haven't been manually joined will be automatically joined just before
+/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
+/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
+/// returned containing errors from panicked threads. Note that if panics are implemented by
+/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam_utils::thread;
+///
+/// let var = vec![1, 2, 3];
+///
+/// thread::scope(|s| {
+/// s.spawn(|_| {
+/// println!("A child thread borrowing `var`: {:?}", var);
+/// });
+/// }).unwrap();
+/// ```
+pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
+where
+ F: FnOnce(&Scope<'env>) -> R,
+{
+ let wg = WaitGroup::new();
+ let scope = Scope::<'env> {
+ handles: SharedVec::default(),
+ wait_group: wg.clone(),
+ _marker: PhantomData,
+ };
+
+ // Execute the scoped function, but catch any panics.
+ let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
+
+ // Wait until all nested scopes are dropped.
+ drop(scope.wait_group);
+ wg.wait();
+
+ // Join all remaining spawned threads.
+ let panics: Vec<_> = scope
+ .handles
+ .lock()
+ .unwrap()
+ // Filter handles that haven't been joined, join them, and collect errors.
+ .drain(..)
+ .filter_map(|handle| handle.lock().unwrap().take())
+ .filter_map(|handle| handle.join().err())
+ .collect();
+
+ // If `f` has panicked, resume unwinding.
+ // If any of the child threads have panicked, return the panic errors.
+ // Otherwise, everything is OK and return the result of `f`.
+ match result {
+ Err(err) => panic::resume_unwind(err),
+ Ok(res) => {
+ if panics.is_empty() {
+ Ok(res)
+ } else {
+ Err(Box::new(panics))
+ }
+ }
+ }
+}
+
+/// A scope for spawning threads.
+pub struct Scope<'env> {
+ /// The list of the thread join handles.
+ handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
+
+ /// Used to wait until all subscopes all dropped.
+ wait_group: WaitGroup,
+
+ /// Borrows data with invariant lifetime `'env`.
+ _marker: PhantomData<&'env mut &'env ()>,
+}
+
+unsafe impl Sync for Scope<'_> {}
+
+impl<'env> Scope<'env> {
+ /// Spawns a scoped thread.
+ ///
+ /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
+ /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
+ /// allowing it to reference variables outside the scope.
+ ///
+ /// The scoped thread is passed a reference to this scope as an argument, which can be used for
+ /// spawning nested threads.
+ ///
+ /// The returned [handle](ScopedJoinHandle) can be used to manually
+ /// [join](ScopedJoinHandle::join) the thread before the scope exits.
+ ///
+ /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
+ /// stack size or the name of the thread, use this API instead.
+ ///
+ /// [`spawn`]: std::thread::spawn
+ ///
+ /// # Panics
+ ///
+ /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
+ /// to recover from such errors.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let handle = s.spawn(|_| {
+ /// println!("A child thread is running");
+ /// 42
+ /// });
+ ///
+ /// // Join the thread and retrieve its result.
+ /// let res = handle.join().unwrap();
+ /// assert_eq!(res, 42);
+ /// }).unwrap();
+ /// ```
+ pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
+ where
+ F: FnOnce(&Scope<'env>) -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ self.builder()
+ .spawn(f)
+ .expect("failed to spawn scoped thread")
+ }
+
+ /// Creates a builder that can configure a thread before spawning.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// s.builder()
+ /// .spawn(|_| println!("A child thread is running"))
+ /// .unwrap();
+ /// }).unwrap();
+ /// ```
+ pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
+ ScopedThreadBuilder {
+ scope: self,
+ builder: thread::Builder::new(),
+ }
+ }
+}
+
+impl fmt::Debug for Scope<'_> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Scope { .. }")
+ }
+}
+
+/// Configures the properties of a new thread.
+///
+/// The two configurable properties are:
+///
+/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
+/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
+///
+/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
+/// thread handle with the given configuration.
+///
+/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
+/// value. You may want to use this builder when you want to recover from a failure to launch a
+/// thread.
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam_utils::thread;
+///
+/// thread::scope(|s| {
+/// s.builder()
+/// .spawn(|_| println!("Running a child thread"))
+/// .unwrap();
+/// }).unwrap();
+/// ```
+///
+/// [`name`]: ScopedThreadBuilder::name
+/// [`stack_size`]: ScopedThreadBuilder::stack_size
+/// [`spawn`]: ScopedThreadBuilder::spawn
+/// [`io::Result`]: std::io::Result
+/// [naming-threads]: std::thread#naming-threads
+/// [stack-size]: std::thread#stack-size
+#[derive(Debug)]
+pub struct ScopedThreadBuilder<'scope, 'env> {
+ scope: &'scope Scope<'env>,
+ builder: thread::Builder,
+}
+
+impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
+ /// Sets the name for the new thread.
+ ///
+ /// The name must not contain null bytes (`\0`).
+ ///
+ /// For more information about named threads, see [here][naming-threads].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ /// use std::thread::current;
+ ///
+ /// thread::scope(|s| {
+ /// s.builder()
+ /// .name("my thread".to_string())
+ /// .spawn(|_| assert_eq!(current().name(), Some("my thread")))
+ /// .unwrap();
+ /// }).unwrap();
+ /// ```
+ ///
+ /// [naming-threads]: std::thread#naming-threads
+ pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
+ self.builder = self.builder.name(name);
+ self
+ }
+
+ /// Sets the size of the stack for the new thread.
+ ///
+ /// The stack size is measured in bytes.
+ ///
+ /// For more information about the stack size for threads, see [here][stack-size].
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// s.builder()
+ /// .stack_size(32 * 1024)
+ /// .spawn(|_| println!("Running a child thread"))
+ /// .unwrap();
+ /// }).unwrap();
+ /// ```
+ ///
+ /// [stack-size]: std::thread#stack-size
+ pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
+ self.builder = self.builder.stack_size(size);
+ self
+ }
+
+ /// Spawns a scoped thread with this configuration.
+ ///
+ /// The scoped thread is passed a reference to this scope as an argument, which can be used for
+ /// spawning nested threads.
+ ///
+ /// The returned handle can be used to manually join the thread before the scope exits.
+ ///
+ /// # Errors
+ ///
+ /// Unlike the [`Scope::spawn`] method, this method yields an
+ /// [`io::Result`] to capture any failure to create the thread at
+ /// the OS level.
+ ///
+ /// [`io::Result`]: std::io::Result
+ ///
+ /// # Panics
+ ///
+ /// Panics if a thread name was set and it contained null bytes.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let handle = s.builder()
+ /// .spawn(|_| {
+ /// println!("A child thread is running");
+ /// 42
+ /// })
+ /// .unwrap();
+ ///
+ /// // Join the thread and retrieve its result.
+ /// let res = handle.join().unwrap();
+ /// assert_eq!(res, 42);
+ /// }).unwrap();
+ /// ```
+ pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
+ where
+ F: FnOnce(&Scope<'env>) -> T,
+ F: Send + 'env,
+ T: Send + 'env,
+ {
+ // The result of `f` will be stored here.
+ let result = SharedOption::default();
+
+ // Spawn the thread and grab its join handle and thread handle.
+ let (handle, thread) = {
+ let result = Arc::clone(&result);
+
+ // A clone of the scope that will be moved into the new thread.
+ let scope = Scope::<'env> {
+ handles: Arc::clone(&self.scope.handles),
+ wait_group: self.scope.wait_group.clone(),
+ _marker: PhantomData,
+ };
+
+ // Spawn the thread.
+ let handle = {
+ let closure = move || {
+ // Make sure the scope is inside the closure with the proper `'env` lifetime.
+ let scope: Scope<'env> = scope;
+
+ // Run the closure.
+ let res = f(&scope);
+
+ // Store the result if the closure didn't panic.
+ *result.lock().unwrap() = Some(res);
+ };
+
+ // Allocate `closure` on the heap and erase the `'env` bound.
+ let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
+ let closure: Box<dyn FnOnce() + Send + 'static> =
+ unsafe { mem::transmute(closure) };
+
+ // Finally, spawn the closure.
+ self.builder.spawn(closure)?
+ };
+
+ let thread = handle.thread().clone();
+ let handle = Arc::new(Mutex::new(Some(handle)));
+ (handle, thread)
+ };
+
+ // Add the handle to the shared list of join handles.
+ self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
+
+ Ok(ScopedJoinHandle {
+ handle,
+ result,
+ thread,
+ _marker: PhantomData,
+ })
+ }
+}
+
+unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
+unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
+
+/// A handle that can be used to join its scoped thread.
+///
+/// This struct is created by the [`Scope::spawn`] method and the
+/// [`ScopedThreadBuilder::spawn`] method.
+pub struct ScopedJoinHandle<'scope, T> {
+ /// A join handle to the spawned thread.
+ handle: SharedOption<thread::JoinHandle<()>>,
+
+ /// Holds the result of the inner closure.
+ result: SharedOption<T>,
+
+ /// A handle to the the spawned thread.
+ thread: thread::Thread,
+
+ /// Borrows the parent scope with lifetime `'scope`.
+ _marker: PhantomData<&'scope ()>,
+}
+
+impl<T> ScopedJoinHandle<'_, T> {
+ /// Waits for the thread to finish and returns its result.
+ ///
+ /// If the child thread panics, an error is returned. Note that if panics are implemented by
+ /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
+ ///
+ /// # Panics
+ ///
+ /// This function may panic on some platforms if a thread attempts to join itself or otherwise
+ /// may create a deadlock with joining threads.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
+ /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
+ ///
+ /// // Join the first thread and verify that it succeeded.
+ /// let res = handle1.join();
+ /// assert!(res.is_ok());
+ ///
+ /// // Join the second thread and verify that it panicked.
+ /// let res = handle2.join();
+ /// assert!(res.is_err());
+ /// }).unwrap();
+ /// ```
+ pub fn join(self) -> thread::Result<T> {
+ // Take out the handle. The handle will surely be available because the root scope waits
+ // for nested scopes before joining remaining threads.
+ let handle = self.handle.lock().unwrap().take().unwrap();
+
+ // Join the thread and then take the result out of its inner closure.
+ handle
+ .join()
+ .map(|()| self.result.lock().unwrap().take().unwrap())
+ }
+
+ /// Returns a handle to the underlying thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use crossbeam_utils::thread;
+ ///
+ /// thread::scope(|s| {
+ /// let handle = s.spawn(|_| println!("A child thread is running"));
+ /// println!("The child thread ID: {:?}", handle.thread().id());
+ /// }).unwrap();
+ /// ```
+ pub fn thread(&self) -> &thread::Thread {
+ &self.thread
+ }
+}
+
+cfg_if! {
+ if #[cfg(unix)] {
+ use std::os::unix::thread::{JoinHandleExt, RawPthread};
+
+ impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
+ fn as_pthread_t(&self) -> RawPthread {
+ // Borrow the handle. The handle will surely be available because the root scope waits
+ // for nested scopes before joining remaining threads.
+ let handle = self.handle.lock().unwrap();
+ handle.as_ref().unwrap().as_pthread_t()
+ }
+ fn into_pthread_t(self) -> RawPthread {
+ self.as_pthread_t()
+ }
+ }
+ } else if #[cfg(windows)] {
+ use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
+
+ impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
+ fn as_raw_handle(&self) -> RawHandle {
+ // Borrow the handle. The handle will surely be available because the root scope waits
+ // for nested scopes before joining remaining threads.
+ let handle = self.handle.lock().unwrap();
+ handle.as_ref().unwrap().as_raw_handle()
+ }
+ }
+
+ impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
+ fn into_raw_handle(self) -> RawHandle {
+ self.as_raw_handle()
+ }
+ }
+ }
+}
+
+impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("ScopedJoinHandle { .. }")
+ }
+}