diff options
Diffstat (limited to 'third_party/rust/parking_lot/src/once.rs')
-rw-r--r-- | third_party/rust/parking_lot/src/once.rs | 458 |
1 files changed, 458 insertions, 0 deletions
diff --git a/third_party/rust/parking_lot/src/once.rs b/third_party/rust/parking_lot/src/once.rs new file mode 100644 index 0000000000..f458c9c04b --- /dev/null +++ b/third_party/rust/parking_lot/src/once.rs @@ -0,0 +1,458 @@ +// Copyright 2016 Amanieu d'Antras +// +// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or +// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or +// http://opensource.org/licenses/MIT>, at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use crate::util::UncheckedOptionExt; +use core::{ + fmt, mem, + sync::atomic::{fence, AtomicU8, Ordering}, +}; +use parking_lot_core::{self, SpinWait, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN}; + +const DONE_BIT: u8 = 1; +const POISON_BIT: u8 = 2; +const LOCKED_BIT: u8 = 4; +const PARKED_BIT: u8 = 8; + +/// Current state of a `Once`. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum OnceState { + /// A closure has not been executed yet + New, + + /// A closure was executed but panicked. + Poisoned, + + /// A thread is currently executing a closure. + InProgress, + + /// A closure has completed successfully. + Done, +} + +impl OnceState { + /// Returns whether the associated `Once` has been poisoned. + /// + /// Once an initialization routine for a `Once` has panicked it will forever + /// indicate to future forced initialization routines that it is poisoned. + #[inline] + pub fn poisoned(self) -> bool { + match self { + OnceState::Poisoned => true, + _ => false, + } + } + + /// Returns whether the associated `Once` has successfully executed a + /// closure. + #[inline] + pub fn done(self) -> bool { + match self { + OnceState::Done => true, + _ => false, + } + } +} + +/// A synchronization primitive which can be used to run a one-time +/// initialization. Useful for one-time initialization for globals, FFI or +/// related functionality. +/// +/// # Differences from the standard library `Once` +/// +/// - Only requires 1 byte of space, instead of 1 word. +/// - Not required to be `'static`. +/// - Relaxed memory barriers in the fast path, which can significantly improve +/// performance on some architectures. +/// - Efficient handling of micro-contention using adaptive spinning. +/// +/// # Examples +/// +/// ``` +/// use parking_lot::Once; +/// +/// static START: Once = Once::new(); +/// +/// START.call_once(|| { +/// // run initialization here +/// }); +/// ``` +pub struct Once(AtomicU8); + +impl Once { + /// Creates a new `Once` value. + #[inline] + pub const fn new() -> Once { + Once(AtomicU8::new(0)) + } + + /// Returns the current state of this `Once`. + #[inline] + pub fn state(&self) -> OnceState { + let state = self.0.load(Ordering::Acquire); + if state & DONE_BIT != 0 { + OnceState::Done + } else if state & LOCKED_BIT != 0 { + OnceState::InProgress + } else if state & POISON_BIT != 0 { + OnceState::Poisoned + } else { + OnceState::New + } + } + + /// Performs an initialization routine once and only once. The given closure + /// will be executed if this is the first time `call_once` has been called, + /// and otherwise the routine will *not* be invoked. + /// + /// This method will block the calling thread if another initialization + /// routine is currently running. + /// + /// When this function returns, it is guaranteed that some initialization + /// has run and completed (it may not be the closure specified). It is also + /// guaranteed that any memory writes performed by the executed closure can + /// be reliably observed by other threads at this point (there is a + /// happens-before relation between the closure and code executing after the + /// return). + /// + /// # Examples + /// + /// ``` + /// use parking_lot::Once; + /// + /// static mut VAL: usize = 0; + /// static INIT: Once = Once::new(); + /// + /// // Accessing a `static mut` is unsafe much of the time, but if we do so + /// // in a synchronized fashion (e.g. write once or read all) then we're + /// // good to go! + /// // + /// // This function will only call `expensive_computation` once, and will + /// // otherwise always return the value returned from the first invocation. + /// fn get_cached_val() -> usize { + /// unsafe { + /// INIT.call_once(|| { + /// VAL = expensive_computation(); + /// }); + /// VAL + /// } + /// } + /// + /// fn expensive_computation() -> usize { + /// // ... + /// # 2 + /// } + /// ``` + /// + /// # Panics + /// + /// The closure `f` will only be executed once if this is called + /// concurrently amongst many threads. If that closure panics, however, then + /// it will *poison* this `Once` instance, causing all future invocations of + /// `call_once` to also panic. + #[inline] + pub fn call_once<F>(&self, f: F) + where + F: FnOnce(), + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(false, &mut |_| unsafe { f.take().unchecked_unwrap()() }); + } + + /// Performs the same function as `call_once` except ignores poisoning. + /// + /// If this `Once` has been poisoned (some initialization panicked) then + /// this function will continue to attempt to call initialization functions + /// until one of them doesn't panic. + /// + /// The closure `f` is yielded a structure which can be used to query the + /// state of this `Once` (whether initialization has previously panicked or + /// not). + #[inline] + pub fn call_once_force<F>(&self, f: F) + where + F: FnOnce(OnceState), + { + if self.0.load(Ordering::Acquire) == DONE_BIT { + return; + } + + let mut f = Some(f); + self.call_once_slow(true, &mut |state| unsafe { + f.take().unchecked_unwrap()(state) + }); + } + + // This is a non-generic function to reduce the monomorphization cost of + // using `call_once` (this isn't exactly a trivial or small implementation). + // + // Additionally, this is tagged with `#[cold]` as it should indeed be cold + // and it helps let LLVM know that calls to this function should be off the + // fast path. Essentially, this should help generate more straight line code + // in LLVM. + // + // Finally, this takes an `FnMut` instead of a `FnOnce` because there's + // currently no way to take an `FnOnce` and call it via virtual dispatch + // without some allocation overhead. + #[cold] + fn call_once_slow(&self, ignore_poison: bool, f: &mut dyn FnMut(OnceState)) { + let mut spinwait = SpinWait::new(); + let mut state = self.0.load(Ordering::Relaxed); + loop { + // If another thread called the closure, we're done + if state & DONE_BIT != 0 { + // An acquire fence is needed here since we didn't load the + // state with Ordering::Acquire. + fence(Ordering::Acquire); + return; + } + + // If the state has been poisoned and we aren't forcing, then panic + if state & POISON_BIT != 0 && !ignore_poison { + // Need the fence here as well for the same reason + fence(Ordering::Acquire); + panic!("Once instance has previously been poisoned"); + } + + // Grab the lock if it isn't locked, even if there is a queue on it. + // We also clear the poison bit since we are going to try running + // the closure again. + if state & LOCKED_BIT == 0 { + match self.0.compare_exchange_weak( + state, + (state | LOCKED_BIT) & !POISON_BIT, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(x) => state = x, + } + continue; + } + + // If there is no queue, try spinning a few times + if state & PARKED_BIT == 0 && spinwait.spin() { + state = self.0.load(Ordering::Relaxed); + continue; + } + + // Set the parked bit + if state & PARKED_BIT == 0 { + if let Err(x) = self.0.compare_exchange_weak( + state, + state | PARKED_BIT, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + state = x; + continue; + } + } + + // Park our thread until we are woken up by the thread that owns the + // lock. + unsafe { + let addr = self as *const _ as usize; + let validate = || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT; + let before_sleep = || {}; + let timed_out = |_, _| unreachable!(); + parking_lot_core::park( + addr, + validate, + before_sleep, + timed_out, + DEFAULT_PARK_TOKEN, + None, + ); + } + + // Loop back and check if the done bit was set + spinwait.reset(); + state = self.0.load(Ordering::Relaxed); + } + + struct PanicGuard<'a>(&'a Once); + impl<'a> Drop for PanicGuard<'a> { + fn drop(&mut self) { + // Mark the state as poisoned, unlock it and unpark all threads. + let once = self.0; + let state = once.0.swap(POISON_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = once as *const _ as usize; + parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); + } + } + } + } + + // At this point we have the lock, so run the closure. Make sure we + // properly clean up if the closure panicks. + let guard = PanicGuard(self); + let once_state = if state & POISON_BIT != 0 { + OnceState::Poisoned + } else { + OnceState::New + }; + f(once_state); + mem::forget(guard); + + // Now unlock the state, set the done bit and unpark all threads + let state = self.0.swap(DONE_BIT, Ordering::Release); + if state & PARKED_BIT != 0 { + unsafe { + let addr = self as *const _ as usize; + parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); + } + } + } +} + +impl Default for Once { + #[inline] + fn default() -> Once { + Once::new() + } +} + +impl fmt::Debug for Once { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Once") + .field("state", &self.state()) + .finish() + } +} + +#[cfg(test)] +mod tests { + use crate::Once; + use std::panic; + use std::sync::mpsc::channel; + use std::thread; + + #[test] + fn smoke_once() { + static O: Once = Once::new(); + let mut a = 0; + O.call_once(|| a += 1); + assert_eq!(a, 1); + O.call_once(|| a += 1); + assert_eq!(a, 1); + } + + #[test] + fn stampede_once() { + static O: Once = Once::new(); + static mut RUN: bool = false; + + let (tx, rx) = channel(); + for _ in 0..10 { + let tx = tx.clone(); + thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + tx.send(()).unwrap(); + }); + } + + unsafe { + O.call_once(|| { + assert!(!RUN); + RUN = true; + }); + assert!(RUN); + } + + for _ in 0..10 { + rx.recv().unwrap(); + } + } + + #[test] + fn poison_bad() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // poisoning propagates + let t = panic::catch_unwind(|| { + O.call_once(|| {}); + }); + assert!(t.is_err()); + + // we can subvert poisoning, however + let mut called = false; + O.call_once_force(|p| { + called = true; + assert!(p.poisoned()) + }); + assert!(called); + + // once any success happens, we stop propagating the poison + O.call_once(|| {}); + } + + #[test] + fn wait_for_force_to_finish() { + static O: Once = Once::new(); + + // poison the once + let t = panic::catch_unwind(|| { + O.call_once(|| panic!()); + }); + assert!(t.is_err()); + + // make sure someone's waiting inside the once via a force + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t1 = thread::spawn(move || { + O.call_once_force(|p| { + assert!(p.poisoned()); + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + }); + }); + + rx1.recv().unwrap(); + + // put another waiter on the once + let t2 = thread::spawn(|| { + let mut called = false; + O.call_once(|| { + called = true; + }); + assert!(!called); + }); + + tx2.send(()).unwrap(); + + assert!(t1.join().is_ok()); + assert!(t2.join().is_ok()); + } + + #[test] + fn test_once_debug() { + static O: Once = Once::new(); + + assert_eq!(format!("{:?}", O), "Once { state: New }"); + } +} |