summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/once_cell.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/sync/once_cell.rs')
-rw-r--r--third_party/rust/tokio/src/sync/once_cell.rs457
1 files changed, 457 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/once_cell.rs b/third_party/rust/tokio/src/sync/once_cell.rs
new file mode 100644
index 0000000000..d31a40e2c8
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/once_cell.rs
@@ -0,0 +1,457 @@
+use super::{Semaphore, SemaphorePermit, TryAcquireError};
+use crate::loom::cell::UnsafeCell;
+use std::error::Error;
+use std::fmt;
+use std::future::Future;
+use std::mem::MaybeUninit;
+use std::ops::Drop;
+use std::ptr;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+// This file contains an implementation of an OnceCell. The principle
+// behind the safety the of the cell is that any thread with an `&OnceCell` may
+// access the `value` field according the following rules:
+//
+// 1. When `value_set` is false, the `value` field may be modified by the
+// thread holding the permit on the semaphore.
+// 2. When `value_set` is true, the `value` field may be accessed immutably by
+// any thread.
+//
+// It is an invariant that if the semaphore is closed, then `value_set` is true.
+// The reverse does not necessarily hold — but if not, the semaphore may not
+// have any available permits.
+//
+// A thread with a `&mut OnceCell` may modify the value in any way it wants as
+// long as the invariants are upheld.
+
+/// A thread-safe cell that can be written to only once.
+///
+/// A `OnceCell` is typically used for global variables that need to be
+/// initialized once on first use, but need no further changes. The `OnceCell`
+/// in Tokio allows the initialization procedure to be asynchronous.
+///
+/// # Examples
+///
+/// ```
+/// use tokio::sync::OnceCell;
+///
+/// async fn some_computation() -> u32 {
+/// 1 + 1
+/// }
+///
+/// static ONCE: OnceCell<u32> = OnceCell::const_new();
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let result = ONCE.get_or_init(some_computation).await;
+/// assert_eq!(*result, 2);
+/// }
+/// ```
+///
+/// It is often useful to write a wrapper method for accessing the value.
+///
+/// ```
+/// use tokio::sync::OnceCell;
+///
+/// static ONCE: OnceCell<u32> = OnceCell::const_new();
+///
+/// async fn get_global_integer() -> &'static u32 {
+/// ONCE.get_or_init(|| async {
+/// 1 + 1
+/// }).await
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let result = get_global_integer().await;
+/// assert_eq!(*result, 2);
+/// }
+/// ```
+pub struct OnceCell<T> {
+ value_set: AtomicBool,
+ value: UnsafeCell<MaybeUninit<T>>,
+ semaphore: Semaphore,
+}
+
+impl<T> Default for OnceCell<T> {
+ fn default() -> OnceCell<T> {
+ OnceCell::new()
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for OnceCell<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("OnceCell")
+ .field("value", &self.get())
+ .finish()
+ }
+}
+
+impl<T: Clone> Clone for OnceCell<T> {
+ fn clone(&self) -> OnceCell<T> {
+ OnceCell::new_with(self.get().cloned())
+ }
+}
+
+impl<T: PartialEq> PartialEq for OnceCell<T> {
+ fn eq(&self, other: &OnceCell<T>) -> bool {
+ self.get() == other.get()
+ }
+}
+
+impl<T: Eq> Eq for OnceCell<T> {}
+
+impl<T> Drop for OnceCell<T> {
+ fn drop(&mut self) {
+ if self.initialized_mut() {
+ unsafe {
+ self.value
+ .with_mut(|ptr| ptr::drop_in_place((&mut *ptr).as_mut_ptr()));
+ };
+ }
+ }
+}
+
+impl<T> From<T> for OnceCell<T> {
+ fn from(value: T) -> Self {
+ let semaphore = Semaphore::new(0);
+ semaphore.close();
+ OnceCell {
+ value_set: AtomicBool::new(true),
+ value: UnsafeCell::new(MaybeUninit::new(value)),
+ semaphore,
+ }
+ }
+}
+
+impl<T> OnceCell<T> {
+ /// Creates a new empty `OnceCell` instance.
+ pub fn new() -> Self {
+ OnceCell {
+ value_set: AtomicBool::new(false),
+ value: UnsafeCell::new(MaybeUninit::uninit()),
+ semaphore: Semaphore::new(1),
+ }
+ }
+
+ /// Creates a new `OnceCell` that contains the provided value, if any.
+ ///
+ /// If the `Option` is `None`, this is equivalent to `OnceCell::new`.
+ ///
+ /// [`OnceCell::new`]: crate::sync::OnceCell::new
+ pub fn new_with(value: Option<T>) -> Self {
+ if let Some(v) = value {
+ OnceCell::from(v)
+ } else {
+ OnceCell::new()
+ }
+ }
+
+ /// Creates a new empty `OnceCell` instance.
+ ///
+ /// Equivalent to `OnceCell::new`, except that it can be used in static
+ /// variables.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use tokio::sync::OnceCell;
+ ///
+ /// static ONCE: OnceCell<u32> = OnceCell::const_new();
+ ///
+ /// async fn get_global_integer() -> &'static u32 {
+ /// ONCE.get_or_init(|| async {
+ /// 1 + 1
+ /// }).await
+ /// }
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let result = get_global_integer().await;
+ /// assert_eq!(*result, 2);
+ /// }
+ /// ```
+ #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
+ #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
+ pub const fn const_new() -> Self {
+ OnceCell {
+ value_set: AtomicBool::new(false),
+ value: UnsafeCell::new(MaybeUninit::uninit()),
+ semaphore: Semaphore::const_new(1),
+ }
+ }
+
+ /// Returns `true` if the `OnceCell` currently contains a value, and `false`
+ /// otherwise.
+ pub fn initialized(&self) -> bool {
+ // Using acquire ordering so any threads that read a true from this
+ // atomic is able to read the value.
+ self.value_set.load(Ordering::Acquire)
+ }
+
+ /// Returns `true` if the `OnceCell` currently contains a value, and `false`
+ /// otherwise.
+ fn initialized_mut(&mut self) -> bool {
+ *self.value_set.get_mut()
+ }
+
+ // SAFETY: The OnceCell must not be empty.
+ unsafe fn get_unchecked(&self) -> &T {
+ &*self.value.with(|ptr| (*ptr).as_ptr())
+ }
+
+ // SAFETY: The OnceCell must not be empty.
+ unsafe fn get_unchecked_mut(&mut self) -> &mut T {
+ &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr())
+ }
+
+ fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T {
+ // SAFETY: We are holding the only permit on the semaphore.
+ unsafe {
+ self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
+ }
+
+ // Using release ordering so any threads that read a true from this
+ // atomic is able to read the value we just stored.
+ self.value_set.store(true, Ordering::Release);
+ self.semaphore.close();
+ permit.forget();
+
+ // SAFETY: We just initialized the cell.
+ unsafe { self.get_unchecked() }
+ }
+
+ /// Returns a reference to the value currently stored in the `OnceCell`, or
+ /// `None` if the `OnceCell` is empty.
+ pub fn get(&self) -> Option<&T> {
+ if self.initialized() {
+ Some(unsafe { self.get_unchecked() })
+ } else {
+ None
+ }
+ }
+
+ /// Returns a mutable reference to the value currently stored in the
+ /// `OnceCell`, or `None` if the `OnceCell` is empty.
+ ///
+ /// Since this call borrows the `OnceCell` mutably, it is safe to mutate the
+ /// value inside the `OnceCell` — the mutable borrow statically guarantees
+ /// no other references exist.
+ pub fn get_mut(&mut self) -> Option<&mut T> {
+ if self.initialized_mut() {
+ Some(unsafe { self.get_unchecked_mut() })
+ } else {
+ None
+ }
+ }
+
+ /// Sets the value of the `OnceCell` to the given value if the `OnceCell` is
+ /// empty.
+ ///
+ /// If the `OnceCell` already has a value, this call will fail with an
+ /// [`SetError::AlreadyInitializedError`].
+ ///
+ /// If the `OnceCell` is empty, but some other task is currently trying to
+ /// set the value, this call will fail with [`SetError::InitializingError`].
+ ///
+ /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError
+ /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError
+ pub fn set(&self, value: T) -> Result<(), SetError<T>> {
+ if self.initialized() {
+ return Err(SetError::AlreadyInitializedError(value));
+ }
+
+ // Another task might be initializing the cell, in which case
+ // `try_acquire` will return an error. If we succeed to acquire the
+ // permit, then we can set the value.
+ match self.semaphore.try_acquire() {
+ Ok(permit) => {
+ debug_assert!(!self.initialized());
+ self.set_value(value, permit);
+ Ok(())
+ }
+ Err(TryAcquireError::NoPermits) => {
+ // Some other task is holding the permit. That task is
+ // currently trying to initialize the value.
+ Err(SetError::InitializingError(value))
+ }
+ Err(TryAcquireError::Closed) => {
+ // The semaphore was closed. Some other task has initialized
+ // the value.
+ Err(SetError::AlreadyInitializedError(value))
+ }
+ }
+ }
+
+ /// Gets the value currently in the `OnceCell`, or initialize it with the
+ /// given asynchronous operation.
+ ///
+ /// If some other task is currently working on initializing the `OnceCell`,
+ /// this call will wait for that other task to finish, then return the value
+ /// that the other task produced.
+ ///
+ /// If the provided operation is cancelled or panics, the initialization
+ /// attempt is cancelled. If there are other tasks waiting for the value to
+ /// be initialized, one of them will start another attempt at initializing
+ /// the value.
+ ///
+ /// This will deadlock if `f` tries to initialize the cell recursively.
+ pub async fn get_or_init<F, Fut>(&self, f: F) -> &T
+ where
+ F: FnOnce() -> Fut,
+ Fut: Future<Output = T>,
+ {
+ if self.initialized() {
+ // SAFETY: The OnceCell has been fully initialized.
+ unsafe { self.get_unchecked() }
+ } else {
+ // Here we try to acquire the semaphore permit. Holding the permit
+ // will allow us to set the value of the OnceCell, and prevents
+ // other tasks from initializing the OnceCell while we are holding
+ // it.
+ match self.semaphore.acquire().await {
+ Ok(permit) => {
+ debug_assert!(!self.initialized());
+
+ // If `f()` panics or `select!` is called, this
+ // `get_or_init` call is aborted and the semaphore permit is
+ // dropped.
+ let value = f().await;
+
+ self.set_value(value, permit)
+ }
+ Err(_) => {
+ debug_assert!(self.initialized());
+
+ // SAFETY: The semaphore has been closed. This only happens
+ // when the OnceCell is fully initialized.
+ unsafe { self.get_unchecked() }
+ }
+ }
+ }
+ }
+
+ /// Gets the value currently in the `OnceCell`, or initialize it with the
+ /// given asynchronous operation.
+ ///
+ /// If some other task is currently working on initializing the `OnceCell`,
+ /// this call will wait for that other task to finish, then return the value
+ /// that the other task produced.
+ ///
+ /// If the provided operation returns an error, is cancelled or panics, the
+ /// initialization attempt is cancelled. If there are other tasks waiting
+ /// for the value to be initialized, one of them will start another attempt
+ /// at initializing the value.
+ ///
+ /// This will deadlock if `f` tries to initialize the cell recursively.
+ pub async fn get_or_try_init<E, F, Fut>(&self, f: F) -> Result<&T, E>
+ where
+ F: FnOnce() -> Fut,
+ Fut: Future<Output = Result<T, E>>,
+ {
+ if self.initialized() {
+ // SAFETY: The OnceCell has been fully initialized.
+ unsafe { Ok(self.get_unchecked()) }
+ } else {
+ // Here we try to acquire the semaphore permit. Holding the permit
+ // will allow us to set the value of the OnceCell, and prevents
+ // other tasks from initializing the OnceCell while we are holding
+ // it.
+ match self.semaphore.acquire().await {
+ Ok(permit) => {
+ debug_assert!(!self.initialized());
+
+ // If `f()` panics or `select!` is called, this
+ // `get_or_try_init` call is aborted and the semaphore
+ // permit is dropped.
+ let value = f().await;
+
+ match value {
+ Ok(value) => Ok(self.set_value(value, permit)),
+ Err(e) => Err(e),
+ }
+ }
+ Err(_) => {
+ debug_assert!(self.initialized());
+
+ // SAFETY: The semaphore has been closed. This only happens
+ // when the OnceCell is fully initialized.
+ unsafe { Ok(self.get_unchecked()) }
+ }
+ }
+ }
+ }
+
+ /// Takes the value from the cell, destroying the cell in the process.
+ /// Returns `None` if the cell is empty.
+ pub fn into_inner(mut self) -> Option<T> {
+ if self.initialized_mut() {
+ // Set to uninitialized for the destructor of `OnceCell` to work properly
+ *self.value_set.get_mut() = false;
+ Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) })
+ } else {
+ None
+ }
+ }
+
+ /// Takes ownership of the current value, leaving the cell empty. Returns
+ /// `None` if the cell is empty.
+ pub fn take(&mut self) -> Option<T> {
+ std::mem::take(self).into_inner()
+ }
+}
+
+// Since `get` gives us access to immutable references of the OnceCell, OnceCell
+// can only be Sync if T is Sync, otherwise OnceCell would allow sharing
+// references of !Sync values across threads. We need T to be Send in order for
+// OnceCell to by Sync because we can use `set` on `&OnceCell<T>` to send values
+// (of type T) across threads.
+unsafe impl<T: Sync + Send> Sync for OnceCell<T> {}
+
+// Access to OnceCell's value is guarded by the semaphore permit
+// and atomic operations on `value_set`, so as long as T itself is Send
+// it's safe to send it to another thread
+unsafe impl<T: Send> Send for OnceCell<T> {}
+
+/// Errors that can be returned from [`OnceCell::set`].
+///
+/// [`OnceCell::set`]: crate::sync::OnceCell::set
+#[derive(Debug, PartialEq)]
+pub enum SetError<T> {
+ /// The cell was already initialized when [`OnceCell::set`] was called.
+ ///
+ /// [`OnceCell::set`]: crate::sync::OnceCell::set
+ AlreadyInitializedError(T),
+
+ /// The cell is currently being initialized.
+ InitializingError(T),
+}
+
+impl<T> fmt::Display for SetError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"),
+ SetError::InitializingError(_) => write!(f, "InitializingError"),
+ }
+ }
+}
+
+impl<T: fmt::Debug> Error for SetError<T> {}
+
+impl<T> SetError<T> {
+ /// Whether `SetError` is `SetError::AlreadyInitializedError`.
+ pub fn is_already_init_err(&self) -> bool {
+ match self {
+ SetError::AlreadyInitializedError(_) => true,
+ SetError::InitializingError(_) => false,
+ }
+ }
+
+ /// Whether `SetError` is `SetError::InitializingError`
+ pub fn is_initializing_err(&self) -> bool {
+ match self {
+ SetError::AlreadyInitializedError(_) => false,
+ SetError::InitializingError(_) => true,
+ }
+ }
+}