// Copyright 2016 Amanieu d'Antras // // Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be // copied, modified, or distributed except according to those terms. use crate::util::UncheckedOptionExt; use core::{ fmt, mem, sync::atomic::{fence, AtomicU8, Ordering}, }; use parking_lot_core::{self, SpinWait, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN}; const DONE_BIT: u8 = 1; const POISON_BIT: u8 = 2; const LOCKED_BIT: u8 = 4; const PARKED_BIT: u8 = 8; /// Current state of a `Once`. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum OnceState { /// A closure has not been executed yet New, /// A closure was executed but panicked. Poisoned, /// A thread is currently executing a closure. InProgress, /// A closure has completed successfully. Done, } impl OnceState { /// Returns whether the associated `Once` has been poisoned. /// /// Once an initialization routine for a `Once` has panicked it will forever /// indicate to future forced initialization routines that it is poisoned. #[inline] pub fn poisoned(self) -> bool { match self { OnceState::Poisoned => true, _ => false, } } /// Returns whether the associated `Once` has successfully executed a /// closure. #[inline] pub fn done(self) -> bool { match self { OnceState::Done => true, _ => false, } } } /// A synchronization primitive which can be used to run a one-time /// initialization. Useful for one-time initialization for globals, FFI or /// related functionality. /// /// # Differences from the standard library `Once` /// /// - Only requires 1 byte of space, instead of 1 word. /// - Not required to be `'static`. /// - Relaxed memory barriers in the fast path, which can significantly improve /// performance on some architectures. /// - Efficient handling of micro-contention using adaptive spinning. /// /// # Examples /// /// ``` /// use parking_lot::Once; /// /// static START: Once = Once::new(); /// /// START.call_once(|| { /// // run initialization here /// }); /// ``` pub struct Once(AtomicU8); impl Once { /// Creates a new `Once` value. #[inline] pub const fn new() -> Once { Once(AtomicU8::new(0)) } /// Returns the current state of this `Once`. #[inline] pub fn state(&self) -> OnceState { let state = self.0.load(Ordering::Acquire); if state & DONE_BIT != 0 { OnceState::Done } else if state & LOCKED_BIT != 0 { OnceState::InProgress } else if state & POISON_BIT != 0 { OnceState::Poisoned } else { OnceState::New } } /// Performs an initialization routine once and only once. The given closure /// will be executed if this is the first time `call_once` has been called, /// and otherwise the routine will *not* be invoked. /// /// This method will block the calling thread if another initialization /// routine is currently running. /// /// When this function returns, it is guaranteed that some initialization /// has run and completed (it may not be the closure specified). It is also /// guaranteed that any memory writes performed by the executed closure can /// be reliably observed by other threads at this point (there is a /// happens-before relation between the closure and code executing after the /// return). /// /// # Examples /// /// ``` /// use parking_lot::Once; /// /// static mut VAL: usize = 0; /// static INIT: Once = Once::new(); /// /// // Accessing a `static mut` is unsafe much of the time, but if we do so /// // in a synchronized fashion (e.g. write once or read all) then we're /// // good to go! /// // /// // This function will only call `expensive_computation` once, and will /// // otherwise always return the value returned from the first invocation. /// fn get_cached_val() -> usize { /// unsafe { /// INIT.call_once(|| { /// VAL = expensive_computation(); /// }); /// VAL /// } /// } /// /// fn expensive_computation() -> usize { /// // ... /// # 2 /// } /// ``` /// /// # Panics /// /// The closure `f` will only be executed once if this is called /// concurrently amongst many threads. If that closure panics, however, then /// it will *poison* this `Once` instance, causing all future invocations of /// `call_once` to also panic. #[inline] pub fn call_once(&self, f: F) where F: FnOnce(), { if self.0.load(Ordering::Acquire) == DONE_BIT { return; } let mut f = Some(f); self.call_once_slow(false, &mut |_| unsafe { f.take().unchecked_unwrap()() }); } /// Performs the same function as `call_once` except ignores poisoning. /// /// If this `Once` has been poisoned (some initialization panicked) then /// this function will continue to attempt to call initialization functions /// until one of them doesn't panic. /// /// The closure `f` is yielded a structure which can be used to query the /// state of this `Once` (whether initialization has previously panicked or /// not). #[inline] pub fn call_once_force(&self, f: F) where F: FnOnce(OnceState), { if self.0.load(Ordering::Acquire) == DONE_BIT { return; } let mut f = Some(f); self.call_once_slow(true, &mut |state| unsafe { f.take().unchecked_unwrap()(state) }); } // This is a non-generic function to reduce the monomorphization cost of // using `call_once` (this isn't exactly a trivial or small implementation). // // Additionally, this is tagged with `#[cold]` as it should indeed be cold // and it helps let LLVM know that calls to this function should be off the // fast path. Essentially, this should help generate more straight line code // in LLVM. // // Finally, this takes an `FnMut` instead of a `FnOnce` because there's // currently no way to take an `FnOnce` and call it via virtual dispatch // without some allocation overhead. #[cold] fn call_once_slow(&self, ignore_poison: bool, f: &mut dyn FnMut(OnceState)) { let mut spinwait = SpinWait::new(); let mut state = self.0.load(Ordering::Relaxed); loop { // If another thread called the closure, we're done if state & DONE_BIT != 0 { // An acquire fence is needed here since we didn't load the // state with Ordering::Acquire. fence(Ordering::Acquire); return; } // If the state has been poisoned and we aren't forcing, then panic if state & POISON_BIT != 0 && !ignore_poison { // Need the fence here as well for the same reason fence(Ordering::Acquire); panic!("Once instance has previously been poisoned"); } // Grab the lock if it isn't locked, even if there is a queue on it. // We also clear the poison bit since we are going to try running // the closure again. if state & LOCKED_BIT == 0 { match self.0.compare_exchange_weak( state, (state | LOCKED_BIT) & !POISON_BIT, Ordering::Acquire, Ordering::Relaxed, ) { Ok(_) => break, Err(x) => state = x, } continue; } // If there is no queue, try spinning a few times if state & PARKED_BIT == 0 && spinwait.spin() { state = self.0.load(Ordering::Relaxed); continue; } // Set the parked bit if state & PARKED_BIT == 0 { if let Err(x) = self.0.compare_exchange_weak( state, state | PARKED_BIT, Ordering::Relaxed, Ordering::Relaxed, ) { state = x; continue; } } // Park our thread until we are woken up by the thread that owns the // lock. unsafe { let addr = self as *const _ as usize; let validate = || self.0.load(Ordering::Relaxed) == LOCKED_BIT | PARKED_BIT; let before_sleep = || {}; let timed_out = |_, _| unreachable!(); parking_lot_core::park( addr, validate, before_sleep, timed_out, DEFAULT_PARK_TOKEN, None, ); } // Loop back and check if the done bit was set spinwait.reset(); state = self.0.load(Ordering::Relaxed); } struct PanicGuard<'a>(&'a Once); impl<'a> Drop for PanicGuard<'a> { fn drop(&mut self) { // Mark the state as poisoned, unlock it and unpark all threads. let once = self.0; let state = once.0.swap(POISON_BIT, Ordering::Release); if state & PARKED_BIT != 0 { unsafe { let addr = once as *const _ as usize; parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); } } } } // At this point we have the lock, so run the closure. Make sure we // properly clean up if the closure panicks. let guard = PanicGuard(self); let once_state = if state & POISON_BIT != 0 { OnceState::Poisoned } else { OnceState::New }; f(once_state); mem::forget(guard); // Now unlock the state, set the done bit and unpark all threads let state = self.0.swap(DONE_BIT, Ordering::Release); if state & PARKED_BIT != 0 { unsafe { let addr = self as *const _ as usize; parking_lot_core::unpark_all(addr, DEFAULT_UNPARK_TOKEN); } } } } impl Default for Once { #[inline] fn default() -> Once { Once::new() } } impl fmt::Debug for Once { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Once") .field("state", &self.state()) .finish() } } #[cfg(test)] mod tests { use crate::Once; use std::panic; use std::sync::mpsc::channel; use std::thread; #[test] fn smoke_once() { static O: Once = Once::new(); let mut a = 0; O.call_once(|| a += 1); assert_eq!(a, 1); O.call_once(|| a += 1); assert_eq!(a, 1); } #[test] fn stampede_once() { static O: Once = Once::new(); static mut RUN: bool = false; let (tx, rx) = channel(); for _ in 0..10 { let tx = tx.clone(); thread::spawn(move || { for _ in 0..4 { thread::yield_now() } unsafe { O.call_once(|| { assert!(!RUN); RUN = true; }); assert!(RUN); } tx.send(()).unwrap(); }); } unsafe { O.call_once(|| { assert!(!RUN); RUN = true; }); assert!(RUN); } for _ in 0..10 { rx.recv().unwrap(); } } #[test] fn poison_bad() { static O: Once = Once::new(); // poison the once let t = panic::catch_unwind(|| { O.call_once(|| panic!()); }); assert!(t.is_err()); // poisoning propagates let t = panic::catch_unwind(|| { O.call_once(|| {}); }); assert!(t.is_err()); // we can subvert poisoning, however let mut called = false; O.call_once_force(|p| { called = true; assert!(p.poisoned()) }); assert!(called); // once any success happens, we stop propagating the poison O.call_once(|| {}); } #[test] fn wait_for_force_to_finish() { static O: Once = Once::new(); // poison the once let t = panic::catch_unwind(|| { O.call_once(|| panic!()); }); assert!(t.is_err()); // make sure someone's waiting inside the once via a force let (tx1, rx1) = channel(); let (tx2, rx2) = channel(); let t1 = thread::spawn(move || { O.call_once_force(|p| { assert!(p.poisoned()); tx1.send(()).unwrap(); rx2.recv().unwrap(); }); }); rx1.recv().unwrap(); // put another waiter on the once let t2 = thread::spawn(|| { let mut called = false; O.call_once(|| { called = true; }); assert!(!called); }); tx2.send(()).unwrap(); assert!(t1.join().is_ok()); assert!(t2.join().is_ok()); } #[test] fn test_once_debug() { static O: Once = Once::new(); assert_eq!(format!("{:?}", O), "Once { state: New }"); } }