diff options
Diffstat (limited to '')
-rw-r--r-- | library/std/src/thread/local.rs | 1141 | ||||
-rw-r--r-- | library/std/src/thread/local/dynamic_tests.rs | 40 | ||||
-rw-r--r-- | library/std/src/thread/local/tests.rs | 317 | ||||
-rw-r--r-- | library/std/src/thread/mod.rs | 1621 | ||||
-rw-r--r-- | library/std/src/thread/scoped.rs | 343 | ||||
-rw-r--r-- | library/std/src/thread/tests.rs | 331 |
6 files changed, 3793 insertions, 0 deletions
diff --git a/library/std/src/thread/local.rs b/library/std/src/thread/local.rs new file mode 100644 index 000000000..f4750cdf7 --- /dev/null +++ b/library/std/src/thread/local.rs @@ -0,0 +1,1141 @@ +//! Thread local storage + +#![unstable(feature = "thread_local_internals", issue = "none")] + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +#[cfg(test)] +mod dynamic_tests; + +use crate::cell::{Cell, RefCell}; +use crate::error::Error; +use crate::fmt; + +/// A thread local storage key which owns its contents. +/// +/// This key uses the fastest possible implementation available to it for the +/// target platform. It is instantiated with the [`thread_local!`] macro and the +/// primary method is the [`with`] method. +/// +/// The [`with`] method yields a reference to the contained value which cannot be +/// sent across threads or escape the given closure. +/// +/// [`thread_local!`]: crate::thread_local +/// +/// # Initialization and Destruction +/// +/// Initialization is dynamically performed on the first call to [`with`] +/// within a thread, and values that implement [`Drop`] get destructed when a +/// thread exits. Some caveats apply, which are explained below. +/// +/// A `LocalKey`'s initializer cannot recursively depend on itself, and using +/// a `LocalKey` in this way will cause the initializer to infinitely recurse +/// on the first call to `with`. +/// +/// # Examples +/// +/// ``` +/// use std::cell::RefCell; +/// use std::thread; +/// +/// thread_local!(static FOO: RefCell<u32> = RefCell::new(1)); +/// +/// FOO.with(|f| { +/// assert_eq!(*f.borrow(), 1); +/// *f.borrow_mut() = 2; +/// }); +/// +/// // each thread starts out with the initial value of 1 +/// let t = thread::spawn(move|| { +/// FOO.with(|f| { +/// assert_eq!(*f.borrow(), 1); +/// *f.borrow_mut() = 3; +/// }); +/// }); +/// +/// // wait for the thread to complete and bail out on panic +/// t.join().unwrap(); +/// +/// // we retain our original value of 2 despite the child thread +/// FOO.with(|f| { +/// assert_eq!(*f.borrow(), 2); +/// }); +/// ``` +/// +/// # Platform-specific behavior +/// +/// Note that a "best effort" is made to ensure that destructors for types +/// stored in thread local storage are run, but not all platforms can guarantee +/// that destructors will be run for all types in thread local storage. For +/// example, there are a number of known caveats where destructors are not run: +/// +/// 1. On Unix systems when pthread-based TLS is being used, destructors will +/// not be run for TLS values on the main thread when it exits. Note that the +/// application will exit immediately after the main thread exits as well. +/// 2. On all platforms it's possible for TLS to re-initialize other TLS slots +/// during destruction. Some platforms ensure that this cannot happen +/// infinitely by preventing re-initialization of any slot that has been +/// destroyed, but not all platforms have this guard. Those platforms that do +/// not guard typically have a synthetic limit after which point no more +/// destructors are run. +/// 3. When the process exits on Windows systems, TLS destructors may only be +/// run on the thread that causes the process to exit. This is because the +/// other threads may be forcibly terminated. +/// +/// ## Synchronization in thread-local destructors +/// +/// On Windows, synchronization operations (such as [`JoinHandle::join`]) in +/// thread local destructors are prone to deadlocks and so should be avoided. +/// This is because the [loader lock] is held while a destructor is run. The +/// lock is acquired whenever a thread starts or exits or when a DLL is loaded +/// or unloaded. Therefore these events are blocked for as long as a thread +/// local destructor is running. +/// +/// [loader lock]: https://docs.microsoft.com/en-us/windows/win32/dlls/dynamic-link-library-best-practices +/// [`JoinHandle::join`]: crate::thread::JoinHandle::join +/// [`with`]: LocalKey::with +#[stable(feature = "rust1", since = "1.0.0")] +pub struct LocalKey<T: 'static> { + // This outer `LocalKey<T>` type is what's going to be stored in statics, + // but actual data inside will sometimes be tagged with #[thread_local]. + // It's not valid for a true static to reference a #[thread_local] static, + // so we get around that by exposing an accessor through a layer of function + // indirection (this thunk). + // + // Note that the thunk is itself unsafe because the returned lifetime of the + // slot where data lives, `'static`, is not actually valid. The lifetime + // here is actually slightly shorter than the currently running thread! + // + // Although this is an extra layer of indirection, it should in theory be + // trivially devirtualizable by LLVM because the value of `inner` never + // changes and the constant should be readonly within a crate. This mainly + // only runs into problems when TLS statics are exported across crates. + inner: unsafe fn(Option<&mut Option<T>>) -> Option<&'static T>, +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl<T: 'static> fmt::Debug for LocalKey<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalKey").finish_non_exhaustive() + } +} + +/// Declare a new thread local storage key of type [`std::thread::LocalKey`]. +/// +/// # Syntax +/// +/// The macro wraps any number of static declarations and makes them thread local. +/// Publicity and attributes for each static are allowed. Example: +/// +/// ``` +/// use std::cell::RefCell; +/// thread_local! { +/// pub static FOO: RefCell<u32> = RefCell::new(1); +/// +/// #[allow(unused)] +/// static BAR: RefCell<f32> = RefCell::new(1.0); +/// } +/// # fn main() {} +/// ``` +/// +/// See [`LocalKey` documentation][`std::thread::LocalKey`] for more +/// information. +/// +/// [`std::thread::LocalKey`]: crate::thread::LocalKey +#[macro_export] +#[stable(feature = "rust1", since = "1.0.0")] +#[cfg_attr(not(test), rustc_diagnostic_item = "thread_local_macro")] +#[allow_internal_unstable(thread_local_internals)] +macro_rules! thread_local { + // empty (base case for the recursion) + () => {}; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty = const { $init:expr }; $($rest:tt)*) => ( + $crate::__thread_local_inner!($(#[$attr])* $vis $name, $t, const $init); + $crate::thread_local!($($rest)*); + ); + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty = const { $init:expr }) => ( + $crate::__thread_local_inner!($(#[$attr])* $vis $name, $t, const $init); + ); + + // process multiple declarations + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty = $init:expr; $($rest:tt)*) => ( + $crate::__thread_local_inner!($(#[$attr])* $vis $name, $t, $init); + $crate::thread_local!($($rest)*); + ); + + // handle a single declaration + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty = $init:expr) => ( + $crate::__thread_local_inner!($(#[$attr])* $vis $name, $t, $init); + ); +} + +#[doc(hidden)] +#[unstable(feature = "thread_local_internals", reason = "should not be necessary", issue = "none")] +#[macro_export] +#[allow_internal_unstable(thread_local_internals, cfg_target_thread_local, thread_local)] +#[allow_internal_unsafe] +macro_rules! __thread_local_inner { + // used to generate the `LocalKey` value for const-initialized thread locals + (@key $t:ty, const $init:expr) => {{ + #[cfg_attr(not(windows), inline)] // see comments below + #[deny(unsafe_op_in_unsafe_fn)] + unsafe fn __getit( + _init: $crate::option::Option<&mut $crate::option::Option<$t>>, + ) -> $crate::option::Option<&'static $t> { + const INIT_EXPR: $t = $init; + + // wasm without atomics maps directly to `static mut`, and dtors + // aren't implemented because thread dtors aren't really a thing + // on wasm right now + // + // FIXME(#84224) this should come after the `target_thread_local` + // block. + #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] + { + static mut VAL: $t = INIT_EXPR; + unsafe { $crate::option::Option::Some(&VAL) } + } + + // If the platform has support for `#[thread_local]`, use it. + #[cfg(all( + target_thread_local, + not(all(target_family = "wasm", not(target_feature = "atomics"))), + ))] + { + #[thread_local] + static mut VAL: $t = INIT_EXPR; + + // If a dtor isn't needed we can do something "very raw" and + // just get going. + if !$crate::mem::needs_drop::<$t>() { + unsafe { + return $crate::option::Option::Some(&VAL) + } + } + + // 0 == dtor not registered + // 1 == dtor registered, dtor not run + // 2 == dtor registered and is running or has run + #[thread_local] + static mut STATE: $crate::primitive::u8 = 0; + + unsafe extern "C" fn destroy(ptr: *mut $crate::primitive::u8) { + let ptr = ptr as *mut $t; + + unsafe { + $crate::debug_assert_eq!(STATE, 1); + STATE = 2; + $crate::ptr::drop_in_place(ptr); + } + } + + unsafe { + match STATE { + // 0 == we haven't registered a destructor, so do + // so now. + 0 => { + $crate::thread::__FastLocalKeyInner::<$t>::register_dtor( + $crate::ptr::addr_of_mut!(VAL) as *mut $crate::primitive::u8, + destroy, + ); + STATE = 1; + $crate::option::Option::Some(&VAL) + } + // 1 == the destructor is registered and the value + // is valid, so return the pointer. + 1 => $crate::option::Option::Some(&VAL), + // otherwise the destructor has already run, so we + // can't give access. + _ => $crate::option::Option::None, + } + } + } + + // On platforms without `#[thread_local]` we fall back to the + // same implementation as below for os thread locals. + #[cfg(all( + not(target_thread_local), + not(all(target_family = "wasm", not(target_feature = "atomics"))), + ))] + { + #[inline] + const fn __init() -> $t { INIT_EXPR } + static __KEY: $crate::thread::__OsLocalKeyInner<$t> = + $crate::thread::__OsLocalKeyInner::new(); + #[allow(unused_unsafe)] + unsafe { + __KEY.get(move || { + if let $crate::option::Option::Some(init) = _init { + if let $crate::option::Option::Some(value) = init.take() { + return value; + } else if $crate::cfg!(debug_assertions) { + $crate::unreachable!("missing initial value"); + } + } + __init() + }) + } + } + } + + unsafe { + $crate::thread::LocalKey::new(__getit) + } + }}; + + // used to generate the `LocalKey` value for `thread_local!` + (@key $t:ty, $init:expr) => { + { + #[inline] + fn __init() -> $t { $init } + + // When reading this function you might ask "why is this inlined + // everywhere other than Windows?", and that's a very reasonable + // question to ask. The short story is that it segfaults rustc if + // this function is inlined. The longer story is that Windows looks + // to not support `extern` references to thread locals across DLL + // boundaries. This appears to at least not be supported in the ABI + // that LLVM implements. + // + // Because of this we never inline on Windows, but we do inline on + // other platforms (where external references to thread locals + // across DLLs are supported). A better fix for this would be to + // inline this function on Windows, but only for "statically linked" + // components. For example if two separately compiled rlibs end up + // getting linked into a DLL then it's fine to inline this function + // across that boundary. It's only not fine to inline this function + // across a DLL boundary. Unfortunately rustc doesn't currently + // have this sort of logic available in an attribute, and it's not + // clear that rustc is even equipped to answer this (it's more of a + // Cargo question kinda). This means that, unfortunately, Windows + // gets the pessimistic path for now where it's never inlined. + // + // The issue of "should enable on Windows sometimes" is #84933 + #[cfg_attr(not(windows), inline)] + unsafe fn __getit( + init: $crate::option::Option<&mut $crate::option::Option<$t>>, + ) -> $crate::option::Option<&'static $t> { + #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] + static __KEY: $crate::thread::__StaticLocalKeyInner<$t> = + $crate::thread::__StaticLocalKeyInner::new(); + + #[thread_local] + #[cfg(all( + target_thread_local, + not(all(target_family = "wasm", not(target_feature = "atomics"))), + ))] + static __KEY: $crate::thread::__FastLocalKeyInner<$t> = + $crate::thread::__FastLocalKeyInner::new(); + + #[cfg(all( + not(target_thread_local), + not(all(target_family = "wasm", not(target_feature = "atomics"))), + ))] + static __KEY: $crate::thread::__OsLocalKeyInner<$t> = + $crate::thread::__OsLocalKeyInner::new(); + + // FIXME: remove the #[allow(...)] marker when macros don't + // raise warning for missing/extraneous unsafe blocks anymore. + // See https://github.com/rust-lang/rust/issues/74838. + #[allow(unused_unsafe)] + unsafe { + __KEY.get(move || { + if let $crate::option::Option::Some(init) = init { + if let $crate::option::Option::Some(value) = init.take() { + return value; + } else if $crate::cfg!(debug_assertions) { + $crate::unreachable!("missing default value"); + } + } + __init() + }) + } + } + + unsafe { + $crate::thread::LocalKey::new(__getit) + } + } + }; + ($(#[$attr:meta])* $vis:vis $name:ident, $t:ty, $($init:tt)*) => { + $(#[$attr])* $vis const $name: $crate::thread::LocalKey<$t> = + $crate::__thread_local_inner!(@key $t, $($init)*); + } +} + +/// An error returned by [`LocalKey::try_with`](struct.LocalKey.html#method.try_with). +#[stable(feature = "thread_local_try_with", since = "1.26.0")] +#[non_exhaustive] +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct AccessError; + +#[stable(feature = "thread_local_try_with", since = "1.26.0")] +impl fmt::Debug for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AccessError").finish() + } +} + +#[stable(feature = "thread_local_try_with", since = "1.26.0")] +impl fmt::Display for AccessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("already destroyed", f) + } +} + +#[stable(feature = "thread_local_try_with", since = "1.26.0")] +impl Error for AccessError {} + +impl<T: 'static> LocalKey<T> { + #[doc(hidden)] + #[unstable( + feature = "thread_local_internals", + reason = "recently added to create a key", + issue = "none" + )] + #[rustc_const_unstable(feature = "thread_local_internals", issue = "none")] + pub const unsafe fn new( + inner: unsafe fn(Option<&mut Option<T>>) -> Option<&'static T>, + ) -> LocalKey<T> { + LocalKey { inner } + } + + /// Acquires a reference to the value in this TLS key. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// This function will `panic!()` if the key currently has its + /// destructor running, and it **may** panic if the destructor has + /// previously been run for this thread. + #[stable(feature = "rust1", since = "1.0.0")] + pub fn with<F, R>(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + self.try_with(f).expect( + "cannot access a Thread Local Storage value \ + during or after destruction", + ) + } + + /// Acquires a reference to the value in this TLS key. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. If the key has been destroyed (which may happen if this is called + /// in a destructor), this function will return an [`AccessError`]. + /// + /// # Panics + /// + /// This function will still `panic!()` if the key is uninitialized and the + /// key's initializer panics. + #[stable(feature = "thread_local_try_with", since = "1.26.0")] + #[inline] + pub fn try_with<F, R>(&'static self, f: F) -> Result<R, AccessError> + where + F: FnOnce(&T) -> R, + { + unsafe { + let thread_local = (self.inner)(None).ok_or(AccessError)?; + Ok(f(thread_local)) + } + } + + /// Acquires a reference to the value in this TLS key, initializing it with + /// `init` if it wasn't already initialized on this thread. + /// + /// If `init` was used to initialize the thread local variable, `None` is + /// passed as the first argument to `f`. If it was already initialized, + /// `Some(init)` is passed to `f`. + /// + /// # Panics + /// + /// This function will panic if the key currently has its destructor + /// running, and it **may** panic if the destructor has previously been run + /// for this thread. + fn initialize_with<F, R>(&'static self, init: T, f: F) -> R + where + F: FnOnce(Option<T>, &T) -> R, + { + unsafe { + let mut init = Some(init); + let reference = (self.inner)(Some(&mut init)).expect( + "cannot access a Thread Local Storage value \ + during or after destruction", + ); + f(init, reference) + } + } +} + +impl<T: 'static> LocalKey<Cell<T>> { + /// Sets or initializes the contained value. + /// + /// Unlike the other methods, this will *not* run the lazy initializer of + /// the thread local. Instead, it will be directly initialized with the + /// given value if it wasn't initialized yet. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::Cell; + /// + /// thread_local! { + /// static X: Cell<i32> = panic!("!"); + /// } + /// + /// // Calling X.get() here would result in a panic. + /// + /// X.set(123); // But X.set() is fine, as it skips the initializer above. + /// + /// assert_eq!(X.get(), 123); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn set(&'static self, value: T) { + self.initialize_with(Cell::new(value), |value, cell| { + if let Some(value) = value { + // The cell was already initialized, so `value` wasn't used to + // initialize it. So we overwrite the current value with the + // new one instead. + cell.set(value.into_inner()); + } + }); + } + + /// Returns a copy of the contained value. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::Cell; + /// + /// thread_local! { + /// static X: Cell<i32> = Cell::new(1); + /// } + /// + /// assert_eq!(X.get(), 1); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn get(&'static self) -> T + where + T: Copy, + { + self.with(|cell| cell.get()) + } + + /// Takes the contained value, leaving `Default::default()` in its place. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::Cell; + /// + /// thread_local! { + /// static X: Cell<Option<i32>> = Cell::new(Some(1)); + /// } + /// + /// assert_eq!(X.take(), Some(1)); + /// assert_eq!(X.take(), None); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn take(&'static self) -> T + where + T: Default, + { + self.with(|cell| cell.take()) + } + + /// Replaces the contained value, returning the old value. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::Cell; + /// + /// thread_local! { + /// static X: Cell<i32> = Cell::new(1); + /// } + /// + /// assert_eq!(X.replace(2), 1); + /// assert_eq!(X.replace(3), 2); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn replace(&'static self, value: T) -> T { + self.with(|cell| cell.replace(value)) + } +} + +impl<T: 'static> LocalKey<RefCell<T>> { + /// Acquires a reference to the contained value. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the value is currently mutably borrowed. + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Example + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::RefCell; + /// + /// thread_local! { + /// static X: RefCell<Vec<i32>> = RefCell::new(Vec::new()); + /// } + /// + /// X.with_borrow(|v| assert!(v.is_empty())); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn with_borrow<F, R>(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + self.with(|cell| f(&cell.borrow())) + } + + /// Acquires a mutable reference to the contained value. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the value is currently borrowed. + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Example + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::RefCell; + /// + /// thread_local! { + /// static X: RefCell<Vec<i32>> = RefCell::new(Vec::new()); + /// } + /// + /// X.with_borrow_mut(|v| v.push(1)); + /// + /// X.with_borrow(|v| assert_eq!(*v, vec![1])); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn with_borrow_mut<F, R>(&'static self, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + self.with(|cell| f(&mut cell.borrow_mut())) + } + + /// Sets or initializes the contained value. + /// + /// Unlike the other methods, this will *not* run the lazy initializer of + /// the thread local. Instead, it will be directly initialized with the + /// given value if it wasn't initialized yet. + /// + /// # Panics + /// + /// Panics if the value is currently borrowed. + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::RefCell; + /// + /// thread_local! { + /// static X: RefCell<Vec<i32>> = panic!("!"); + /// } + /// + /// // Calling X.with() here would result in a panic. + /// + /// X.set(vec![1, 2, 3]); // But X.set() is fine, as it skips the initializer above. + /// + /// X.with_borrow(|v| assert_eq!(*v, vec![1, 2, 3])); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn set(&'static self, value: T) { + self.initialize_with(RefCell::new(value), |value, cell| { + if let Some(value) = value { + // The cell was already initialized, so `value` wasn't used to + // initialize it. So we overwrite the current value with the + // new one instead. + *cell.borrow_mut() = value.into_inner(); + } + }); + } + + /// Takes the contained value, leaving `Default::default()` in its place. + /// + /// This will lazily initialize the value if this thread has not referenced + /// this key yet. + /// + /// # Panics + /// + /// Panics if the value is currently borrowed. + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::RefCell; + /// + /// thread_local! { + /// static X: RefCell<Vec<i32>> = RefCell::new(Vec::new()); + /// } + /// + /// X.with_borrow_mut(|v| v.push(1)); + /// + /// let a = X.take(); + /// + /// assert_eq!(a, vec![1]); + /// + /// X.with_borrow(|v| assert!(v.is_empty())); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn take(&'static self) -> T + where + T: Default, + { + self.with(|cell| cell.take()) + } + + /// Replaces the contained value, returning the old value. + /// + /// # Panics + /// + /// Panics if the value is currently borrowed. + /// + /// Panics if the key currently has its destructor running, + /// and it **may** panic if the destructor has previously been run for this thread. + /// + /// # Examples + /// + /// ``` + /// #![feature(local_key_cell_methods)] + /// use std::cell::RefCell; + /// + /// thread_local! { + /// static X: RefCell<Vec<i32>> = RefCell::new(Vec::new()); + /// } + /// + /// let prev = X.replace(vec![1, 2, 3]); + /// assert!(prev.is_empty()); + /// + /// X.with_borrow(|v| assert_eq!(*v, vec![1, 2, 3])); + /// ``` + #[unstable(feature = "local_key_cell_methods", issue = "92122")] + pub fn replace(&'static self, value: T) -> T { + self.with(|cell| cell.replace(value)) + } +} + +mod lazy { + use crate::cell::UnsafeCell; + use crate::hint; + use crate::mem; + + pub struct LazyKeyInner<T> { + inner: UnsafeCell<Option<T>>, + } + + impl<T> LazyKeyInner<T> { + pub const fn new() -> LazyKeyInner<T> { + LazyKeyInner { inner: UnsafeCell::new(None) } + } + + pub unsafe fn get(&self) -> Option<&'static T> { + // SAFETY: The caller must ensure no reference is ever handed out to + // the inner cell nor mutable reference to the Option<T> inside said + // cell. This make it safe to hand a reference, though the lifetime + // of 'static is itself unsafe, making the get method unsafe. + unsafe { (*self.inner.get()).as_ref() } + } + + /// The caller must ensure that no reference is active: this method + /// needs unique access. + pub unsafe fn initialize<F: FnOnce() -> T>(&self, init: F) -> &'static T { + // Execute the initialization up front, *then* move it into our slot, + // just in case initialization fails. + let value = init(); + let ptr = self.inner.get(); + + // SAFETY: + // + // note that this can in theory just be `*ptr = Some(value)`, but due to + // the compiler will currently codegen that pattern with something like: + // + // ptr::drop_in_place(ptr) + // ptr::write(ptr, Some(value)) + // + // Due to this pattern it's possible for the destructor of the value in + // `ptr` (e.g., if this is being recursively initialized) to re-access + // TLS, in which case there will be a `&` and `&mut` pointer to the same + // value (an aliasing violation). To avoid setting the "I'm running a + // destructor" flag we just use `mem::replace` which should sequence the + // operations a little differently and make this safe to call. + // + // The precondition also ensures that we are the only one accessing + // `self` at the moment so replacing is fine. + unsafe { + let _ = mem::replace(&mut *ptr, Some(value)); + } + + // SAFETY: With the call to `mem::replace` it is guaranteed there is + // a `Some` behind `ptr`, not a `None` so `unreachable_unchecked` + // will never be reached. + unsafe { + // After storing `Some` we want to get a reference to the contents of + // what we just stored. While we could use `unwrap` here and it should + // always work it empirically doesn't seem to always get optimized away, + // which means that using something like `try_with` can pull in + // panicking code and cause a large size bloat. + match *ptr { + Some(ref x) => x, + None => hint::unreachable_unchecked(), + } + } + } + + /// The other methods hand out references while taking &self. + /// As such, callers of this method must ensure no `&` and `&mut` are + /// available and used at the same time. + #[allow(unused)] + pub unsafe fn take(&mut self) -> Option<T> { + // SAFETY: See doc comment for this method. + unsafe { (*self.inner.get()).take() } + } + } +} + +/// On some targets like wasm there's no threads, so no need to generate +/// thread locals and we can instead just use plain statics! +#[doc(hidden)] +#[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] +pub mod statik { + use super::lazy::LazyKeyInner; + use crate::fmt; + + pub struct Key<T> { + inner: LazyKeyInner<T>, + } + + unsafe impl<T> Sync for Key<T> {} + + impl<T> fmt::Debug for Key<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Key").finish_non_exhaustive() + } + } + + impl<T> Key<T> { + pub const fn new() -> Key<T> { + Key { inner: LazyKeyInner::new() } + } + + pub unsafe fn get(&self, init: impl FnOnce() -> T) -> Option<&'static T> { + // SAFETY: The caller must ensure no reference is ever handed out to + // the inner cell nor mutable reference to the Option<T> inside said + // cell. This make it safe to hand a reference, though the lifetime + // of 'static is itself unsafe, making the get method unsafe. + let value = unsafe { + match self.inner.get() { + Some(ref value) => value, + None => self.inner.initialize(init), + } + }; + + Some(value) + } + } +} + +#[doc(hidden)] +#[cfg(target_thread_local)] +pub mod fast { + use super::lazy::LazyKeyInner; + use crate::cell::Cell; + use crate::fmt; + use crate::mem; + use crate::sys::thread_local_dtor::register_dtor; + + #[derive(Copy, Clone)] + enum DtorState { + Unregistered, + Registered, + RunningOrHasRun, + } + + // This data structure has been carefully constructed so that the fast path + // only contains one branch on x86. That optimization is necessary to avoid + // duplicated tls lookups on OSX. + // + // LLVM issue: https://bugs.llvm.org/show_bug.cgi?id=41722 + pub struct Key<T> { + // If `LazyKeyInner::get` returns `None`, that indicates either: + // * The value has never been initialized + // * The value is being recursively initialized + // * The value has already been destroyed or is being destroyed + // To determine which kind of `None`, check `dtor_state`. + // + // This is very optimizer friendly for the fast path - initialized but + // not yet dropped. + inner: LazyKeyInner<T>, + + // Metadata to keep track of the state of the destructor. Remember that + // this variable is thread-local, not global. + dtor_state: Cell<DtorState>, + } + + impl<T> fmt::Debug for Key<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Key").finish_non_exhaustive() + } + } + + impl<T> Key<T> { + pub const fn new() -> Key<T> { + Key { inner: LazyKeyInner::new(), dtor_state: Cell::new(DtorState::Unregistered) } + } + + // note that this is just a publicly-callable function only for the + // const-initialized form of thread locals, basically a way to call the + // free `register_dtor` function defined elsewhere in libstd. + pub unsafe fn register_dtor(a: *mut u8, dtor: unsafe extern "C" fn(*mut u8)) { + unsafe { + register_dtor(a, dtor); + } + } + + pub unsafe fn get<F: FnOnce() -> T>(&self, init: F) -> Option<&'static T> { + // SAFETY: See the definitions of `LazyKeyInner::get` and + // `try_initialize` for more information. + // + // The caller must ensure no mutable references are ever active to + // the inner cell or the inner T when this is called. + // The `try_initialize` is dependant on the passed `init` function + // for this. + unsafe { + match self.inner.get() { + Some(val) => Some(val), + None => self.try_initialize(init), + } + } + } + + // `try_initialize` is only called once per fast thread local variable, + // except in corner cases where thread_local dtors reference other + // thread_local's, or it is being recursively initialized. + // + // Macos: Inlining this function can cause two `tlv_get_addr` calls to + // be performed for every call to `Key::get`. + // LLVM issue: https://bugs.llvm.org/show_bug.cgi?id=41722 + #[inline(never)] + unsafe fn try_initialize<F: FnOnce() -> T>(&self, init: F) -> Option<&'static T> { + // SAFETY: See comment above (this function doc). + if !mem::needs_drop::<T>() || unsafe { self.try_register_dtor() } { + // SAFETY: See comment above (this function doc). + Some(unsafe { self.inner.initialize(init) }) + } else { + None + } + } + + // `try_register_dtor` is only called once per fast thread local + // variable, except in corner cases where thread_local dtors reference + // other thread_local's, or it is being recursively initialized. + unsafe fn try_register_dtor(&self) -> bool { + match self.dtor_state.get() { + DtorState::Unregistered => { + // SAFETY: dtor registration happens before initialization. + // Passing `self` as a pointer while using `destroy_value<T>` + // is safe because the function will build a pointer to a + // Key<T>, which is the type of self and so find the correct + // size. + unsafe { register_dtor(self as *const _ as *mut u8, destroy_value::<T>) }; + self.dtor_state.set(DtorState::Registered); + true + } + DtorState::Registered => { + // recursively initialized + true + } + DtorState::RunningOrHasRun => false, + } + } + } + + unsafe extern "C" fn destroy_value<T>(ptr: *mut u8) { + let ptr = ptr as *mut Key<T>; + + // SAFETY: + // + // The pointer `ptr` has been built just above and comes from + // `try_register_dtor` where it is originally a Key<T> coming from `self`, + // making it non-NUL and of the correct type. + // + // Right before we run the user destructor be sure to set the + // `Option<T>` to `None`, and `dtor_state` to `RunningOrHasRun`. This + // causes future calls to `get` to run `try_initialize_drop` again, + // which will now fail, and return `None`. + unsafe { + let value = (*ptr).inner.take(); + (*ptr).dtor_state.set(DtorState::RunningOrHasRun); + drop(value); + } + } +} + +#[doc(hidden)] +pub mod os { + use super::lazy::LazyKeyInner; + use crate::cell::Cell; + use crate::fmt; + use crate::marker; + use crate::ptr; + use crate::sys_common::thread_local_key::StaticKey as OsStaticKey; + + pub struct Key<T> { + // OS-TLS key that we'll use to key off. + os: OsStaticKey, + marker: marker::PhantomData<Cell<T>>, + } + + impl<T> fmt::Debug for Key<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Key").finish_non_exhaustive() + } + } + + unsafe impl<T> Sync for Key<T> {} + + struct Value<T: 'static> { + inner: LazyKeyInner<T>, + key: &'static Key<T>, + } + + impl<T: 'static> Key<T> { + #[rustc_const_unstable(feature = "thread_local_internals", issue = "none")] + pub const fn new() -> Key<T> { + Key { os: OsStaticKey::new(Some(destroy_value::<T>)), marker: marker::PhantomData } + } + + /// It is a requirement for the caller to ensure that no mutable + /// reference is active when this method is called. + pub unsafe fn get(&'static self, init: impl FnOnce() -> T) -> Option<&'static T> { + // SAFETY: See the documentation for this method. + let ptr = unsafe { self.os.get() as *mut Value<T> }; + if ptr.addr() > 1 { + // SAFETY: the check ensured the pointer is safe (its destructor + // is not running) + it is coming from a trusted source (self). + if let Some(ref value) = unsafe { (*ptr).inner.get() } { + return Some(value); + } + } + // SAFETY: At this point we are sure we have no value and so + // initializing (or trying to) is safe. + unsafe { self.try_initialize(init) } + } + + // `try_initialize` is only called once per os thread local variable, + // except in corner cases where thread_local dtors reference other + // thread_local's, or it is being recursively initialized. + unsafe fn try_initialize(&'static self, init: impl FnOnce() -> T) -> Option<&'static T> { + // SAFETY: No mutable references are ever handed out meaning getting + // the value is ok. + let ptr = unsafe { self.os.get() as *mut Value<T> }; + if ptr.addr() == 1 { + // destructor is running + return None; + } + + let ptr = if ptr.is_null() { + // If the lookup returned null, we haven't initialized our own + // local copy, so do that now. + let ptr: Box<Value<T>> = box Value { inner: LazyKeyInner::new(), key: self }; + let ptr = Box::into_raw(ptr); + // SAFETY: At this point we are sure there is no value inside + // ptr so setting it will not affect anyone else. + unsafe { + self.os.set(ptr as *mut u8); + } + ptr + } else { + // recursive initialization + ptr + }; + + // SAFETY: ptr has been ensured as non-NUL just above an so can be + // dereferenced safely. + unsafe { Some((*ptr).inner.initialize(init)) } + } + } + + unsafe extern "C" fn destroy_value<T: 'static>(ptr: *mut u8) { + // SAFETY: + // + // The OS TLS ensures that this key contains a null value when this + // destructor starts to run. We set it back to a sentinel value of 1 to + // ensure that any future calls to `get` for this thread will return + // `None`. + // + // Note that to prevent an infinite loop we reset it back to null right + // before we return from the destructor ourselves. + unsafe { + let ptr = Box::from_raw(ptr as *mut Value<T>); + let key = ptr.key; + key.os.set(ptr::invalid_mut(1)); + drop(ptr); + key.os.set(ptr::null_mut()); + } + } +} diff --git a/library/std/src/thread/local/dynamic_tests.rs b/library/std/src/thread/local/dynamic_tests.rs new file mode 100644 index 000000000..dd1800416 --- /dev/null +++ b/library/std/src/thread/local/dynamic_tests.rs @@ -0,0 +1,40 @@ +use crate::cell::RefCell; +use crate::collections::HashMap; +use crate::thread_local; + +#[test] +fn smoke() { + fn square(i: i32) -> i32 { + i * i + } + thread_local!(static FOO: i32 = square(3)); + + FOO.with(|f| { + assert_eq!(*f, 9); + }); +} + +#[test] +fn hashmap() { + fn map() -> RefCell<HashMap<i32, i32>> { + let mut m = HashMap::new(); + m.insert(1, 2); + RefCell::new(m) + } + thread_local!(static FOO: RefCell<HashMap<i32, i32>> = map()); + + FOO.with(|map| { + assert_eq!(map.borrow()[&1], 2); + }); +} + +#[test] +fn refcell_vec() { + thread_local!(static FOO: RefCell<Vec<u32>> = RefCell::new(vec![1, 2, 3])); + + FOO.with(|vec| { + assert_eq!(vec.borrow().len(), 3); + vec.borrow_mut().push(4); + assert_eq!(vec.borrow()[3], 4); + }); +} diff --git a/library/std/src/thread/local/tests.rs b/library/std/src/thread/local/tests.rs new file mode 100644 index 000000000..1df1ca758 --- /dev/null +++ b/library/std/src/thread/local/tests.rs @@ -0,0 +1,317 @@ +use crate::cell::{Cell, UnsafeCell}; +use crate::sync::atomic::{AtomicU8, Ordering}; +use crate::sync::mpsc::{channel, Sender}; +use crate::thread::{self, LocalKey}; +use crate::thread_local; + +struct Foo(Sender<()>); + +impl Drop for Foo { + fn drop(&mut self) { + let Foo(ref s) = *self; + s.send(()).unwrap(); + } +} + +#[test] +fn smoke_no_dtor() { + thread_local!(static FOO: Cell<i32> = Cell::new(1)); + run(&FOO); + thread_local!(static FOO2: Cell<i32> = const { Cell::new(1) }); + run(&FOO2); + + fn run(key: &'static LocalKey<Cell<i32>>) { + key.with(|f| { + assert_eq!(f.get(), 1); + f.set(2); + }); + let t = thread::spawn(move || { + key.with(|f| { + assert_eq!(f.get(), 1); + }); + }); + t.join().unwrap(); + + key.with(|f| { + assert_eq!(f.get(), 2); + }); + } +} + +#[test] +fn states() { + struct Foo(&'static LocalKey<Foo>); + impl Drop for Foo { + fn drop(&mut self) { + assert!(self.0.try_with(|_| ()).is_err()); + } + } + + thread_local!(static FOO: Foo = Foo(&FOO)); + run(&FOO); + thread_local!(static FOO2: Foo = const { Foo(&FOO2) }); + run(&FOO2); + + fn run(foo: &'static LocalKey<Foo>) { + thread::spawn(move || { + assert!(foo.try_with(|_| ()).is_ok()); + }) + .join() + .unwrap(); + } +} + +#[test] +fn smoke_dtor() { + thread_local!(static FOO: UnsafeCell<Option<Foo>> = UnsafeCell::new(None)); + run(&FOO); + thread_local!(static FOO2: UnsafeCell<Option<Foo>> = const { UnsafeCell::new(None) }); + run(&FOO2); + + fn run(key: &'static LocalKey<UnsafeCell<Option<Foo>>>) { + let (tx, rx) = channel(); + let t = thread::spawn(move || unsafe { + let mut tx = Some(tx); + key.with(|f| { + *f.get() = Some(Foo(tx.take().unwrap())); + }); + }); + rx.recv().unwrap(); + t.join().unwrap(); + } +} + +#[test] +fn circular() { + struct S1(&'static LocalKey<UnsafeCell<Option<S1>>>, &'static LocalKey<UnsafeCell<Option<S2>>>); + struct S2(&'static LocalKey<UnsafeCell<Option<S1>>>, &'static LocalKey<UnsafeCell<Option<S2>>>); + thread_local!(static K1: UnsafeCell<Option<S1>> = UnsafeCell::new(None)); + thread_local!(static K2: UnsafeCell<Option<S2>> = UnsafeCell::new(None)); + thread_local!(static K3: UnsafeCell<Option<S1>> = const { UnsafeCell::new(None) }); + thread_local!(static K4: UnsafeCell<Option<S2>> = const { UnsafeCell::new(None) }); + static mut HITS: usize = 0; + + impl Drop for S1 { + fn drop(&mut self) { + unsafe { + HITS += 1; + if self.1.try_with(|_| ()).is_err() { + assert_eq!(HITS, 3); + } else { + if HITS == 1 { + self.1.with(|s| *s.get() = Some(S2(self.0, self.1))); + } else { + assert_eq!(HITS, 3); + } + } + } + } + } + impl Drop for S2 { + fn drop(&mut self) { + unsafe { + HITS += 1; + assert!(self.0.try_with(|_| ()).is_ok()); + assert_eq!(HITS, 2); + self.0.with(|s| *s.get() = Some(S1(self.0, self.1))); + } + } + } + + thread::spawn(move || { + drop(S1(&K1, &K2)); + }) + .join() + .unwrap(); + + unsafe { + HITS = 0; + } + + thread::spawn(move || { + drop(S1(&K3, &K4)); + }) + .join() + .unwrap(); +} + +#[test] +fn self_referential() { + struct S1(&'static LocalKey<UnsafeCell<Option<S1>>>); + + thread_local!(static K1: UnsafeCell<Option<S1>> = UnsafeCell::new(None)); + thread_local!(static K2: UnsafeCell<Option<S1>> = const { UnsafeCell::new(None) }); + + impl Drop for S1 { + fn drop(&mut self) { + assert!(self.0.try_with(|_| ()).is_err()); + } + } + + thread::spawn(move || unsafe { + K1.with(|s| *s.get() = Some(S1(&K1))); + }) + .join() + .unwrap(); + + thread::spawn(move || unsafe { + K2.with(|s| *s.get() = Some(S1(&K2))); + }) + .join() + .unwrap(); +} + +// Note that this test will deadlock if TLS destructors aren't run (this +// requires the destructor to be run to pass the test). +#[test] +fn dtors_in_dtors_in_dtors() { + struct S1(Sender<()>); + thread_local!(static K1: UnsafeCell<Option<S1>> = UnsafeCell::new(None)); + thread_local!(static K2: UnsafeCell<Option<Foo>> = UnsafeCell::new(None)); + + impl Drop for S1 { + fn drop(&mut self) { + let S1(ref tx) = *self; + unsafe { + let _ = K2.try_with(|s| *s.get() = Some(Foo(tx.clone()))); + } + } + } + + let (tx, rx) = channel(); + let _t = thread::spawn(move || unsafe { + let mut tx = Some(tx); + K1.with(|s| *s.get() = Some(S1(tx.take().unwrap()))); + }); + rx.recv().unwrap(); +} + +#[test] +fn dtors_in_dtors_in_dtors_const_init() { + struct S1(Sender<()>); + thread_local!(static K1: UnsafeCell<Option<S1>> = const { UnsafeCell::new(None) }); + thread_local!(static K2: UnsafeCell<Option<Foo>> = const { UnsafeCell::new(None) }); + + impl Drop for S1 { + fn drop(&mut self) { + let S1(ref tx) = *self; + unsafe { + let _ = K2.try_with(|s| *s.get() = Some(Foo(tx.clone()))); + } + } + } + + let (tx, rx) = channel(); + let _t = thread::spawn(move || unsafe { + let mut tx = Some(tx); + K1.with(|s| *s.get() = Some(S1(tx.take().unwrap()))); + }); + rx.recv().unwrap(); +} + +// This test tests that TLS destructors have run before the thread joins. The +// test has no false positives (meaning: if the test fails, there's actually +// an ordering problem). It may have false negatives, where the test passes but +// join is not guaranteed to be after the TLS destructors. However, false +// negatives should be exceedingly rare due to judicious use of +// thread::yield_now and running the test several times. +#[test] +fn join_orders_after_tls_destructors() { + // We emulate a synchronous MPSC rendezvous channel using only atomics and + // thread::yield_now. We can't use std::mpsc as the implementation itself + // may rely on thread locals. + // + // The basic state machine for an SPSC rendezvous channel is: + // FRESH -> THREAD1_WAITING -> MAIN_THREAD_RENDEZVOUS + // where the first transition is done by the “receiving” thread and the 2nd + // transition is done by the “sending” thread. + // + // We add an additional state `THREAD2_LAUNCHED` between `FRESH` and + // `THREAD1_WAITING` to block until all threads are actually running. + // + // A thread that joins on the “receiving” thread completion should never + // observe the channel in the `THREAD1_WAITING` state. If this does occur, + // we switch to the “poison” state `THREAD2_JOINED` and panic all around. + // (This is equivalent to “sending” from an alternate producer thread.) + const FRESH: u8 = 0; + const THREAD2_LAUNCHED: u8 = 1; + const THREAD1_WAITING: u8 = 2; + const MAIN_THREAD_RENDEZVOUS: u8 = 3; + const THREAD2_JOINED: u8 = 4; + static SYNC_STATE: AtomicU8 = AtomicU8::new(FRESH); + + for _ in 0..10 { + SYNC_STATE.store(FRESH, Ordering::SeqCst); + + let jh = thread::Builder::new() + .name("thread1".into()) + .spawn(move || { + struct TlDrop; + + impl Drop for TlDrop { + fn drop(&mut self) { + let mut sync_state = SYNC_STATE.swap(THREAD1_WAITING, Ordering::SeqCst); + loop { + match sync_state { + THREAD2_LAUNCHED | THREAD1_WAITING => thread::yield_now(), + MAIN_THREAD_RENDEZVOUS => break, + THREAD2_JOINED => panic!( + "Thread 1 still running after thread 2 joined on thread 1" + ), + v => unreachable!("sync state: {}", v), + } + sync_state = SYNC_STATE.load(Ordering::SeqCst); + } + } + } + + thread_local! { + static TL_DROP: TlDrop = TlDrop; + } + + TL_DROP.with(|_| {}); + + loop { + match SYNC_STATE.load(Ordering::SeqCst) { + FRESH => thread::yield_now(), + THREAD2_LAUNCHED => break, + v => unreachable!("sync state: {}", v), + } + } + }) + .unwrap(); + + let jh2 = thread::Builder::new() + .name("thread2".into()) + .spawn(move || { + assert_eq!(SYNC_STATE.swap(THREAD2_LAUNCHED, Ordering::SeqCst), FRESH); + jh.join().unwrap(); + match SYNC_STATE.swap(THREAD2_JOINED, Ordering::SeqCst) { + MAIN_THREAD_RENDEZVOUS => return, + THREAD2_LAUNCHED | THREAD1_WAITING => { + panic!("Thread 2 running after thread 1 join before main thread rendezvous") + } + v => unreachable!("sync state: {:?}", v), + } + }) + .unwrap(); + + loop { + match SYNC_STATE.compare_exchange( + THREAD1_WAITING, + MAIN_THREAD_RENDEZVOUS, + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => break, + Err(FRESH) => thread::yield_now(), + Err(THREAD2_LAUNCHED) => thread::yield_now(), + Err(THREAD2_JOINED) => { + panic!("Main thread rendezvous after thread 2 joined thread 1") + } + v => unreachable!("sync state: {:?}", v), + } + } + jh2.join().unwrap(); + } +} diff --git a/library/std/src/thread/mod.rs b/library/std/src/thread/mod.rs new file mode 100644 index 000000000..44c8a50fd --- /dev/null +++ b/library/std/src/thread/mod.rs @@ -0,0 +1,1621 @@ +//! Native threads. +//! +//! ## The threading model +//! +//! An executing Rust program consists of a collection of native OS threads, +//! each with their own stack and local state. Threads can be named, and +//! provide some built-in support for low-level synchronization. +//! +//! Communication between threads can be done through +//! [channels], Rust's message-passing types, along with [other forms of thread +//! synchronization](../../std/sync/index.html) and shared-memory data +//! structures. In particular, types that are guaranteed to be +//! threadsafe are easily shared between threads using the +//! atomically-reference-counted container, [`Arc`]. +//! +//! Fatal logic errors in Rust cause *thread panic*, during which +//! a thread will unwind the stack, running destructors and freeing +//! owned resources. While not meant as a 'try/catch' mechanism, panics +//! in Rust can nonetheless be caught (unless compiling with `panic=abort`) with +//! [`catch_unwind`](../../std/panic/fn.catch_unwind.html) and recovered +//! from, or alternatively be resumed with +//! [`resume_unwind`](../../std/panic/fn.resume_unwind.html). If the panic +//! is not caught the thread will exit, but the panic may optionally be +//! detected from a different thread with [`join`]. If the main thread panics +//! without the panic being caught, the application will exit with a +//! non-zero exit code. +//! +//! When the main thread of a Rust program terminates, the entire program shuts +//! down, even if other threads are still running. However, this module provides +//! convenient facilities for automatically waiting for the termination of a +//! thread (i.e., join). +//! +//! ## Spawning a thread +//! +//! A new thread can be spawned using the [`thread::spawn`][`spawn`] function: +//! +//! ```rust +//! use std::thread; +//! +//! thread::spawn(move || { +//! // some work here +//! }); +//! ``` +//! +//! In this example, the spawned thread is "detached," which means that there is +//! no way for the program to learn when the spawned thread completes or otherwise +//! terminates. +//! +//! To learn when a thread completes, it is necessary to capture the [`JoinHandle`] +//! object that is returned by the call to [`spawn`], which provides +//! a `join` method that allows the caller to wait for the completion of the +//! spawned thread: +//! +//! ```rust +//! use std::thread; +//! +//! let thread_join_handle = thread::spawn(move || { +//! // some work here +//! }); +//! // some work here +//! let res = thread_join_handle.join(); +//! ``` +//! +//! The [`join`] method returns a [`thread::Result`] containing [`Ok`] of the final +//! value produced by the spawned thread, or [`Err`] of the value given to +//! a call to [`panic!`] if the thread panicked. +//! +//! Note that there is no parent/child relationship between a thread that spawns a +//! new thread and the thread being spawned. In particular, the spawned thread may or +//! may not outlive the spawning thread, unless the spawning thread is the main thread. +//! +//! ## Configuring threads +//! +//! A new thread can be configured before it is spawned via the [`Builder`] type, +//! which currently allows you to set the name and stack size for the thread: +//! +//! ```rust +//! # #![allow(unused_must_use)] +//! use std::thread; +//! +//! thread::Builder::new().name("thread1".to_string()).spawn(move || { +//! println!("Hello, world!"); +//! }); +//! ``` +//! +//! ## The `Thread` type +//! +//! Threads are represented via the [`Thread`] type, which you can get in one of +//! two ways: +//! +//! * By spawning a new thread, e.g., using the [`thread::spawn`][`spawn`] +//! function, and calling [`thread`][`JoinHandle::thread`] on the [`JoinHandle`]. +//! * By requesting the current thread, using the [`thread::current`] function. +//! +//! The [`thread::current`] function is available even for threads not spawned +//! by the APIs of this module. +//! +//! ## Thread-local storage +//! +//! This module also provides an implementation of thread-local storage for Rust +//! programs. Thread-local storage is a method of storing data into a global +//! variable that each thread in the program will have its own copy of. +//! Threads do not share this data, so accesses do not need to be synchronized. +//! +//! A thread-local key owns the value it contains and will destroy the value when the +//! thread exits. It is created with the [`thread_local!`] macro and can contain any +//! value that is `'static` (no borrowed pointers). It provides an accessor function, +//! [`with`], that yields a shared reference to the value to the specified +//! closure. Thread-local keys allow only shared access to values, as there would be no +//! way to guarantee uniqueness if mutable borrows were allowed. Most values +//! will want to make use of some form of **interior mutability** through the +//! [`Cell`] or [`RefCell`] types. +//! +//! ## Naming threads +//! +//! Threads are able to have associated names for identification purposes. By default, spawned +//! threads are unnamed. To specify a name for a thread, build the thread with [`Builder`] and pass +//! the desired thread name to [`Builder::name`]. To retrieve the thread name from within the +//! thread, use [`Thread::name`]. A couple examples of where the name of a thread gets used: +//! +//! * If a panic occurs in a named thread, the thread name will be printed in the panic message. +//! * The thread name is provided to the OS where applicable (e.g., `pthread_setname_np` in +//! unix-like platforms). +//! +//! ## Stack size +//! +//! The default stack size for spawned threads is 2 MiB, though this particular stack size is +//! subject to change in the future. There are two ways to manually specify the stack size for +//! spawned threads: +//! +//! * Build the thread with [`Builder`] and pass the desired stack size to [`Builder::stack_size`]. +//! * Set the `RUST_MIN_STACK` environment variable to an integer representing the desired stack +//! size (in bytes). Note that setting [`Builder::stack_size`] will override this. +//! +//! Note that the stack size of the main thread is *not* determined by Rust. +//! +//! [channels]: crate::sync::mpsc +//! [`join`]: JoinHandle::join +//! [`Result`]: crate::result::Result +//! [`Ok`]: crate::result::Result::Ok +//! [`Err`]: crate::result::Result::Err +//! [`thread::current`]: current +//! [`thread::Result`]: Result +//! [`unpark`]: Thread::unpark +//! [`thread::park_timeout`]: park_timeout +//! [`Cell`]: crate::cell::Cell +//! [`RefCell`]: crate::cell::RefCell +//! [`with`]: LocalKey::with +//! [`thread_local!`]: crate::thread_local + +#![stable(feature = "rust1", since = "1.0.0")] +#![deny(unsafe_op_in_unsafe_fn)] + +#[cfg(all(test, not(target_os = "emscripten")))] +mod tests; + +use crate::any::Any; +use crate::cell::UnsafeCell; +use crate::ffi::{CStr, CString}; +use crate::fmt; +use crate::io; +use crate::marker::PhantomData; +use crate::mem; +use crate::num::NonZeroU64; +use crate::num::NonZeroUsize; +use crate::panic; +use crate::panicking; +use crate::pin::Pin; +use crate::ptr::addr_of_mut; +use crate::str; +use crate::sync::Arc; +use crate::sys::thread as imp; +use crate::sys_common::mutex; +use crate::sys_common::thread; +use crate::sys_common::thread_info; +use crate::sys_common::thread_parker::Parker; +use crate::sys_common::{AsInner, IntoInner}; +use crate::time::Duration; + +//////////////////////////////////////////////////////////////////////////////// +// Thread-local storage +//////////////////////////////////////////////////////////////////////////////// + +#[macro_use] +mod local; + +#[stable(feature = "scoped_threads", since = "1.63.0")] +mod scoped; + +#[stable(feature = "scoped_threads", since = "1.63.0")] +pub use scoped::{scope, Scope, ScopedJoinHandle}; + +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::local::{AccessError, LocalKey}; + +// The types used by the thread_local! macro to access TLS keys. Note that there +// are two types, the "OS" type and the "fast" type. The OS thread local key +// type is accessed via platform-specific API calls and is slow, while the fast +// key type is accessed via code generated via LLVM, where TLS keys are set up +// by the elf linker. Note that the OS TLS type is always available: on macOS +// the standard library is compiled with support for older platform versions +// where fast TLS was not available; end-user code is compiled with fast TLS +// where available, but both are needed. + +#[unstable(feature = "libstd_thread_internals", issue = "none")] +#[cfg(target_thread_local)] +#[doc(hidden)] +pub use self::local::fast::Key as __FastLocalKeyInner; +#[unstable(feature = "libstd_thread_internals", issue = "none")] +#[doc(hidden)] +pub use self::local::os::Key as __OsLocalKeyInner; +#[unstable(feature = "libstd_thread_internals", issue = "none")] +#[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] +#[doc(hidden)] +pub use self::local::statik::Key as __StaticLocalKeyInner; + +//////////////////////////////////////////////////////////////////////////////// +// Builder +//////////////////////////////////////////////////////////////////////////////// + +/// Thread factory, which can be used in order to configure the properties of +/// a new thread. +/// +/// Methods can be chained on it in order to configure it. +/// +/// The two configurations available are: +/// +/// - [`name`]: specifies an [associated name for the thread][naming-threads] +/// - [`stack_size`]: specifies the [desired stack size for the thread][stack-size] +/// +/// The [`spawn`] method will take ownership of the builder and create an +/// [`io::Result`] to the thread handle with the given configuration. +/// +/// The [`thread::spawn`] free function uses a `Builder` with default +/// configuration and [`unwrap`]s its return value. +/// +/// You may want to use [`spawn`] instead of [`thread::spawn`], when you want +/// to recover from a failure to launch a thread, indeed the free function will +/// panic where the `Builder` method will return a [`io::Result`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// +/// let builder = thread::Builder::new(); +/// +/// let handler = builder.spawn(|| { +/// // thread code +/// }).unwrap(); +/// +/// handler.join().unwrap(); +/// ``` +/// +/// [`stack_size`]: Builder::stack_size +/// [`name`]: Builder::name +/// [`spawn`]: Builder::spawn +/// [`thread::spawn`]: spawn +/// [`io::Result`]: crate::io::Result +/// [`unwrap`]: crate::result::Result::unwrap +/// [naming-threads]: ./index.html#naming-threads +/// [stack-size]: ./index.html#stack-size +#[must_use = "must eventually spawn the thread"] +#[stable(feature = "rust1", since = "1.0.0")] +#[derive(Debug)] +pub struct Builder { + // A name for the thread-to-be, for identification in panic messages + name: Option<String>, + // The size of the stack for the spawned thread in bytes + stack_size: Option<usize>, +} + +impl Builder { + /// Generates the base configuration for spawning a thread, from which + /// configuration methods can be chained. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new() + /// .name("foo".into()) + /// .stack_size(32 * 1024); + /// + /// let handler = builder.spawn(|| { + /// // thread code + /// }).unwrap(); + /// + /// handler.join().unwrap(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new() -> Builder { + Builder { name: None, stack_size: None } + } + + /// Names the thread-to-be. Currently the name is used for identification + /// only in panic messages. + /// + /// The name must not contain null bytes (`\0`). + /// + /// For more information about named threads, see + /// [this module-level documentation][naming-threads]. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new() + /// .name("foo".into()); + /// + /// let handler = builder.spawn(|| { + /// assert_eq!(thread::current().name(), Some("foo")) + /// }).unwrap(); + /// + /// handler.join().unwrap(); + /// ``` + /// + /// [naming-threads]: ./index.html#naming-threads + #[stable(feature = "rust1", since = "1.0.0")] + pub fn name(mut self, name: String) -> Builder { + self.name = Some(name); + self + } + + /// Sets the size of the stack (in bytes) for the new thread. + /// + /// The actual stack size may be greater than this value if + /// the platform specifies a minimal stack size. + /// + /// For more information about the stack size for threads, see + /// [this module-level documentation][stack-size]. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new().stack_size(32 * 1024); + /// ``` + /// + /// [stack-size]: ./index.html#stack-size + #[stable(feature = "rust1", since = "1.0.0")] + pub fn stack_size(mut self, size: usize) -> Builder { + self.stack_size = Some(size); + self + } + + /// Spawns a new thread by taking ownership of the `Builder`, and returns an + /// [`io::Result`] to its [`JoinHandle`]. + /// + /// The spawned thread may outlive the caller (unless the caller thread + /// is the main thread; the whole process is terminated when the main + /// thread finishes). The join handle can be used to block on + /// termination of the spawned thread, including recovering its panics. + /// + /// For a more complete documentation see [`thread::spawn`][`spawn`]. + /// + /// # Errors + /// + /// Unlike the [`spawn`] free function, this method yields an + /// [`io::Result`] to capture any failure to create the thread at + /// the OS level. + /// + /// [`io::Result`]: crate::io::Result + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new(); + /// + /// let handler = builder.spawn(|| { + /// // thread code + /// }).unwrap(); + /// + /// handler.join().unwrap(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn spawn<F, T>(self, f: F) -> io::Result<JoinHandle<T>> + where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, + { + unsafe { self.spawn_unchecked(f) } + } + + /// Spawns a new thread without any lifetime restrictions by taking ownership + /// of the `Builder`, and returns an [`io::Result`] to its [`JoinHandle`]. + /// + /// The spawned thread may outlive the caller (unless the caller thread + /// is the main thread; the whole process is terminated when the main + /// thread finishes). The join handle can be used to block on + /// termination of the spawned thread, including recovering its panics. + /// + /// This method is identical to [`thread::Builder::spawn`][`Builder::spawn`], + /// except for the relaxed lifetime bounds, which render it unsafe. + /// For a more complete documentation see [`thread::spawn`][`spawn`]. + /// + /// # Errors + /// + /// Unlike the [`spawn`] free function, this method yields an + /// [`io::Result`] to capture any failure to create the thread at + /// the OS level. + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Safety + /// + /// The caller has to ensure that the spawned thread does not outlive any + /// references in the supplied thread closure and its return type. + /// This can be guaranteed in two ways: + /// + /// - ensure that [`join`][`JoinHandle::join`] is called before any referenced + /// data is dropped + /// - use only types with `'static` lifetime bounds, i.e., those with no or only + /// `'static` references (both [`thread::Builder::spawn`][`Builder::spawn`] + /// and [`thread::spawn`][`spawn`] enforce this property statically) + /// + /// # Examples + /// + /// ``` + /// #![feature(thread_spawn_unchecked)] + /// use std::thread; + /// + /// let builder = thread::Builder::new(); + /// + /// let x = 1; + /// let thread_x = &x; + /// + /// let handler = unsafe { + /// builder.spawn_unchecked(move || { + /// println!("x = {}", *thread_x); + /// }).unwrap() + /// }; + /// + /// // caller has to ensure `join()` is called, otherwise + /// // it is possible to access freed memory if `x` gets + /// // dropped before the thread closure is executed! + /// handler.join().unwrap(); + /// ``` + /// + /// [`io::Result`]: crate::io::Result + #[unstable(feature = "thread_spawn_unchecked", issue = "55132")] + pub unsafe fn spawn_unchecked<'a, F, T>(self, f: F) -> io::Result<JoinHandle<T>> + where + F: FnOnce() -> T, + F: Send + 'a, + T: Send + 'a, + { + Ok(JoinHandle(unsafe { self.spawn_unchecked_(f, None) }?)) + } + + unsafe fn spawn_unchecked_<'a, 'scope, F, T>( + self, + f: F, + scope_data: Option<Arc<scoped::ScopeData>>, + ) -> io::Result<JoinInner<'scope, T>> + where + F: FnOnce() -> T, + F: Send + 'a, + T: Send + 'a, + 'scope: 'a, + { + let Builder { name, stack_size } = self; + + let stack_size = stack_size.unwrap_or_else(thread::min_stack); + + let my_thread = Thread::new(name.map(|name| { + CString::new(name).expect("thread name may not contain interior null bytes") + })); + let their_thread = my_thread.clone(); + + let my_packet: Arc<Packet<'scope, T>> = Arc::new(Packet { + scope: scope_data, + result: UnsafeCell::new(None), + _marker: PhantomData, + }); + let their_packet = my_packet.clone(); + + let output_capture = crate::io::set_output_capture(None); + crate::io::set_output_capture(output_capture.clone()); + + let main = move || { + if let Some(name) = their_thread.cname() { + imp::Thread::set_name(name); + } + + crate::io::set_output_capture(output_capture); + + // SAFETY: the stack guard passed is the one for the current thread. + // This means the current thread's stack and the new thread's stack + // are properly set and protected from each other. + thread_info::set(unsafe { imp::guard::current() }, their_thread); + let try_result = panic::catch_unwind(panic::AssertUnwindSafe(|| { + crate::sys_common::backtrace::__rust_begin_short_backtrace(f) + })); + // SAFETY: `their_packet` as been built just above and moved by the + // closure (it is an Arc<...>) and `my_packet` will be stored in the + // same `JoinInner` as this closure meaning the mutation will be + // safe (not modify it and affect a value far away). + unsafe { *their_packet.result.get() = Some(try_result) }; + }; + + if let Some(scope_data) = &my_packet.scope { + scope_data.increment_num_running_threads(); + } + + Ok(JoinInner { + // SAFETY: + // + // `imp::Thread::new` takes a closure with a `'static` lifetime, since it's passed + // through FFI or otherwise used with low-level threading primitives that have no + // notion of or way to enforce lifetimes. + // + // As mentioned in the `Safety` section of this function's documentation, the caller of + // this function needs to guarantee that the passed-in lifetime is sufficiently long + // for the lifetime of the thread. + // + // Similarly, the `sys` implementation must guarantee that no references to the closure + // exist after the thread has terminated, which is signaled by `Thread::join` + // returning. + native: unsafe { + imp::Thread::new( + stack_size, + mem::transmute::<Box<dyn FnOnce() + 'a>, Box<dyn FnOnce() + 'static>>( + Box::new(main), + ), + )? + }, + thread: my_thread, + packet: my_packet, + }) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Free functions +//////////////////////////////////////////////////////////////////////////////// + +/// Spawns a new thread, returning a [`JoinHandle`] for it. +/// +/// The join handle provides a [`join`] method that can be used to join the spawned +/// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing +/// the argument given to [`panic!`]. +/// +/// If the join handle is dropped, the spawned thread will implicitly be *detached*. +/// In this case, the spawned thread may no longer be joined. +/// (It is the responsibility of the program to either eventually join threads it +/// creates or detach them; otherwise, a resource leak will result.) +/// +/// This call will create a thread using default parameters of [`Builder`], if you +/// want to specify the stack size or the name of the thread, use this API +/// instead. +/// +/// As you can see in the signature of `spawn` there are two constraints on +/// both the closure given to `spawn` and its return value, let's explain them: +/// +/// - The `'static` constraint means that the closure and its return value +/// must have a lifetime of the whole program execution. The reason for this +/// is that threads can outlive the lifetime they have been created in. +/// +/// Indeed if the thread, and by extension its return value, can outlive their +/// caller, we need to make sure that they will be valid afterwards, and since +/// we *can't* know when it will return we need to have them valid as long as +/// possible, that is until the end of the program, hence the `'static` +/// lifetime. +/// - The [`Send`] constraint is because the closure will need to be passed +/// *by value* from the thread where it is spawned to the new thread. Its +/// return value will need to be passed from the new thread to the thread +/// where it is `join`ed. +/// As a reminder, the [`Send`] marker trait expresses that it is safe to be +/// passed from thread to thread. [`Sync`] expresses that it is safe to have a +/// reference be passed from thread to thread. +/// +/// # Panics +/// +/// Panics if the OS fails to create a thread; use [`Builder::spawn`] +/// to recover from such errors. +/// +/// # Examples +/// +/// Creating a thread. +/// +/// ``` +/// use std::thread; +/// +/// let handler = thread::spawn(|| { +/// // thread code +/// }); +/// +/// handler.join().unwrap(); +/// ``` +/// +/// As mentioned in the module documentation, threads are usually made to +/// communicate using [`channels`], here is how it usually looks. +/// +/// This example also shows how to use `move`, in order to give ownership +/// of values to a thread. +/// +/// ``` +/// use std::thread; +/// use std::sync::mpsc::channel; +/// +/// let (tx, rx) = channel(); +/// +/// let sender = thread::spawn(move || { +/// tx.send("Hello, thread".to_owned()) +/// .expect("Unable to send on channel"); +/// }); +/// +/// let receiver = thread::spawn(move || { +/// let value = rx.recv().expect("Unable to receive from channel"); +/// println!("{value}"); +/// }); +/// +/// sender.join().expect("The sender thread has panicked"); +/// receiver.join().expect("The receiver thread has panicked"); +/// ``` +/// +/// A thread can also return a value through its [`JoinHandle`], you can use +/// this to make asynchronous computations (futures might be more appropriate +/// though). +/// +/// ``` +/// use std::thread; +/// +/// let computation = thread::spawn(|| { +/// // Some expensive computation. +/// 42 +/// }); +/// +/// let result = computation.join().unwrap(); +/// println!("{result}"); +/// ``` +/// +/// [`channels`]: crate::sync::mpsc +/// [`join`]: JoinHandle::join +/// [`Err`]: crate::result::Result::Err +#[stable(feature = "rust1", since = "1.0.0")] +pub fn spawn<F, T>(f: F) -> JoinHandle<T> +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + Builder::new().spawn(f).expect("failed to spawn thread") +} + +/// Gets a handle to the thread that invokes it. +/// +/// # Examples +/// +/// Getting a handle to the current thread with `thread::current()`: +/// +/// ``` +/// use std::thread; +/// +/// let handler = thread::Builder::new() +/// .name("named thread".into()) +/// .spawn(|| { +/// let handle = thread::current(); +/// assert_eq!(handle.name(), Some("named thread")); +/// }) +/// .unwrap(); +/// +/// handler.join().unwrap(); +/// ``` +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn current() -> Thread { + thread_info::current_thread().expect( + "use of std::thread::current() is not possible \ + after the thread's local data has been destroyed", + ) +} + +/// Cooperatively gives up a timeslice to the OS scheduler. +/// +/// This calls the underlying OS scheduler's yield primitive, signaling +/// that the calling thread is willing to give up its remaining timeslice +/// so that the OS may schedule other threads on the CPU. +/// +/// A drawback of yielding in a loop is that if the OS does not have any +/// other ready threads to run on the current CPU, the thread will effectively +/// busy-wait, which wastes CPU time and energy. +/// +/// Therefore, when waiting for events of interest, a programmer's first +/// choice should be to use synchronization devices such as [`channel`]s, +/// [`Condvar`]s, [`Mutex`]es or [`join`] since these primitives are +/// implemented in a blocking manner, giving up the CPU until the event +/// of interest has occurred which avoids repeated yielding. +/// +/// `yield_now` should thus be used only rarely, mostly in situations where +/// repeated polling is required because there is no other suitable way to +/// learn when an event of interest has occurred. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// +/// thread::yield_now(); +/// ``` +/// +/// [`channel`]: crate::sync::mpsc +/// [`join`]: JoinHandle::join +/// [`Condvar`]: crate::sync::Condvar +/// [`Mutex`]: crate::sync::Mutex +#[stable(feature = "rust1", since = "1.0.0")] +pub fn yield_now() { + imp::Thread::yield_now() +} + +/// Determines whether the current thread is unwinding because of panic. +/// +/// A common use of this feature is to poison shared resources when writing +/// unsafe code, by checking `panicking` when the `drop` is called. +/// +/// This is usually not needed when writing safe code, as [`Mutex`es][Mutex] +/// already poison themselves when a thread panics while holding the lock. +/// +/// This can also be used in multithreaded applications, in order to send a +/// message to other threads warning that a thread has panicked (e.g., for +/// monitoring purposes). +/// +/// # Examples +/// +/// ```should_panic +/// use std::thread; +/// +/// struct SomeStruct; +/// +/// impl Drop for SomeStruct { +/// fn drop(&mut self) { +/// if thread::panicking() { +/// println!("dropped while unwinding"); +/// } else { +/// println!("dropped while not unwinding"); +/// } +/// } +/// } +/// +/// { +/// print!("a: "); +/// let a = SomeStruct; +/// } +/// +/// { +/// print!("b: "); +/// let b = SomeStruct; +/// panic!() +/// } +/// ``` +/// +/// [Mutex]: crate::sync::Mutex +#[inline] +#[must_use] +#[stable(feature = "rust1", since = "1.0.0")] +pub fn panicking() -> bool { + panicking::panicking() +} + +/// Puts the current thread to sleep for at least the specified amount of time. +/// +/// The thread may sleep longer than the duration specified due to scheduling +/// specifics or platform-dependent functionality. It will never sleep less. +/// +/// This function is blocking, and should not be used in `async` functions. +/// +/// # Platform-specific behavior +/// +/// On Unix platforms, the underlying syscall may be interrupted by a +/// spurious wakeup or signal handler. To ensure the sleep occurs for at least +/// the specified duration, this function may invoke that system call multiple +/// times. +/// +/// # Examples +/// +/// ```no_run +/// use std::thread; +/// +/// // Let's sleep for 2 seconds: +/// thread::sleep_ms(2000); +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +#[deprecated(since = "1.6.0", note = "replaced by `std::thread::sleep`")] +pub fn sleep_ms(ms: u32) { + sleep(Duration::from_millis(ms as u64)) +} + +/// Puts the current thread to sleep for at least the specified amount of time. +/// +/// The thread may sleep longer than the duration specified due to scheduling +/// specifics or platform-dependent functionality. It will never sleep less. +/// +/// This function is blocking, and should not be used in `async` functions. +/// +/// # Platform-specific behavior +/// +/// On Unix platforms, the underlying syscall may be interrupted by a +/// spurious wakeup or signal handler. To ensure the sleep occurs for at least +/// the specified duration, this function may invoke that system call multiple +/// times. +/// Platforms which do not support nanosecond precision for sleeping will +/// have `dur` rounded up to the nearest granularity of time they can sleep for. +/// +/// Currently, specifying a zero duration on Unix platforms returns immediately +/// without invoking the underlying [`nanosleep`] syscall, whereas on Windows +/// platforms the underlying [`Sleep`] syscall is always invoked. +/// If the intention is to yield the current time-slice you may want to use +/// [`yield_now`] instead. +/// +/// [`nanosleep`]: https://linux.die.net/man/2/nanosleep +/// [`Sleep`]: https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-sleep +/// +/// # Examples +/// +/// ```no_run +/// use std::{thread, time}; +/// +/// let ten_millis = time::Duration::from_millis(10); +/// let now = time::Instant::now(); +/// +/// thread::sleep(ten_millis); +/// +/// assert!(now.elapsed() >= ten_millis); +/// ``` +#[stable(feature = "thread_sleep", since = "1.4.0")] +pub fn sleep(dur: Duration) { + imp::Thread::sleep(dur) +} + +/// Blocks unless or until the current thread's token is made available. +/// +/// A call to `park` does not guarantee that the thread will remain parked +/// forever, and callers should be prepared for this possibility. +/// +/// # park and unpark +/// +/// Every thread is equipped with some basic low-level blocking support, via the +/// [`thread::park`][`park`] function and [`thread::Thread::unpark`][`unpark`] +/// method. [`park`] blocks the current thread, which can then be resumed from +/// another thread by calling the [`unpark`] method on the blocked thread's +/// handle. +/// +/// Conceptually, each [`Thread`] handle has an associated token, which is +/// initially not present: +/// +/// * The [`thread::park`][`park`] function blocks the current thread unless or +/// until the token is available for its thread handle, at which point it +/// atomically consumes the token. It may also return *spuriously*, without +/// consuming the token. [`thread::park_timeout`] does the same, but allows +/// specifying a maximum time to block the thread for. +/// +/// * The [`unpark`] method on a [`Thread`] atomically makes the token available +/// if it wasn't already. Because the token is initially absent, [`unpark`] +/// followed by [`park`] will result in the second call returning immediately. +/// +/// In other words, each [`Thread`] acts a bit like a spinlock that can be +/// locked and unlocked using `park` and `unpark`. +/// +/// Notice that being unblocked does not imply any synchronization with someone +/// that unparked this thread, it could also be spurious. +/// For example, it would be a valid, but inefficient, implementation to make both [`park`] and +/// [`unpark`] return immediately without doing anything. +/// +/// The API is typically used by acquiring a handle to the current thread, +/// placing that handle in a shared data structure so that other threads can +/// find it, and then `park`ing in a loop. When some desired condition is met, another +/// thread calls [`unpark`] on the handle. +/// +/// The motivation for this design is twofold: +/// +/// * It avoids the need to allocate mutexes and condvars when building new +/// synchronization primitives; the threads already provide basic +/// blocking/signaling. +/// +/// * It can be implemented very efficiently on many platforms. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use std::sync::{Arc, atomic::{Ordering, AtomicBool}}; +/// use std::time::Duration; +/// +/// let flag = Arc::new(AtomicBool::new(false)); +/// let flag2 = Arc::clone(&flag); +/// +/// let parked_thread = thread::spawn(move || { +/// // We want to wait until the flag is set. We *could* just spin, but using +/// // park/unpark is more efficient. +/// while !flag2.load(Ordering::Acquire) { +/// println!("Parking thread"); +/// thread::park(); +/// // We *could* get here spuriously, i.e., way before the 10ms below are over! +/// // But that is no problem, we are in a loop until the flag is set anyway. +/// println!("Thread unparked"); +/// } +/// println!("Flag received"); +/// }); +/// +/// // Let some time pass for the thread to be spawned. +/// thread::sleep(Duration::from_millis(10)); +/// +/// // Set the flag, and let the thread wake up. +/// // There is no race condition here, if `unpark` +/// // happens first, `park` will return immediately. +/// // Hence there is no risk of a deadlock. +/// flag.store(true, Ordering::Release); +/// println!("Unpark the thread"); +/// parked_thread.thread().unpark(); +/// +/// parked_thread.join().unwrap(); +/// ``` +/// +/// [`unpark`]: Thread::unpark +/// [`thread::park_timeout`]: park_timeout +#[stable(feature = "rust1", since = "1.0.0")] +pub fn park() { + // SAFETY: park_timeout is called on the parker owned by this thread. + unsafe { + current().inner.as_ref().parker().park(); + } +} + +/// Use [`park_timeout`]. +/// +/// Blocks unless or until the current thread's token is made available or +/// the specified duration has been reached (may wake spuriously). +/// +/// The semantics of this function are equivalent to [`park`] except +/// that the thread will be blocked for roughly no longer than `dur`. This +/// method should not be used for precise timing due to anomalies such as +/// preemption or platform differences that might not cause the maximum +/// amount of time waited to be precisely `ms` long. +/// +/// See the [park documentation][`park`] for more detail. +#[stable(feature = "rust1", since = "1.0.0")] +#[deprecated(since = "1.6.0", note = "replaced by `std::thread::park_timeout`")] +pub fn park_timeout_ms(ms: u32) { + park_timeout(Duration::from_millis(ms as u64)) +} + +/// Blocks unless or until the current thread's token is made available or +/// the specified duration has been reached (may wake spuriously). +/// +/// The semantics of this function are equivalent to [`park`][park] except +/// that the thread will be blocked for roughly no longer than `dur`. This +/// method should not be used for precise timing due to anomalies such as +/// preemption or platform differences that might not cause the maximum +/// amount of time waited to be precisely `dur` long. +/// +/// See the [park documentation][park] for more details. +/// +/// # Platform-specific behavior +/// +/// Platforms which do not support nanosecond precision for sleeping will have +/// `dur` rounded up to the nearest granularity of time they can sleep for. +/// +/// # Examples +/// +/// Waiting for the complete expiration of the timeout: +/// +/// ```rust,no_run +/// use std::thread::park_timeout; +/// use std::time::{Instant, Duration}; +/// +/// let timeout = Duration::from_secs(2); +/// let beginning_park = Instant::now(); +/// +/// let mut timeout_remaining = timeout; +/// loop { +/// park_timeout(timeout_remaining); +/// let elapsed = beginning_park.elapsed(); +/// if elapsed >= timeout { +/// break; +/// } +/// println!("restarting park_timeout after {elapsed:?}"); +/// timeout_remaining = timeout - elapsed; +/// } +/// ``` +#[stable(feature = "park_timeout", since = "1.4.0")] +pub fn park_timeout(dur: Duration) { + // SAFETY: park_timeout is called on the parker owned by this thread. + unsafe { + current().inner.as_ref().parker().park_timeout(dur); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// ThreadId +//////////////////////////////////////////////////////////////////////////////// + +/// A unique identifier for a running thread. +/// +/// A `ThreadId` is an opaque object that uniquely identifies each thread +/// created during the lifetime of a process. `ThreadId`s are guaranteed not to +/// be reused, even when a thread terminates. `ThreadId`s are under the control +/// of Rust's standard library and there may not be any relationship between +/// `ThreadId` and the underlying platform's notion of a thread identifier -- +/// the two concepts cannot, therefore, be used interchangeably. A `ThreadId` +/// can be retrieved from the [`id`] method on a [`Thread`]. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// +/// let other_thread = thread::spawn(|| { +/// thread::current().id() +/// }); +/// +/// let other_thread_id = other_thread.join().unwrap(); +/// assert!(thread::current().id() != other_thread_id); +/// ``` +/// +/// [`id`]: Thread::id +#[stable(feature = "thread_id", since = "1.19.0")] +#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)] +pub struct ThreadId(NonZeroU64); + +impl ThreadId { + // Generate a new unique thread ID. + fn new() -> ThreadId { + // It is UB to attempt to acquire this mutex reentrantly! + static GUARD: mutex::StaticMutex = mutex::StaticMutex::new(); + static mut COUNTER: u64 = 1; + + unsafe { + let guard = GUARD.lock(); + + // If we somehow use up all our bits, panic so that we're not + // covering up subtle bugs of IDs being reused. + if COUNTER == u64::MAX { + drop(guard); // in case the panic handler ends up calling `ThreadId::new()`, avoid reentrant lock acquire. + panic!("failed to generate unique thread ID: bitspace exhausted"); + } + + let id = COUNTER; + COUNTER += 1; + + ThreadId(NonZeroU64::new(id).unwrap()) + } + } + + /// This returns a numeric identifier for the thread identified by this + /// `ThreadId`. + /// + /// As noted in the documentation for the type itself, it is essentially an + /// opaque ID, but is guaranteed to be unique for each thread. The returned + /// value is entirely opaque -- only equality testing is stable. Note that + /// it is not guaranteed which values new threads will return, and this may + /// change across Rust versions. + #[must_use] + #[unstable(feature = "thread_id_value", issue = "67939")] + pub fn as_u64(&self) -> NonZeroU64 { + self.0 + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Thread +//////////////////////////////////////////////////////////////////////////////// + +/// The internal representation of a `Thread` handle +struct Inner { + name: Option<CString>, // Guaranteed to be UTF-8 + id: ThreadId, + parker: Parker, +} + +impl Inner { + fn parker(self: Pin<&Self>) -> Pin<&Parker> { + unsafe { Pin::map_unchecked(self, |inner| &inner.parker) } + } +} + +#[derive(Clone)] +#[stable(feature = "rust1", since = "1.0.0")] +/// A handle to a thread. +/// +/// Threads are represented via the `Thread` type, which you can get in one of +/// two ways: +/// +/// * By spawning a new thread, e.g., using the [`thread::spawn`][`spawn`] +/// function, and calling [`thread`][`JoinHandle::thread`] on the +/// [`JoinHandle`]. +/// * By requesting the current thread, using the [`thread::current`] function. +/// +/// The [`thread::current`] function is available even for threads not spawned +/// by the APIs of this module. +/// +/// There is usually no need to create a `Thread` struct yourself, one +/// should instead use a function like `spawn` to create new threads, see the +/// docs of [`Builder`] and [`spawn`] for more details. +/// +/// [`thread::current`]: current +pub struct Thread { + inner: Pin<Arc<Inner>>, +} + +impl Thread { + // Used only internally to construct a thread object without spawning + // Panics if the name contains nuls. + pub(crate) fn new(name: Option<CString>) -> Thread { + // We have to use `unsafe` here to construct the `Parker` in-place, + // which is required for the UNIX implementation. + // + // SAFETY: We pin the Arc immediately after creation, so its address never + // changes. + let inner = unsafe { + let mut arc = Arc::<Inner>::new_uninit(); + let ptr = Arc::get_mut_unchecked(&mut arc).as_mut_ptr(); + addr_of_mut!((*ptr).name).write(name); + addr_of_mut!((*ptr).id).write(ThreadId::new()); + Parker::new(addr_of_mut!((*ptr).parker)); + Pin::new_unchecked(arc.assume_init()) + }; + + Thread { inner } + } + + /// Atomically makes the handle's token available if it is not already. + /// + /// Every thread is equipped with some basic low-level blocking support, via + /// the [`park`][park] function and the `unpark()` method. These can be + /// used as a more CPU-efficient implementation of a spinlock. + /// + /// See the [park documentation][park] for more details. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// use std::time::Duration; + /// + /// let parked_thread = thread::Builder::new() + /// .spawn(|| { + /// println!("Parking thread"); + /// thread::park(); + /// println!("Thread unparked"); + /// }) + /// .unwrap(); + /// + /// // Let some time pass for the thread to be spawned. + /// thread::sleep(Duration::from_millis(10)); + /// + /// println!("Unpark the thread"); + /// parked_thread.thread().unpark(); + /// + /// parked_thread.join().unwrap(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[inline] + pub fn unpark(&self) { + self.inner.as_ref().parker().unpark(); + } + + /// Gets the thread's unique identifier. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let other_thread = thread::spawn(|| { + /// thread::current().id() + /// }); + /// + /// let other_thread_id = other_thread.join().unwrap(); + /// assert!(thread::current().id() != other_thread_id); + /// ``` + #[stable(feature = "thread_id", since = "1.19.0")] + #[must_use] + pub fn id(&self) -> ThreadId { + self.inner.id + } + + /// Gets the thread's name. + /// + /// For more information about named threads, see + /// [this module-level documentation][naming-threads]. + /// + /// # Examples + /// + /// Threads by default have no name specified: + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new(); + /// + /// let handler = builder.spawn(|| { + /// assert!(thread::current().name().is_none()); + /// }).unwrap(); + /// + /// handler.join().unwrap(); + /// ``` + /// + /// Thread with a specified name: + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new() + /// .name("foo".into()); + /// + /// let handler = builder.spawn(|| { + /// assert_eq!(thread::current().name(), Some("foo")) + /// }).unwrap(); + /// + /// handler.join().unwrap(); + /// ``` + /// + /// [naming-threads]: ./index.html#naming-threads + #[stable(feature = "rust1", since = "1.0.0")] + #[must_use] + pub fn name(&self) -> Option<&str> { + self.cname().map(|s| unsafe { str::from_utf8_unchecked(s.to_bytes()) }) + } + + fn cname(&self) -> Option<&CStr> { + self.inner.name.as_deref() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl fmt::Debug for Thread { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Thread") + .field("id", &self.id()) + .field("name", &self.name()) + .finish_non_exhaustive() + } +} + +//////////////////////////////////////////////////////////////////////////////// +// JoinHandle +//////////////////////////////////////////////////////////////////////////////// + +/// A specialized [`Result`] type for threads. +/// +/// Indicates the manner in which a thread exited. +/// +/// The value contained in the `Result::Err` variant +/// is the value the thread panicked with; +/// that is, the argument the `panic!` macro was called with. +/// Unlike with normal errors, this value doesn't implement +/// the [`Error`](crate::error::Error) trait. +/// +/// Thus, a sensible way to handle a thread panic is to either: +/// +/// 1. propagate the panic with [`std::panic::resume_unwind`] +/// 2. or in case the thread is intended to be a subsystem boundary +/// that is supposed to isolate system-level failures, +/// match on the `Err` variant and handle the panic in an appropriate way +/// +/// A thread that completes without panicking is considered to exit successfully. +/// +/// # Examples +/// +/// Matching on the result of a joined thread: +/// +/// ```no_run +/// use std::{fs, thread, panic}; +/// +/// fn copy_in_thread() -> thread::Result<()> { +/// thread::spawn(|| { +/// fs::copy("foo.txt", "bar.txt").unwrap(); +/// }).join() +/// } +/// +/// fn main() { +/// match copy_in_thread() { +/// Ok(_) => println!("copy succeeded"), +/// Err(e) => panic::resume_unwind(e), +/// } +/// } +/// ``` +/// +/// [`Result`]: crate::result::Result +/// [`std::panic::resume_unwind`]: crate::panic::resume_unwind +#[stable(feature = "rust1", since = "1.0.0")] +pub type Result<T> = crate::result::Result<T, Box<dyn Any + Send + 'static>>; + +// This packet is used to communicate the return value between the spawned +// thread and the rest of the program. It is shared through an `Arc` and +// there's no need for a mutex here because synchronization happens with `join()` +// (the caller will never read this packet until the thread has exited). +// +// An Arc to the packet is stored into a `JoinInner` which in turns is placed +// in `JoinHandle`. +struct Packet<'scope, T> { + scope: Option<Arc<scoped::ScopeData>>, + result: UnsafeCell<Option<Result<T>>>, + _marker: PhantomData<Option<&'scope scoped::ScopeData>>, +} + +// Due to the usage of `UnsafeCell` we need to manually implement Sync. +// The type `T` should already always be Send (otherwise the thread could not +// have been created) and the Packet is Sync because all access to the +// `UnsafeCell` synchronized (by the `join()` boundary), and `ScopeData` is Sync. +unsafe impl<'scope, T: Sync> Sync for Packet<'scope, T> {} + +impl<'scope, T> Drop for Packet<'scope, T> { + fn drop(&mut self) { + // If this packet was for a thread that ran in a scope, the thread + // panicked, and nobody consumed the panic payload, we make sure + // the scope function will panic. + let unhandled_panic = matches!(self.result.get_mut(), Some(Err(_))); + // Drop the result without causing unwinding. + // This is only relevant for threads that aren't join()ed, as + // join() will take the `result` and set it to None, such that + // there is nothing left to drop here. + // If this panics, we should handle that, because we're outside the + // outermost `catch_unwind` of our thread. + // We just abort in that case, since there's nothing else we can do. + // (And even if we tried to handle it somehow, we'd also need to handle + // the case where the panic payload we get out of it also panics on + // drop, and so on. See issue #86027.) + if let Err(_) = panic::catch_unwind(panic::AssertUnwindSafe(|| { + *self.result.get_mut() = None; + })) { + rtabort!("thread result panicked on drop"); + } + // Book-keeping so the scope knows when it's done. + if let Some(scope) = &self.scope { + // Now that there will be no more user code running on this thread + // that can use 'scope, mark the thread as 'finished'. + // It's important we only do this after the `result` has been dropped, + // since dropping it might still use things it borrowed from 'scope. + scope.decrement_num_running_threads(unhandled_panic); + } + } +} + +/// Inner representation for JoinHandle +struct JoinInner<'scope, T> { + native: imp::Thread, + thread: Thread, + packet: Arc<Packet<'scope, T>>, +} + +impl<'scope, T> JoinInner<'scope, T> { + fn join(mut self) -> Result<T> { + self.native.join(); + Arc::get_mut(&mut self.packet).unwrap().result.get_mut().take().unwrap() + } +} + +/// An owned permission to join on a thread (block on its termination). +/// +/// A `JoinHandle` *detaches* the associated thread when it is dropped, which +/// means that there is no longer any handle to the thread and no way to `join` +/// on it. +/// +/// Due to platform restrictions, it is not possible to [`Clone`] this +/// handle: the ability to join a thread is a uniquely-owned permission. +/// +/// This `struct` is created by the [`thread::spawn`] function and the +/// [`thread::Builder::spawn`] method. +/// +/// # Examples +/// +/// Creation from [`thread::spawn`]: +/// +/// ``` +/// use std::thread; +/// +/// let join_handle: thread::JoinHandle<_> = thread::spawn(|| { +/// // some work here +/// }); +/// ``` +/// +/// Creation from [`thread::Builder::spawn`]: +/// +/// ``` +/// use std::thread; +/// +/// let builder = thread::Builder::new(); +/// +/// let join_handle: thread::JoinHandle<_> = builder.spawn(|| { +/// // some work here +/// }).unwrap(); +/// ``` +/// +/// A thread being detached and outliving the thread that spawned it: +/// +/// ```no_run +/// use std::thread; +/// use std::time::Duration; +/// +/// let original_thread = thread::spawn(|| { +/// let _detached_thread = thread::spawn(|| { +/// // Here we sleep to make sure that the first thread returns before. +/// thread::sleep(Duration::from_millis(10)); +/// // This will be called, even though the JoinHandle is dropped. +/// println!("♫ Still alive ♫"); +/// }); +/// }); +/// +/// original_thread.join().expect("The thread being joined has panicked"); +/// println!("Original thread is joined."); +/// +/// // We make sure that the new thread has time to run, before the main +/// // thread returns. +/// +/// thread::sleep(Duration::from_millis(1000)); +/// ``` +/// +/// [`thread::Builder::spawn`]: Builder::spawn +/// [`thread::spawn`]: spawn +#[stable(feature = "rust1", since = "1.0.0")] +pub struct JoinHandle<T>(JoinInner<'static, T>); + +#[stable(feature = "joinhandle_impl_send_sync", since = "1.29.0")] +unsafe impl<T> Send for JoinHandle<T> {} +#[stable(feature = "joinhandle_impl_send_sync", since = "1.29.0")] +unsafe impl<T> Sync for JoinHandle<T> {} + +impl<T> JoinHandle<T> { + /// Extracts a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new(); + /// + /// let join_handle: thread::JoinHandle<_> = builder.spawn(|| { + /// // some work here + /// }).unwrap(); + /// + /// let thread = join_handle.thread(); + /// println!("thread id: {:?}", thread.id()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + #[must_use] + pub fn thread(&self) -> &Thread { + &self.0.thread + } + + /// Waits for the associated thread to finish. + /// + /// This function will return immediately if the associated thread has already finished. + /// + /// In terms of [atomic memory orderings], the completion of the associated + /// thread synchronizes with this function returning. In other words, all + /// operations performed by that thread [happen + /// before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses) all + /// operations that happen after `join` returns. + /// + /// If the associated thread panics, [`Err`] is returned with the parameter given + /// to [`panic!`]. + /// + /// [`Err`]: crate::result::Result::Err + /// [atomic memory orderings]: crate::sync::atomic + /// + /// # Panics + /// + /// This function may panic on some platforms if a thread attempts to join + /// itself or otherwise may create a deadlock with joining threads. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// let builder = thread::Builder::new(); + /// + /// let join_handle: thread::JoinHandle<_> = builder.spawn(|| { + /// // some work here + /// }).unwrap(); + /// join_handle.join().expect("Couldn't join on the associated thread"); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn join(self) -> Result<T> { + self.0.join() + } + + /// Checks if the associated thread has finished running its main function. + /// + /// `is_finished` supports implementing a non-blocking join operation, by checking + /// `is_finished`, and calling `join` if it returns `true`. This function does not block. To + /// block while waiting on the thread to finish, use [`join`][Self::join]. + /// + /// This might return `true` for a brief moment after the thread's main + /// function has returned, but before the thread itself has stopped running. + /// However, once this returns `true`, [`join`][Self::join] can be expected + /// to return quickly, without blocking for any significant amount of time. + #[stable(feature = "thread_is_running", since = "1.61.0")] + pub fn is_finished(&self) -> bool { + Arc::strong_count(&self.0.packet) == 1 + } +} + +impl<T> AsInner<imp::Thread> for JoinHandle<T> { + fn as_inner(&self) -> &imp::Thread { + &self.0.native + } +} + +impl<T> IntoInner<imp::Thread> for JoinHandle<T> { + fn into_inner(self) -> imp::Thread { + self.0.native + } +} + +#[stable(feature = "std_debug", since = "1.16.0")] +impl<T> fmt::Debug for JoinHandle<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JoinHandle").finish_non_exhaustive() + } +} + +fn _assert_sync_and_send() { + fn _assert_both<T: Send + Sync>() {} + _assert_both::<JoinHandle<()>>(); + _assert_both::<Thread>(); +} + +/// Returns an estimate of the default amount of parallelism a program should use. +/// +/// Parallelism is a resource. A given machine provides a certain capacity for +/// parallelism, i.e., a bound on the number of computations it can perform +/// simultaneously. This number often corresponds to the amount of CPUs a +/// computer has, but it may diverge in various cases. +/// +/// Host environments such as VMs or container orchestrators may want to +/// restrict the amount of parallelism made available to programs in them. This +/// is often done to limit the potential impact of (unintentionally) +/// resource-intensive programs on other programs running on the same machine. +/// +/// # Limitations +/// +/// The purpose of this API is to provide an easy and portable way to query +/// the default amount of parallelism the program should use. Among other things it +/// does not expose information on NUMA regions, does not account for +/// differences in (co)processor capabilities or current system load, +/// and will not modify the program's global state in order to more accurately +/// query the amount of available parallelism. +/// +/// Where both fixed steady-state and burst limits are available the steady-state +/// capacity will be used to ensure more predictable latencies. +/// +/// Resource limits can be changed during the runtime of a program, therefore the value is +/// not cached and instead recomputed every time this function is called. It should not be +/// called from hot code. +/// +/// The value returned by this function should be considered a simplified +/// approximation of the actual amount of parallelism available at any given +/// time. To get a more detailed or precise overview of the amount of +/// parallelism available to the program, you may wish to use +/// platform-specific APIs as well. The following platform limitations currently +/// apply to `available_parallelism`: +/// +/// On Windows: +/// - It may undercount the amount of parallelism available on systems with more +/// than 64 logical CPUs. However, programs typically need specific support to +/// take advantage of more than 64 logical CPUs, and in the absence of such +/// support, the number returned by this function accurately reflects the +/// number of logical CPUs the program can use by default. +/// - It may overcount the amount of parallelism available on systems limited by +/// process-wide affinity masks, or job object limitations. +/// +/// On Linux: +/// - It may overcount the amount of parallelism available when limited by a +/// process-wide affinity mask or cgroup quotas and `sched_getaffinity()` or cgroup fs can't be +/// queried, e.g. due to sandboxing. +/// - It may undercount the amount of parallelism if the current thread's affinity mask +/// does not reflect the process' cpuset, e.g. due to pinned threads. +/// - If the process is in a cgroup v1 cpu controller, this may need to +/// scan mountpoints to find the corresponding cgroup v1 controller, +/// which may take time on systems with large numbers of mountpoints. +/// (This does not apply to cgroup v2, or to processes not in a +/// cgroup.) +/// +/// On all targets: +/// - It may overcount the amount of parallelism available when running in a VM +/// with CPU usage limits (e.g. an overcommitted host). +/// +/// # Errors +/// +/// This function will, but is not limited to, return errors in the following +/// cases: +/// +/// - If the amount of parallelism is not known for the target platform. +/// - If the program lacks permission to query the amount of parallelism made +/// available to it. +/// +/// # Examples +/// +/// ``` +/// # #![allow(dead_code)] +/// use std::{io, thread}; +/// +/// fn main() -> io::Result<()> { +/// let count = thread::available_parallelism()?.get(); +/// assert!(count >= 1_usize); +/// Ok(()) +/// } +/// ``` +#[doc(alias = "available_concurrency")] // Alias for a previous name we gave this API on unstable. +#[doc(alias = "hardware_concurrency")] // Alias for C++ `std::thread::hardware_concurrency`. +#[doc(alias = "num_cpus")] // Alias for a popular ecosystem crate which provides similar functionality. +#[stable(feature = "available_parallelism", since = "1.59.0")] +pub fn available_parallelism() -> io::Result<NonZeroUsize> { + imp::available_parallelism() +} diff --git a/library/std/src/thread/scoped.rs b/library/std/src/thread/scoped.rs new file mode 100644 index 000000000..e6dbf35bd --- /dev/null +++ b/library/std/src/thread/scoped.rs @@ -0,0 +1,343 @@ +use super::{current, park, Builder, JoinInner, Result, Thread}; +use crate::fmt; +use crate::io; +use crate::marker::PhantomData; +use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; +use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use crate::sync::Arc; + +/// A scope to spawn scoped threads in. +/// +/// See [`scope`] for details. +#[stable(feature = "scoped_threads", since = "1.63.0")] +pub struct Scope<'scope, 'env: 'scope> { + data: Arc<ScopeData>, + /// Invariance over 'scope, to make sure 'scope cannot shrink, + /// which is necessary for soundness. + /// + /// Without invariance, this would compile fine but be unsound: + /// + /// ```compile_fail,E0373 + /// std::thread::scope(|s| { + /// s.spawn(|| { + /// let a = String::from("abcd"); + /// s.spawn(|| println!("{a:?}")); // might run after `a` is dropped + /// }); + /// }); + /// ``` + scope: PhantomData<&'scope mut &'scope ()>, + env: PhantomData<&'env mut &'env ()>, +} + +/// An owned permission to join on a scoped thread (block on its termination). +/// +/// See [`Scope::spawn`] for details. +#[stable(feature = "scoped_threads", since = "1.63.0")] +pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>); + +pub(super) struct ScopeData { + num_running_threads: AtomicUsize, + a_thread_panicked: AtomicBool, + main_thread: Thread, +} + +impl ScopeData { + pub(super) fn increment_num_running_threads(&self) { + // We check for 'overflow' with usize::MAX / 2, to make sure there's no + // chance it overflows to 0, which would result in unsoundness. + if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 { + // This can only reasonably happen by mem::forget()'ing many many ScopedJoinHandles. + self.decrement_num_running_threads(false); + panic!("too many running threads in thread scope"); + } + } + pub(super) fn decrement_num_running_threads(&self, panic: bool) { + if panic { + self.a_thread_panicked.store(true, Ordering::Relaxed); + } + if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 { + self.main_thread.unpark(); + } + } +} + +/// Create a scope for spawning scoped threads. +/// +/// The function passed to `scope` will be provided a [`Scope`] object, +/// through which scoped threads can be [spawned][`Scope::spawn`]. +/// +/// Unlike non-scoped threads, scoped threads can borrow non-`'static` data, +/// as the scope guarantees all threads will be joined at the end of the scope. +/// +/// All threads spawned within the scope that haven't been manually joined +/// will be automatically joined before this function returns. +/// +/// # Panics +/// +/// If any of the automatically joined threads panicked, this function will panic. +/// +/// If you want to handle panics from spawned threads, +/// [`join`][ScopedJoinHandle::join] them before the end of the scope. +/// +/// # Example +/// +/// ``` +/// use std::thread; +/// +/// let mut a = vec![1, 2, 3]; +/// let mut x = 0; +/// +/// thread::scope(|s| { +/// s.spawn(|| { +/// println!("hello from the first scoped thread"); +/// // We can borrow `a` here. +/// dbg!(&a); +/// }); +/// s.spawn(|| { +/// println!("hello from the second scoped thread"); +/// // We can even mutably borrow `x` here, +/// // because no other threads are using it. +/// x += a[0] + a[2]; +/// }); +/// println!("hello from the main thread"); +/// }); +/// +/// // After the scope, we can modify and access our variables again: +/// a.push(4); +/// assert_eq!(x, a.len()); +/// ``` +/// +/// # Lifetimes +/// +/// Scoped threads involve two lifetimes: `'scope` and `'env`. +/// +/// The `'scope` lifetime represents the lifetime of the scope itself. +/// That is: the time during which new scoped threads may be spawned, +/// and also the time during which they might still be running. +/// Once this lifetime ends, all scoped threads are joined. +/// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts. +/// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns. +/// +/// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads. +/// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`. +/// It can be as small as the call to `scope`, meaning that anything that outlives this call, +/// such as local variables defined right before the scope, can be borrowed by the scoped threads. +/// +/// The `'env: 'scope` bound is part of the definition of the `Scope` type. +#[track_caller] +#[stable(feature = "scoped_threads", since = "1.63.0")] +pub fn scope<'env, F, T>(f: F) -> T +where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, +{ + // We put the `ScopeData` into an `Arc` so that other threads can finish their + // `decrement_num_running_threads` even after this function returns. + let scope = Scope { + data: Arc::new(ScopeData { + num_running_threads: AtomicUsize::new(0), + main_thread: current(), + a_thread_panicked: AtomicBool::new(false), + }), + env: PhantomData, + scope: PhantomData, + }; + + // Run `f`, but catch panics so we can make sure to wait for all the threads to join. + let result = catch_unwind(AssertUnwindSafe(|| f(&scope))); + + // Wait until all the threads are finished. + while scope.data.num_running_threads.load(Ordering::Acquire) != 0 { + park(); + } + + // Throw any panic from `f`, or the return value of `f` if no thread panicked. + match result { + Err(e) => resume_unwind(e), + Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => { + panic!("a scoped thread panicked") + } + Ok(result) => result, + } +} + +impl<'scope, 'env> Scope<'scope, 'env> { + /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it. + /// + /// Unlike non-scoped threads, threads spawned with this function may + /// borrow non-`'static` data from the outside the scope. See [`scope`] for + /// details. + /// + /// The join handle provides a [`join`] method that can be used to join the spawned + /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing + /// the panic payload. + /// + /// If the join handle is dropped, the spawned thread will implicitly joined at the + /// end of the scope. In that case, if the spawned thread panics, [`scope`] will + /// panic after all threads are joined. + /// + /// This call will create a thread using default parameters of [`Builder`]. + /// If you want to specify the stack size or the name of the thread, use + /// [`Builder::spawn_scoped`] instead. + /// + /// # Panics + /// + /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`] + /// to recover from such errors. + /// + /// [`join`]: ScopedJoinHandle::join + #[stable(feature = "scoped_threads", since = "1.63.0")] + pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope, + { + Builder::new().spawn_scoped(self, f).expect("failed to spawn thread") + } +} + +impl Builder { + /// Spawns a new scoped thread using the settings set through this `Builder`. + /// + /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to + /// capture any failure to create the thread at the OS level. + /// + /// [`io::Result`]: crate::io::Result + /// + /// # Panics + /// + /// Panics if a thread name was set and it contained null bytes. + /// + /// # Example + /// + /// ``` + /// use std::thread; + /// + /// let mut a = vec![1, 2, 3]; + /// let mut x = 0; + /// + /// thread::scope(|s| { + /// thread::Builder::new() + /// .name("first".to_string()) + /// .spawn_scoped(s, || + /// { + /// println!("hello from the {:?} scoped thread", thread::current().name()); + /// // We can borrow `a` here. + /// dbg!(&a); + /// }) + /// .unwrap(); + /// thread::Builder::new() + /// .name("second".to_string()) + /// .spawn_scoped(s, || + /// { + /// println!("hello from the {:?} scoped thread", thread::current().name()); + /// // We can even mutably borrow `x` here, + /// // because no other threads are using it. + /// x += a[0] + a[2]; + /// }) + /// .unwrap(); + /// println!("hello from the main thread"); + /// }); + /// + /// // After the scope, we can modify and access our variables again: + /// a.push(4); + /// assert_eq!(x, a.len()); + /// ``` + #[stable(feature = "scoped_threads", since = "1.63.0")] + pub fn spawn_scoped<'scope, 'env, F, T>( + self, + scope: &'scope Scope<'scope, 'env>, + f: F, + ) -> io::Result<ScopedJoinHandle<'scope, T>> + where + F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope, + { + Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?)) + } +} + +impl<'scope, T> ScopedJoinHandle<'scope, T> { + /// Extracts a handle to the underlying thread. + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// thread::scope(|s| { + /// let t = s.spawn(|| { + /// println!("hello"); + /// }); + /// println!("thread id: {:?}", t.thread().id()); + /// }); + /// ``` + #[must_use] + #[stable(feature = "scoped_threads", since = "1.63.0")] + pub fn thread(&self) -> &Thread { + &self.0.thread + } + + /// Waits for the associated thread to finish. + /// + /// This function will return immediately if the associated thread has already finished. + /// + /// In terms of [atomic memory orderings], the completion of the associated + /// thread synchronizes with this function returning. + /// In other words, all operations performed by that thread + /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses) + /// all operations that happen after `join` returns. + /// + /// If the associated thread panics, [`Err`] is returned with the panic payload. + /// + /// [atomic memory orderings]: crate::sync::atomic + /// + /// # Examples + /// + /// ``` + /// use std::thread; + /// + /// thread::scope(|s| { + /// let t = s.spawn(|| { + /// panic!("oh no"); + /// }); + /// assert!(t.join().is_err()); + /// }); + /// ``` + #[stable(feature = "scoped_threads", since = "1.63.0")] + pub fn join(self) -> Result<T> { + self.0.join() + } + + /// Checks if the associated thread has finished running its main function. + /// + /// `is_finished` supports implementing a non-blocking join operation, by checking + /// `is_finished`, and calling `join` if it returns `false`. This function does not block. To + /// block while waiting on the thread to finish, use [`join`][Self::join]. + /// + /// This might return `true` for a brief moment after the thread's main + /// function has returned, but before the thread itself has stopped running. + /// However, once this returns `true`, [`join`][Self::join] can be expected + /// to return quickly, without blocking for any significant amount of time. + #[stable(feature = "scoped_threads", since = "1.63.0")] + pub fn is_finished(&self) -> bool { + Arc::strong_count(&self.0.packet) == 1 + } +} + +#[stable(feature = "scoped_threads", since = "1.63.0")] +impl fmt::Debug for Scope<'_, '_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Scope") + .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed)) + .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed)) + .field("main_thread", &self.data.main_thread) + .finish_non_exhaustive() + } +} + +#[stable(feature = "scoped_threads", since = "1.63.0")] +impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ScopedJoinHandle").finish_non_exhaustive() + } +} diff --git a/library/std/src/thread/tests.rs b/library/std/src/thread/tests.rs new file mode 100644 index 000000000..ec68b5291 --- /dev/null +++ b/library/std/src/thread/tests.rs @@ -0,0 +1,331 @@ +use super::Builder; +use crate::any::Any; +use crate::mem; +use crate::panic::panic_any; +use crate::result; +use crate::sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Sender}, + Arc, Barrier, +}; +use crate::thread::{self, Scope, ThreadId}; +use crate::time::Duration; +use crate::time::Instant; + +// !!! These tests are dangerous. If something is buggy, they will hang, !!! +// !!! instead of exiting cleanly. This might wedge the buildbots. !!! + +#[test] +fn test_unnamed_thread() { + thread::spawn(move || { + assert!(thread::current().name().is_none()); + }) + .join() + .ok() + .expect("thread panicked"); +} + +#[test] +fn test_named_thread() { + Builder::new() + .name("ada lovelace".to_string()) + .spawn(move || { + assert!(thread::current().name().unwrap() == "ada lovelace".to_string()); + }) + .unwrap() + .join() + .unwrap(); +} + +#[test] +#[should_panic] +fn test_invalid_named_thread() { + let _ = Builder::new().name("ada l\0velace".to_string()).spawn(|| {}); +} + +#[test] +fn test_run_basic() { + let (tx, rx) = channel(); + thread::spawn(move || { + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); +} + +#[test] +fn test_is_finished() { + let b = Arc::new(Barrier::new(2)); + let t = thread::spawn({ + let b = b.clone(); + move || { + b.wait(); + 1234 + } + }); + + // Thread is definitely running here, since it's still waiting for the barrier. + assert_eq!(t.is_finished(), false); + + // Unblock the barrier. + b.wait(); + + // Now check that t.is_finished() becomes true within a reasonable time. + let start = Instant::now(); + while !t.is_finished() { + assert!(start.elapsed() < Duration::from_secs(2)); + thread::sleep(Duration::from_millis(15)); + } + + // Joining the thread should not block for a significant time now. + let join_time = Instant::now(); + assert_eq!(t.join().unwrap(), 1234); + assert!(join_time.elapsed() < Duration::from_secs(2)); +} + +#[test] +fn test_join_panic() { + match thread::spawn(move || panic!()).join() { + result::Result::Err(_) => (), + result::Result::Ok(()) => panic!(), + } +} + +#[test] +fn test_spawn_sched() { + let (tx, rx) = channel(); + + fn f(i: i32, tx: Sender<()>) { + let tx = tx.clone(); + thread::spawn(move || { + if i == 0 { + tx.send(()).unwrap(); + } else { + f(i - 1, tx); + } + }); + } + f(10, tx); + rx.recv().unwrap(); +} + +#[test] +fn test_spawn_sched_childs_on_default_sched() { + let (tx, rx) = channel(); + + thread::spawn(move || { + thread::spawn(move || { + tx.send(()).unwrap(); + }); + }); + + rx.recv().unwrap(); +} + +fn avoid_copying_the_body<F>(spawnfn: F) +where + F: FnOnce(Box<dyn Fn() + Send>), +{ + let (tx, rx) = channel(); + + let x: Box<_> = Box::new(1); + let x_in_parent = (&*x) as *const i32 as usize; + + spawnfn(Box::new(move || { + let x_in_child = (&*x) as *const i32 as usize; + tx.send(x_in_child).unwrap(); + })); + + let x_in_child = rx.recv().unwrap(); + assert_eq!(x_in_parent, x_in_child); +} + +#[test] +fn test_avoid_copying_the_body_spawn() { + avoid_copying_the_body(|v| { + thread::spawn(move || v()); + }); +} + +#[test] +fn test_avoid_copying_the_body_thread_spawn() { + avoid_copying_the_body(|f| { + thread::spawn(move || { + f(); + }); + }) +} + +#[test] +fn test_avoid_copying_the_body_join() { + avoid_copying_the_body(|f| { + let _ = thread::spawn(move || f()).join(); + }) +} + +#[test] +fn test_child_doesnt_ref_parent() { + // If the child refcounts the parent thread, this will stack overflow when + // climbing the thread tree to dereference each ancestor. (See #1789) + // (well, it would if the constant were 8000+ - I lowered it to be more + // valgrind-friendly. try this at home, instead..!) + const GENERATIONS: u32 = 16; + fn child_no(x: u32) -> Box<dyn Fn() + Send> { + return Box::new(move || { + if x < GENERATIONS { + thread::spawn(move || child_no(x + 1)()); + } + }); + } + thread::spawn(|| child_no(0)()); +} + +#[test] +fn test_simple_newsched_spawn() { + thread::spawn(move || {}); +} + +#[test] +fn test_try_panic_message_string_literal() { + match thread::spawn(move || { + panic!("static string"); + }) + .join() + { + Err(e) => { + type T = &'static str; + assert!(e.is::<T>()); + assert_eq!(*e.downcast::<T>().unwrap(), "static string"); + } + Ok(()) => panic!(), + } +} + +#[test] +fn test_try_panic_any_message_owned_str() { + match thread::spawn(move || { + panic_any("owned string".to_string()); + }) + .join() + { + Err(e) => { + type T = String; + assert!(e.is::<T>()); + assert_eq!(*e.downcast::<T>().unwrap(), "owned string".to_string()); + } + Ok(()) => panic!(), + } +} + +#[test] +fn test_try_panic_any_message_any() { + match thread::spawn(move || { + panic_any(Box::new(413u16) as Box<dyn Any + Send>); + }) + .join() + { + Err(e) => { + type T = Box<dyn Any + Send>; + assert!(e.is::<T>()); + let any = e.downcast::<T>().unwrap(); + assert!(any.is::<u16>()); + assert_eq!(*any.downcast::<u16>().unwrap(), 413); + } + Ok(()) => panic!(), + } +} + +#[test] +fn test_try_panic_any_message_unit_struct() { + struct Juju; + + match thread::spawn(move || panic_any(Juju)).join() { + Err(ref e) if e.is::<Juju>() => {} + Err(_) | Ok(()) => panic!(), + } +} + +#[test] +fn test_park_timeout_unpark_before() { + for _ in 0..10 { + thread::current().unpark(); + thread::park_timeout(Duration::from_millis(u32::MAX as u64)); + } +} + +#[test] +fn test_park_timeout_unpark_not_called() { + for _ in 0..10 { + thread::park_timeout(Duration::from_millis(10)); + } +} + +#[test] +fn test_park_timeout_unpark_called_other_thread() { + for _ in 0..10 { + let th = thread::current(); + + let _guard = thread::spawn(move || { + super::sleep(Duration::from_millis(50)); + th.unpark(); + }); + + thread::park_timeout(Duration::from_millis(u32::MAX as u64)); + } +} + +#[test] +fn sleep_ms_smoke() { + thread::sleep(Duration::from_millis(2)); +} + +#[test] +fn test_size_of_option_thread_id() { + assert_eq!(mem::size_of::<Option<ThreadId>>(), mem::size_of::<ThreadId>()); +} + +#[test] +fn test_thread_id_equal() { + assert!(thread::current().id() == thread::current().id()); +} + +#[test] +fn test_thread_id_not_equal() { + let spawned_id = thread::spawn(|| thread::current().id()).join().unwrap(); + assert!(thread::current().id() != spawned_id); +} + +#[test] +fn test_scoped_threads_drop_result_before_join() { + let actually_finished = &AtomicBool::new(false); + struct X<'scope, 'env>(&'scope Scope<'scope, 'env>, &'env AtomicBool); + impl Drop for X<'_, '_> { + fn drop(&mut self) { + thread::sleep(Duration::from_millis(20)); + let actually_finished = self.1; + self.0.spawn(move || { + thread::sleep(Duration::from_millis(20)); + actually_finished.store(true, Ordering::Relaxed); + }); + } + } + thread::scope(|s| { + s.spawn(move || { + thread::sleep(Duration::from_millis(20)); + X(s, actually_finished) + }); + }); + assert!(actually_finished.load(Ordering::Relaxed)); +} + +#[test] +fn test_scoped_threads_nll() { + // this is mostly a *compilation test* for this exact function: + fn foo(x: &u8) { + thread::scope(|s| { + s.spawn(|| drop(x)); + }); + } + // let's also run it for good measure + let x = 42_u8; + foo(&x); +} |