diff options
Diffstat (limited to 'third_party/rust/crossbeam-utils/src/thread.rs')
-rw-r--r-- | third_party/rust/crossbeam-utils/src/thread.rs | 587 |
1 files changed, 587 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-utils/src/thread.rs b/third_party/rust/crossbeam-utils/src/thread.rs new file mode 100644 index 0000000000..f1086d9ec3 --- /dev/null +++ b/third_party/rust/crossbeam-utils/src/thread.rs @@ -0,0 +1,587 @@ +//! Threads that can borrow variables from the stack. +//! +//! Create a scope when spawned threads need to access variables on the stack: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! thread::scope(|s| { +//! for person in &people { +//! s.spawn(move |_| { +//! println!("Hello, {}!", person); +//! }); +//! } +//! }).unwrap(); +//! ``` +//! +//! # Why scoped threads? +//! +//! Suppose we wanted to re-write the previous example using plain threads: +//! +//! ```compile_fail,E0597 +//! use std::thread; +//! +//! let people = vec![ +//! "Alice".to_string(), +//! "Bob".to_string(), +//! "Carol".to_string(), +//! ]; +//! +//! let mut threads = Vec::new(); +//! +//! for person in &people { +//! threads.push(thread::spawn(move || { +//! println!("Hello, {}!", person); +//! })); +//! } +//! +//! for thread in threads { +//! thread.join().unwrap(); +//! } +//! ``` +//! +//! This doesn't work because the borrow checker complains about `people` not living long enough: +//! +//! ```text +//! error[E0597]: `people` does not live long enough +//! --> src/main.rs:12:20 +//! | +//! 12 | for person in &people { +//! | ^^^^^^ borrowed value does not live long enough +//! ... +//! 21 | } +//! | - borrowed value only lives until here +//! | +//! = note: borrowed value must be valid for the static lifetime... +//! ``` +//! +//! The problem here is that spawned threads are not allowed to borrow variables on stack because +//! the compiler cannot prove they will be joined before `people` is destroyed. +//! +//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined +//! before the scope ends. +//! +//! # How scoped threads work +//! +//! If a variable is borrowed by a thread, the thread must complete before the variable is +//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the +//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete. +//! +//! A scope creates a clear boundary between variables outside the scope and threads inside the +//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends. +//! This way we guarantee to the borrow checker that scoped threads only live within the scope and +//! can safely access variables outside it. +//! +//! # Nesting scoped threads +//! +//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little +//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such +//! cannot be borrowed by scoped threads: +//! +//! ```compile_fail,E0373,E0521 +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! s.spawn(|_| { +//! // Not going to compile because we're trying to borrow `s`, +//! // which lives *inside* the scope! :( +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }); +//! ``` +//! +//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an +//! argument, which can be used for spawning nested threads: +//! +//! ``` +//! use crossbeam_utils::thread; +//! +//! thread::scope(|s| { +//! // Note the `|s|` here. +//! s.spawn(|s| { +//! // Yay, this works because we're using a fresh argument `s`! :) +//! s.spawn(|_| println!("nested thread")); +//! }); +//! }).unwrap(); +//! ``` + +use std::fmt; +use std::io; +use std::marker::PhantomData; +use std::mem; +use std::panic; +use std::sync::{Arc, Mutex}; +use std::thread; + +use crate::sync::WaitGroup; +use cfg_if::cfg_if; + +type SharedVec<T> = Arc<Mutex<Vec<T>>>; +type SharedOption<T> = Arc<Mutex<Option<T>>>; + +/// Creates a new scope for spawning threads. +/// +/// All child threads that haven't been manually joined will be automatically joined just before +/// this function invocation ends. If all joined threads have successfully completed, `Ok` is +/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is +/// returned containing errors from panicked threads. Note that if panics are implemented by +/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// let var = vec![1, 2, 3]; +/// +/// thread::scope(|s| { +/// s.spawn(|_| { +/// println!("A child thread borrowing `var`: {:?}", var); +/// }); +/// }).unwrap(); +/// ``` +pub fn scope<'env, F, R>(f: F) -> thread::Result<R> +where + F: FnOnce(&Scope<'env>) -> R, +{ + let wg = WaitGroup::new(); + let scope = Scope::<'env> { + handles: SharedVec::default(), + wait_group: wg.clone(), + _marker: PhantomData, + }; + + // Execute the scoped function, but catch any panics. + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope))); + + // Wait until all nested scopes are dropped. + drop(scope.wait_group); + wg.wait(); + + // Join all remaining spawned threads. + let panics: Vec<_> = scope + .handles + .lock() + .unwrap() + // Filter handles that haven't been joined, join them, and collect errors. + .drain(..) + .filter_map(|handle| handle.lock().unwrap().take()) + .filter_map(|handle| handle.join().err()) + .collect(); + + // If `f` has panicked, resume unwinding. + // If any of the child threads have panicked, return the panic errors. + // Otherwise, everything is OK and return the result of `f`. + match result { + Err(err) => panic::resume_unwind(err), + Ok(res) => { + if panics.is_empty() { + Ok(res) + } else { + Err(Box::new(panics)) + } + } + } +} + +/// A scope for spawning threads. +pub struct Scope<'env> { + /// The list of the thread join handles. + handles: SharedVec<SharedOption<thread::JoinHandle<()>>>, + + /// Used to wait until all subscopes all dropped. + wait_group: WaitGroup, + + /// Borrows data with invariant lifetime `'env`. + _marker: PhantomData<&'env mut &'env ()>, +} + +unsafe impl Sync for Scope<'_> {} + +impl<'env> Scope<'env> { + /// Spawns a scoped thread. + /// + /// This method is similar to the [`spawn`] function in Rust's standard library. The difference + /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits, + /// allowing it to reference variables outside the scope. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned [handle](ScopedJoinHandle) can be used to manually + /// [join](ScopedJoinHandle::join) the thread before the scope exits. + /// + /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the + /// stack size or the name of the thread, use this API instead. + /// + /// [`spawn`]: std::thread::spawn + /// + /// # Panics + /// + /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`] + /// to recover from such errors. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + self.builder() + .spawn(f) + .expect("failed to spawn scoped thread") + } + + /// Creates a builder that can configure a thread before spawning. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .spawn(|_| println!("A child thread is running")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> { + ScopedThreadBuilder { + scope: self, + builder: thread::Builder::new(), + } + } +} + +impl fmt::Debug for Scope<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Scope { .. }") + } +} + +/// Configures the properties of a new thread. +/// +/// The two configurable properties are: +/// +/// - [`name`]: Specifies an [associated name for the thread][naming-threads]. +/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size]. +/// +/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the +/// thread handle with the given configuration. +/// +/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return +/// value. You may want to use this builder when you want to recover from a failure to launch a +/// thread. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_utils::thread; +/// +/// thread::scope(|s| { +/// s.builder() +/// .spawn(|_| println!("Running a child thread")) +/// .unwrap(); +/// }).unwrap(); +/// ``` +/// +/// [`name`]: ScopedThreadBuilder::name +/// [`stack_size`]: ScopedThreadBuilder::stack_size +/// [`spawn`]: ScopedThreadBuilder::spawn +/// [`io::Result`]: std::io::Result +/// [naming-threads]: std::thread#naming-threads +/// [stack-size]: std::thread#stack-size +#[derive(Debug)] +pub struct ScopedThreadBuilder<'scope, 'env> { + scope: &'scope Scope<'env>, + builder: thread::Builder, +} + +impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> { + /// Sets the name for the new thread. + /// + /// The name must not contain null bytes (`\0`). + /// + /// For more information about named threads, see [here][naming-threads]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// use std::thread::current; + /// + /// thread::scope(|s| { + /// s.builder() + /// .name("my thread".to_string()) + /// .spawn(|_| assert_eq!(current().name(), Some("my thread"))) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [naming-threads]: std::thread#naming-threads + pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.name(name); + self + } + + /// Sets the size of the stack for the new thread. + /// + /// The stack size is measured in bytes. + /// + /// For more information about the stack size for threads, see [here][stack-size]. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// s.builder() + /// .stack_size(32 * 1024) + /// .spawn(|_| println!("Running a child thread")) + /// .unwrap(); + /// }).unwrap(); + /// ``` + /// + /// [stack-size]: std::thread#stack-size + pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> { + self.builder = self.builder.stack_size(size); + self + } + + /// Spawns a scoped thread with this configuration. + /// + /// The scoped thread is passed a reference to this scope as an argument, which can be used for + /// spawning nested threads. + /// + /// The returned handle can be used to manually join the thread before the scope exits. + /// + /// # Errors + /// + /// Unlike the [`Scope::spawn`] method, this method yields an + /// [`io::Result`] to capture any failure to create the thread at + /// the OS level. + /// + /// [`io::Result`]: std::io::Result + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.builder() + /// .spawn(|_| { + /// println!("A child thread is running"); + /// 42 + /// }) + /// .unwrap(); + /// + /// // Join the thread and retrieve its result. + /// let res = handle.join().unwrap(); + /// assert_eq!(res, 42); + /// }).unwrap(); + /// ``` + pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce(&Scope<'env>) -> T, + F: Send + 'env, + T: Send + 'env, + { + // The result of `f` will be stored here. + let result = SharedOption::default(); + + // Spawn the thread and grab its join handle and thread handle. + let (handle, thread) = { + let result = Arc::clone(&result); + + // A clone of the scope that will be moved into the new thread. + let scope = Scope::<'env> { + handles: Arc::clone(&self.scope.handles), + wait_group: self.scope.wait_group.clone(), + _marker: PhantomData, + }; + + // Spawn the thread. + let handle = { + let closure = move || { + // Make sure the scope is inside the closure with the proper `'env` lifetime. + let scope: Scope<'env> = scope; + + // Run the closure. + let res = f(&scope); + + // Store the result if the closure didn't panic. + *result.lock().unwrap() = Some(res); + }; + + // Allocate `closure` on the heap and erase the `'env` bound. + let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure); + let closure: Box<dyn FnOnce() + Send + 'static> = + unsafe { mem::transmute(closure) }; + + // Finally, spawn the closure. + self.builder.spawn(closure)? + }; + + let thread = handle.thread().clone(); + let handle = Arc::new(Mutex::new(Some(handle))); + (handle, thread) + }; + + // Add the handle to the shared list of join handles. + self.scope.handles.lock().unwrap().push(Arc::clone(&handle)); + + Ok(ScopedJoinHandle { + handle, + result, + thread, + _marker: PhantomData, + }) + } +} + +unsafe impl<T> Send for ScopedJoinHandle<'_, T> {} +unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {} + +/// A handle that can be used to join its scoped thread. +/// +/// This struct is created by the [`Scope::spawn`] method and the +/// [`ScopedThreadBuilder::spawn`] method. +pub struct ScopedJoinHandle<'scope, T> { + /// A join handle to the spawned thread. + handle: SharedOption<thread::JoinHandle<()>>, + + /// Holds the result of the inner closure. + result: SharedOption<T>, + + /// A handle to the the spawned thread. + thread: thread::Thread, + + /// Borrows the parent scope with lifetime `'scope`. + _marker: PhantomData<&'scope ()>, +} + +impl<T> ScopedJoinHandle<'_, T> { + /// Waits for the thread to finish and returns its result. + /// + /// If the child thread panics, an error is returned. Note that if panics are implemented by + /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. + /// + /// # Panics + /// + /// This function may panic on some platforms if a thread attempts to join itself or otherwise + /// may create a deadlock with joining threads. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)")); + /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :(")); + /// + /// // Join the first thread and verify that it succeeded. + /// let res = handle1.join(); + /// assert!(res.is_ok()); + /// + /// // Join the second thread and verify that it panicked. + /// let res = handle2.join(); + /// assert!(res.is_err()); + /// }).unwrap(); + /// ``` + pub fn join(self) -> thread::Result<T> { + // Take out the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap().take().unwrap(); + + // Join the thread and then take the result out of its inner closure. + handle + .join() + .map(|()| self.result.lock().unwrap().take().unwrap()) + } + + /// Returns a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_utils::thread; + /// + /// thread::scope(|s| { + /// let handle = s.spawn(|_| println!("A child thread is running")); + /// println!("The child thread ID: {:?}", handle.thread().id()); + /// }).unwrap(); + /// ``` + pub fn thread(&self) -> &thread::Thread { + &self.thread + } +} + +cfg_if! { + if #[cfg(unix)] { + use std::os::unix::thread::{JoinHandleExt, RawPthread}; + + impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> { + fn as_pthread_t(&self) -> RawPthread { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_pthread_t() + } + fn into_pthread_t(self) -> RawPthread { + self.as_pthread_t() + } + } + } else if #[cfg(windows)] { + use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle}; + + impl<T> AsRawHandle for ScopedJoinHandle<'_, T> { + fn as_raw_handle(&self) -> RawHandle { + // Borrow the handle. The handle will surely be available because the root scope waits + // for nested scopes before joining remaining threads. + let handle = self.handle.lock().unwrap(); + handle.as_ref().unwrap().as_raw_handle() + } + } + + impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> { + fn into_raw_handle(self) -> RawHandle { + self.as_raw_handle() + } + } + } +} + +impl<T> fmt::Debug for ScopedJoinHandle<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("ScopedJoinHandle { .. }") + } +} |